diff --git a/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py b/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py index 30b84a0..ece4ac8 100644 --- a/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py +++ b/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py @@ -1,672 +1,712 @@ #!/usr/local/bin/python # # Copyright (c) 2012 YASUOKA Masahiko # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # Usage # # First import: # % git init --bare /git/openbsd.git # % python cvs2gitdump.py -k OpenBSD -e openbsd.org /cvs/openbsd/src \ # > openbsd.dump # % git --git-dir /git/openbsd.git fast-import < openbsd.dump # # Periodic import: # % sudo cvsync # % python cvs2gitdump.py -k OpenBSD -e openbsd.org /cvs/openbsd/src \ # /git/openbsd.git > openbsd2.dump # % git --git-dir /git/openbsd.git fast-import < openbsd2.dump # import copy import getopt import os import re import subprocess import sys import time from typing import Dict, List, Optional, Tuple, TypeVar import swh.loader.cvs.rcsparse as rcsparse CHANGESET_FUZZ_SEC = 300 def usage(): print('usage: cvs2gitdump [-ah] [-z fuzz] [-e email_domain] ' '[-E log_encodings]\n' '\t[-k rcs_keywords] [-b branch] [-m module] [-l last_revision]\n' '\tcvsroot [git_dir]', file=sys.stderr) def main() -> None: email_domain = None do_incremental = False git_tip = None git_branch = 'master' dump_all = False log_encoding = 'utf-8,iso-8859-1' rcs = RcsKeywords() modules = [] last_revision = None fuzzsec = CHANGESET_FUZZ_SEC try: opts, args = getopt.getopt(sys.argv[1:], 'ab:hm:z:e:E:k:t:l:') for opt, v in opts: if opt == '-z': fuzzsec = int(v) elif opt == '-e': email_domain = v elif opt == '-a': dump_all = True elif opt == '-b': git_branch = v elif opt == '-E': log_encoding = v elif opt == '-k': rcs.add_id_keyword(v) elif opt == '-m': if v == '.git': print('Cannot handle the path named \'.git\'', file=sys.stderr) sys.exit(1) modules.append(v) elif opt == '-l': last_revision = v elif opt == '-h': usage() sys.exit(1) except getopt.GetoptError as msg: print(msg, file=sys.stderr) usage() sys.exit(1) if len(args) == 0 or len(args) > 2: usage() sys.exit(1) log_encodings = log_encoding.split(',') cvsroot = args[0] while cvsroot[-1] == '/': cvsroot = cvsroot[:-1] if len(args) == 2: do_incremental = True git = subprocess.Popen( ['git', '--git-dir=' + args[1], '-c', 'i18n.logOutputEncoding=UTF-8', 'log', '--max-count', '1', '--date=raw', '--format=%ae%n%ad%n%H', git_branch], encoding='utf-8', stdout=subprocess.PIPE) assert git.stdout is not None outs = git.stdout.readlines() git.wait() if git.returncode != 0: print("Couldn't exec git", file=sys.stderr) sys.exit(git.returncode) git_tip = outs[2].strip() if last_revision is not None: git = subprocess.Popen( ['git', '--git-dir=' + args[1], '-c', 'i18n.logOutputEncoding=UTF-8', 'log', '--max-count', '1', '--date=raw', '--format=%ae%n%ad%n%H', last_revision], encoding='utf-8', stdout=subprocess.PIPE) assert git.stdout 
is not None outs = git.stdout.readlines() git.wait() if git.returncode != 0: print("Couldn't exec git", file=sys.stderr) sys.exit(git.returncode) last_author = outs[0].strip() last_ctime = float(outs[1].split()[0]) # strip off the domain part from the last author since cvs doesn't have # the domain part. if do_incremental and email_domain is not None and \ last_author.lower().endswith(('@' + email_domain).lower()): last_author = last_author[:-1 * (1 + len(email_domain))] cvs = CvsConv(cvsroot, rcs, not do_incremental, fuzzsec) print('** walk cvs tree', file=sys.stderr) if len(modules) == 0: cvs.walk() else: for module in modules: cvs.walk(module) changesets = sorted(cvs.changesets) nchangesets = len(changesets) print('** cvs has %d changesets' % (nchangesets), file=sys.stderr) if nchangesets <= 0: sys.exit(0) if not dump_all: # don't use the last 10 minutes for safety max_time_max = changesets[-1].max_time - 600 else: max_time_max = changesets[-1].max_time found_last_revision = False markseq = cvs.markseq extags = set() for k in changesets: if do_incremental and not found_last_revision: if k.min_time == last_ctime and k.author == last_author: found_last_revision = True for tag in k.tags: extags.add(tag) continue if k.max_time > max_time_max: break marks = {} for f in k.revs: if not do_incremental: marks[f.markseq] = f else: markseq = markseq + 1 git_dump_file(f.path, f.rev, rcs, markseq) marks[markseq] = f log = rcsparse.rcsfile(k.revs[0].path).getlog(k.revs[0].rev) for i, e in enumerate(log_encodings): try: how = 'ignore' if i == len(log_encodings) - 1 else 'strict' log_str = log.decode(e, how) break except UnicodeError: pass log = log_str.encode('utf-8', 'ignore') output('commit refs/heads/' + git_branch) markseq = markseq + 1 output('mark :%d' % (markseq)) email = k.author if email_domain is None \ else k.author + '@' + email_domain output('author %s <%s> %d +0000' % (k.author, email, k.min_time)) output('committer %s <%s> %d +0000' % (k.author, email, k.min_time)) output('data', len(log)) output(log, end='') if do_incremental and git_tip is not None: output('from', git_tip) git_tip = None for m in marks: f = marks[m] mode = 0o100755 if os.access(f.path, os.X_OK) else 0o100644 fn = file_path(cvs.cvsroot, f.path) if f.state == 'dead': output('D', fn) else: output('M %o :%d %s' % (mode, m, fn)) output('') for tag in k.tags: if tag in extags: continue output('reset refs/tags/%s' % (tag)) output('from :%d' % (markseq)) output('') if do_incremental and not found_last_revision: raise Exception('could not find the last revision') print('** dumped', file=sys.stderr)
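For reference, main() above emits a git fast-import stream: each file revision becomes a `blob` command identified by a mark, and each changeset becomes a `commit` command referencing those marks. A minimal sketch of the stream shape, with hypothetical author, path, and timestamp (see git-fast-import(1) for the format):

```python
import sys

# Minimal sketch of the fast-import stream main() emits; all values here
# are hypothetical.
content = b'hello\n'
log = 'initial import\n'.encode('utf-8')
stream = b''.join([
    b'blob\n',
    b'mark :1\n',
    b'data %d\n' % len(content), content,  # blob for one file revision
    b'commit refs/heads/master\n',
    b'mark :2\n',
    b'author alice <alice@example.org> 1320000000 +0000\n',
    b'committer alice <alice@example.org> 1320000000 +0000\n',
    b'data %d\n' % len(log), log,          # commit message
    b'M 100644 :1 src/hello.txt\n',        # attach blob :1 at this path
    b'\n',
])
sys.stdout.buffer.write(stream)
```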
# # Always encode string objects as UTF-8, since the encoding for # git-fast-import is UTF-8. Bytes objects are written without conversion # (file bodies may use various encodings). # def output(*args, end='\n') -> None: if len(args) == 0: pass elif len(args) > 1 or isinstance(args[0], str): lines = ' '.join( [arg if isinstance(arg, str) else str(arg) for arg in args]) sys.stdout.buffer.write(lines.encode('utf-8')) else: sys.stdout.buffer.write(args[0]) if len(end) > 0: sys.stdout.buffer.write(end.encode('utf-8')) class FileRevision: def __init__(self, path: str, rev: str, state: str, markseq: int) -> None: self.path = path self.rev = rev self.state = state self.markseq = markseq class ChangeSetKey: def __init__( self, branch: str, author, timestamp: int, log: bytes, commitid: Optional[str], fuzzsec: int ) -> None: self.branch = branch self.author = author self.min_time = timestamp self.max_time = timestamp self.commitid = commitid self.fuzzsec = fuzzsec self.revs: List[FileRevision] = [] self.tags: List[str] = [] self.log_hash = 0 h = 0 for c in log: h = 31 * h + c self.log_hash = h def __lt__(self, other) -> bool: return self._cmp(other) < 0 def __gt__(self, other) -> bool: return self._cmp(other) > 0 def __eq__(self, other) -> bool: return self._cmp(other) == 0 def __le__(self, other) -> bool: return self._cmp(other) <= 0 def __ge__(self, other) -> bool: return self._cmp(other) >= 0 def __ne__(self, other) -> bool: return self._cmp(other) != 0 def _cmp(self, anon) -> int: if not isinstance(anon, ChangeSetKey): raise TypeError() # compare by the commitid cid = _cmp2(self.commitid, anon.commitid) if cid == 0 and self.commitid is not None: # both have a commitid and it is the same return 0 # compare by the time ma = anon.min_time - self.max_time mi = self.min_time - anon.max_time ct = self.min_time - anon.min_time if ma > self.fuzzsec or mi > self.fuzzsec: return ct if cid != 0: # only one has a commitid, which means different commits return cid if ct == 0 else ct # compare by log, branch and author c = _cmp2(self.log_hash, anon.log_hash) if c == 0: c = _cmp2(self.branch, anon.branch) if c == 0: c = _cmp2(self.author, anon.author) if c == 0: return 0 return ct if ct != 0 else c def merge(self, anot: "ChangeSetKey") -> None: self.max_time = max(self.max_time, anot.max_time) self.min_time = min(self.min_time, anot.min_time) self.revs.extend(anot.revs) def __hash__(self) -> int: return hash(self.branch + '/' + self.author) * 31 + self.log_hash def put_file(self, path: str, rev: str, state: str, markseq: int): self.revs.append(FileRevision(path, rev, state, markseq)) TCmp = TypeVar("TCmp", int, str) def _cmp2(a: Optional[TCmp], b: Optional[TCmp]) -> int: _a = a is not None _b = b is not None return (a > b) - (a < b) if _a and _b else (_a > _b) - (_a < _b) # type: ignore class CvsConv: def __init__(self, cvsroot: str, rcs: "RcsKeywords", dumpfile: bool, fuzzsec: int) -> None: self.cvsroot = cvsroot self.rcs = rcs self.changesets: Dict[ChangeSetKey, ChangeSetKey] = dict() self.dumpfile = dumpfile self.markseq = 0 self.tags: Dict[str, ChangeSetKey] = dict() self.fuzzsec = fuzzsec def walk(self, module: Optional[str] = None) -> None: p = [self.cvsroot] if module is not None: p.append(module) path = os.path.join(*p) for root, dirs, files in os.walk(path): if '.git' in dirs: print('Ignore %s: cannot handle the path named \'.git\'' % ( root + os.sep + '.git'), file=sys.stderr) dirs.remove('.git') if '.git' in files: print('Ignore %s: cannot handle the path named \'.git\'' % ( root + os.sep + '.git'), file=sys.stderr) files.remove('.git') for f in files: if not f[-2:] == ',v': continue self.parse_file(root + os.sep + f) for t, c in list(self.tags.items()): c.tags.append(t)
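The fuzzy comparison in ChangeSetKey._cmp above is what folds per-file RCS revisions into changesets: two keys compare equal when they carry the same CVS commitid, or when branch, author, and log text match and their timestamps differ by no more than fuzzsec seconds. A minimal sketch with hypothetical values, assuming the ChangeSetKey class above:

```python
# Two file revisions committed 120 seconds apart with the same author,
# branch and log message fold into a single changeset.
a = ChangeSetKey('HEAD', 'alice', 1320000000, b'fix a bug', None, fuzzsec=300)
b = ChangeSetKey('HEAD', 'alice', 1320000120, b'fix a bug', None, fuzzsec=300)
a.put_file('src/foo.c,v', '1.2', 'Exp', 1)
b.put_file('src/bar.c,v', '1.5', 'Exp', 2)
assert a == b                      # within the 300-second fuzz window
a.merge(b)                         # one changeset covering both revisions
assert len(a.revs) == 2
assert (a.min_time, a.max_time) == (1320000000, 1320000120)
```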
def parse_file(self, path: str) -> None: rtags: Dict[str, List[str]] = dict() rcsfile = rcsparse.rcsfile(path) branches = {'1': 'HEAD', '1.1.1': 'VENDOR'} for k, v_ in list(rcsfile.symbols.items()): r = v_.split('.') if len(r) == 3: branches[v_] = 'VENDOR' elif len(r) >= 3 and r[-2] == '0': branches['.'.join(r[:-2] + r[-1:])] = k if len(r) == 2 and branches[r[0]] == 'HEAD': if v_ not in rtags: rtags[v_] = list() rtags[v_].append(k) revs: List[Tuple[str, Tuple[str, int, str, str, List[str], str, str]]] = list(rcsfile.revs.items()) # sort by revision descending to prioritize 1.1.1.1 over 1.1 revs.sort(key=lambda a: a[1][0], reverse=True) # sort by time revs.sort(key=lambda a: a[1][1]) novendor = False have_initial_revision = False last_vendor_status = None for k, v in revs: r = k.split('.') if len(r) == 4 and r[0] == '1' and r[1] == '1' and r[2] == '1' \ and r[3] == '1': if have_initial_revision: continue if v[3] == 'dead': continue last_vendor_status = v[3] have_initial_revision = True elif len(r) == 4 and r[0] == '1' and r[1] == '1' and r[2] == '1': if novendor: continue last_vendor_status = v[3] elif len(r) == 2: if r[0] == '1' and r[1] == '1': if have_initial_revision: continue if v[3] == 'dead': continue have_initial_revision = True elif r[0] == '1' and r[1] != '1': novendor = True if last_vendor_status == 'dead' and v[3] == 'dead': last_vendor_status = None continue last_vendor_status = None else: # trunk only continue if self.dumpfile: self.markseq = self.markseq + 1 git_dump_file(path, k, self.rcs, self.markseq) b = '.'.join(r[:-1]) try: a = ChangeSetKey( branches[b], v[2], v[1], rcsfile.getlog(v[0]), v[6], self.fuzzsec) except Exception as e: print('Aborted at %s %s' % (path, v[0]), file=sys.stderr) raise e a.put_file(path, k, v[3], self.markseq) while a in self.changesets: c = self.changesets[a] del self.changesets[a] c.merge(a) a = c self.changesets[a] = a if k in rtags: for t in rtags[k]: if t not in self.tags or \ self.tags[t].max_time < a.max_time: self.tags[t] = a def file_path(r: str, p: str) -> str: if r.endswith('/'): r = r[:-1] if p[-2:] == ',v': path = p[:-2] # drop ",v" else: path = p p_ = path.split('/') if len(p_) > 1 and p_[-2] == 'Attic': path = '/'.join(p_[:-2] + [p_[-1]]) if path.startswith(r): path = path[len(r) + 1:] return path def git_dump_file(path: str, k, rcs, markseq) -> None: try: cont = rcs.expand_keyword(path, rcsparse.rcsfile(path), k) except RuntimeError as msg: print('Unexpected runtime error on parsing', path, k, ':', msg, file=sys.stderr) print('Raising the resource limit may fix this problem.', file=sys.stderr) sys.exit(1) output('blob') output('mark :%d' % markseq) output('data', len(cont)) output(cont)
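file_path() above maps an RCS file name back to the path a checkout would produce: the cvsroot prefix and the ',v' suffix are stripped, and the 'Attic' directory that CVS uses for deleted files is dropped. A quick sketch with hypothetical paths:

```python
# Hypothetical paths illustrating file_path(); the Attic component
# disappears from the converted path.
assert file_path('/cvs', '/cvs/src/hello.c,v') == 'src/hello.c'
assert file_path('/cvs/', '/cvs/src/Attic/gone.c,v') == 'src/gone.c'
```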
class RcsKeywords: RCS_KW_AUTHOR = (1 << 0) RCS_KW_DATE = (1 << 1) RCS_KW_LOG = (1 << 2) RCS_KW_NAME = (1 << 3) RCS_KW_RCSFILE = (1 << 4) RCS_KW_REVISION = (1 << 5) RCS_KW_SOURCE = (1 << 6) RCS_KW_STATE = (1 << 7) RCS_KW_FULLPATH = (1 << 8) RCS_KW_MDOCDATE = (1 << 9) RCS_KW_LOCKER = (1 << 10) RCS_KW_ID = (RCS_KW_RCSFILE | RCS_KW_REVISION | RCS_KW_DATE | RCS_KW_AUTHOR | RCS_KW_STATE) RCS_KW_HEADER = (RCS_KW_ID | RCS_KW_FULLPATH) rcs_expkw = { b"Author": RCS_KW_AUTHOR, b"Date": RCS_KW_DATE, b"Header": RCS_KW_HEADER, b"Id": RCS_KW_ID, b"Log": RCS_KW_LOG, b"Name": RCS_KW_NAME, b"RCSfile": RCS_KW_RCSFILE, b"Revision": RCS_KW_REVISION, b"Source": RCS_KW_SOURCE, b"State": RCS_KW_STATE, b"Mdocdate": RCS_KW_MDOCDATE, b"Locker": RCS_KW_LOCKER } RCS_KWEXP_NONE = (1 << 0) RCS_KWEXP_NAME = (1 << 1) # include keyword name RCS_KWEXP_VAL = (1 << 2) # include keyword value RCS_KWEXP_LKR = (1 << 3) # include name of locker RCS_KWEXP_OLD = (1 << 4) # generate old keyword string RCS_KWEXP_ERR = (1 << 5) # mode has an error RCS_KWEXP_DEFAULT = (RCS_KWEXP_NAME | RCS_KWEXP_VAL) RCS_KWEXP_KVL = (RCS_KWEXP_NAME | RCS_KWEXP_VAL | RCS_KWEXP_LKR) def __init__(self) -> None: self.rerecomple() def rerecomple(self) -> None: pat = b'|'.join(list(self.rcs_expkw.keys())) self.re_kw = re.compile(b".*?\\$(" + pat + b")[\\$:]") def add_id_keyword(self, keyword) -> None: self.rcs_expkw[keyword.encode('ascii')] = self.RCS_KW_ID self.rerecomple() def kflag_get(self, flags: Optional[str]) -> int: if flags is None: return self.RCS_KWEXP_DEFAULT fl = 0 for fc in flags: if fc == 'k': fl |= self.RCS_KWEXP_NAME elif fc == 'v': fl |= self.RCS_KWEXP_VAL elif fc == 'l': fl |= self.RCS_KWEXP_LKR elif fc == 'o': if len(flags) != 1: fl |= self.RCS_KWEXP_ERR fl |= self.RCS_KWEXP_OLD elif fc == 'b': if len(flags) != 1: fl |= self.RCS_KWEXP_ERR fl |= self.RCS_KWEXP_NONE else: fl |= self.RCS_KWEXP_ERR return fl def expand_keyword(self, filename: str, rcs: rcsparse.rcsfile, r: str) -> bytes: + """ + Check out a file with keywords expanded. Expansion rules are specific + to each keyword, and in some cases specific to undocumented behaviour of CVS. + Our implementation does not expand some keywords (see comments in the code). + For a list of keywords and their expansion rules, see: + https://www.gnu.org/software/trans-coord/manual/cvs/cvs.html#Keyword-list + (also available in 'info cvs' if cvs is installed) + """ rev = rcs.revs[r] mode = self.kflag_get(rcs.expand) if (mode & (self.RCS_KWEXP_NONE | self.RCS_KWEXP_OLD)) != 0: return rcs.checkout(rev[0]) ret = [] - for line in rcs.checkout(rev[0]).split(b'\n'): + for line in rcs.checkout(rev[0]).splitlines(keepends=True): logbuf = None m = self.re_kw.match(line) if m is None: # No RCS Keywords, use it as it is - ret += [line] + ret.append(line) continue line0 = b'' while m is not None: + logbuf = None try: dsign = m.end(1) + line[m.end(1):].index(b'$') except ValueError: + # No RCS Keywords, use it as it is + ret.append(line) break prefix = line[:m.start(1) - 1] next_match_segment = copy.deepcopy(line[dsign:]) line = line[dsign + 1:] - line0 += prefix expbuf = '' if (mode & self.RCS_KWEXP_NAME) != 0: expbuf += '$' expbuf += m.group(1).decode('ascii') if (mode & self.RCS_KWEXP_VAL) != 0: expbuf += ': ' if (mode & self.RCS_KWEXP_VAL) != 0: expkw = self.rcs_expkw[m.group(1)] if (expkw & self.RCS_KW_RCSFILE) != 0: expbuf += filename \ if (expkw & self.RCS_KW_FULLPATH) != 0 \ else os.path.basename(filename) expbuf += " " if (expkw & self.RCS_KW_REVISION) != 0: expbuf += rev[0] expbuf += " " if (expkw & self.RCS_KW_DATE) != 0: expbuf += time.strftime( "%Y/%m/%d %H:%M:%S ", time.gmtime(rev[1])) if (expkw & self.RCS_KW_MDOCDATE) != 0: d = time.gmtime(rev[1]) expbuf += time.strftime( "%B%e %Y " if (d.tm_mday < 10) else "%B %e %Y ", d) if (expkw & self.RCS_KW_AUTHOR) != 0: expbuf += rev[2] expbuf += " " if (expkw & self.RCS_KW_STATE) != 0: expbuf += rev[3] expbuf += " " if (expkw & self.RCS_KW_LOG) != 0: + # Unlike other keywords, the Log keyword expands over multiple lines. + # The terminating '$' of the Log keyword appears on the line which + # contains the Log keyword itself. Then follow all log message lines, + # and those lines are followed by content which follows the Log keyword.
+ # For example, the line: + # + # $Log$ content which follows + # + # must be expanded like this: + # + # $Log: delta,v $ + # Revision 1.2 2021/11/29 14:24:18 stsp + # log message line 1 + # log message line 2 + # content which follows + # + # If we did not trim the Log keyword's trailing "$" here then + # the last line would read instead: + # + # $ content which follows + assert(next_match_segment[0] == ord('$')) + next_match_segment = next_match_segment[1:] p = prefix expbuf += filename \ if (expkw & self.RCS_KW_FULLPATH) != 0 \ else os.path.basename(filename) expbuf += " " logbuf = p + ( 'Revision %s %s %s\n' % ( rev[0], time.strftime( "%Y/%m/%d %H:%M:%S", time.gmtime(rev[1])), rev[2])).encode('ascii') - for lline in rcs.getlog(rev[0]).rstrip().split(b'\n'): - if len(lline) == 0: - logbuf += p.rstrip() + b'\n' - else: - logbuf += p + lline.lstrip() + b'\n' - if len(line) == 0: - logbuf += p.rstrip() - else: - logbuf += p + line.lstrip() - line = b'' + for lline in rcs.getlog(rev[0]).splitlines(keepends=True): + logbuf += p + lline if (expkw & self.RCS_KW_SOURCE) != 0: expbuf += filename expbuf += " " if (expkw & (self.RCS_KW_NAME | self.RCS_KW_LOCKER)) != 0: + # We do not expand Name and Locker keywords. + # The Name keyword is only expanded when a file is checked + # out with an explicit tag name. Perhaps this will be needed + # if the loader learns about CVS tags some day. + # The Locker keyword only expands if the file is currently + # locked via 'cvs admin -l', which is not part of the + # information we want to preserve about source code. expbuf += " " if (mode & self.RCS_KWEXP_NAME) != 0: expbuf += '$' - line0 += expbuf[:255].encode('ascii') + if logbuf is not None: + ret.append(prefix + expbuf.encode('ascii') + b'\n' + logbuf) + else: + line0 += prefix + expbuf[:255].encode('ascii') m = self.re_kw.match(next_match_segment) if m: line = next_match_segment - if (mode & self.RCS_KWEXP_NAME) != 0 and line0[-1] == ord('$'): + if (mode & self.RCS_KWEXP_NAME) != 0 and (expkw & self.RCS_KW_LOG) == 0 and line0[-1] == ord('$'): # There is another keyword on this line that needs expansion. # Avoid a double "$$" in the expanded string. This $ terminates # the previous keyword and marks the beginning of the next one. line0 = line0[:-1] - - ret += [line0 + line] - if logbuf is not None: - ret += [logbuf] - return b'\n'.join(ret) + elif logbuf is not None: + # Trim whitespace from the beginning of text following the Log keyword, + # but leave a lone trailing empty line as-is. This seems inconsistent, + # but testing suggests that it matches CVS's behaviour.
+ if len(line) == 1 and line[0] == ord('\n'): + ret.append(line0 + prefix + line) + else: + ret.append(line0 + prefix + line.lstrip()) + else: + ret.append(line0 + line) + return b''.join(ret) # ---------------------------------------------------------------------- # entry point # ---------------------------------------------------------------------- if __name__ == '__main__': main() diff --git a/swh/loader/cvs/tests/data/greek-repository8.tgz b/swh/loader/cvs/tests/data/greek-repository8.tgz new file mode 100644 index 0000000..6eaa254 Binary files /dev/null and b/swh/loader/cvs/tests/data/greek-repository8.tgz differ diff --git a/swh/loader/cvs/tests/test_loader.py b/swh/loader/cvs/tests/test_loader.py index 3e8c1c4..7503891 100644 --- a/swh/loader/cvs/tests/test_loader.py +++ b/swh/loader/cvs/tests/test_loader.py @@ -1,861 +1,949 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Any, Dict from swh.loader.cvs.loader import CvsLoader from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.hashutil import hash_to_bytes from swh.model.model import Snapshot, SnapshotBranch, TargetType RUNBABY_SNAPSHOT = Snapshot( id=hash_to_bytes("1cff69ab9bd70822d5e3006092f943ccaafdcf57"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("ef511d258fa55035c2bc2a5b05cad233cee1d328"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_not_found_no_mock(swh_storage, tmp_path): """Given an unknown repository, the loader visit ends up in status not_found""" unknown_repo_url = "unknown-repository" loader = CvsLoader(swh_storage, unknown_repo_url, cvsroot_path=tmp_path) assert loader.load() == {"status": "uneventful"} assert_last_visit_matches( swh_storage, unknown_repo_url, status="not_found", type="cvs", ) def test_loader_cvs_visit(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 5, "directory": 2, "origin": 1, "origin_visit": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } check_snapshot(RUNBABY_SNAPSHOT, loader.storage) def test_loader_cvs_2_visits_no_change(swh_storage, datadir, tmp_path): """Eventful visit followed by uneventful visit should yield the same snapshot """ archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status1 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "uneventful"} visit_status2 = 
assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) assert visit_status1.date < visit_status2.date assert visit_status1.snapshot == visit_status2.snapshot stats = get_stats(loader.storage) assert stats["origin_visit"] == 1 + 1 # same snapshot computed twice assert stats["snapshot"] == 1 GREEK_SNAPSHOT = Snapshot( id=hash_to_bytes("5e74af67d69dfd7aea0eb118154d062f71f50120"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("e18b92f14cd5b3efb3fcb4ea46cfaf97f25f301b"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_with_file_additions_and_deletions(swh_storage, datadir, tmp_path): """Eventful conversion of history with file additions and deletions""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 7, } check_snapshot(GREEK_SNAPSHOT, loader.storage) def test_loader_cvs_pserver_with_file_additions_and_deletions( swh_storage, datadir, tmp_path ): """Eventful CVS pserver conversion with file additions and deletions""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 7, } check_snapshot(GREEK_SNAPSHOT, loader.storage) GREEK_SNAPSHOT2 = Snapshot( id=hash_to_bytes("048885ae2145ffe81588aea95dcf75c536ecdf26"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("55eb1438c03588607ce4b8db8f45e8e23075951b"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_2_visits_with_change(swh_storage, datadir, tmp_path): """Eventful visit followed by eventful visit should yield two snapshots""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status1 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 7, } archive_name2 = "greek-repository2" archive_path2 = os.path.join(datadir,
f"{archive_name2}.tgz") repo_url = prepare_repository_from_archive(archive_path2, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status2 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT2.id, ) stats = get_stats(loader.storage) assert stats == { "content": 10, "directory": 23, "origin": 1, "origin_visit": 2, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 8, } check_snapshot(GREEK_SNAPSHOT2, loader.storage) assert visit_status1.date < visit_status2.date assert visit_status1.snapshot != visit_status2.snapshot def test_loader_cvs_visit_pserver(swh_storage, datadir, tmp_path): """Eventful visit to CVS pserver should yield 1 snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/runbaby" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 5, "directory": 2, "origin": 1, "origin_visit": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } check_snapshot(RUNBABY_SNAPSHOT, loader.storage) GREEK_SNAPSHOT3 = Snapshot( id=hash_to_bytes("cd801546b0137c82f01b9b67848ba8261d64ebbb"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("14980990790ce1921db953c4c9ae03dd8861e8d6"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_visit_pserver_no_eol(swh_storage, datadir, tmp_path): """Visit to CVS pserver with file that lacks trailing eol""" archive_name = "greek-repository3" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT3.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 23, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 8, } check_snapshot(GREEK_SNAPSHOT3, loader.storage) GREEK_SNAPSHOT4 = Snapshot( id=hash_to_bytes("26e943053ea9c5f961336a72328cac22026ed3b5"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("ed784aff0e0743244bb1f30ba21c8abcd0d460ab"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_visit_expand_id_keyword(swh_storage, datadir, tmp_path): """Visit to CVS repository with file with an RCS Id keyword""" archive_name = "greek-repository4" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, 
cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT4.id, ) stats = get_stats(loader.storage) assert stats == { "content": 12, "directory": 31, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 11, } check_snapshot(GREEK_SNAPSHOT4, loader.storage) def test_loader_cvs_visit_pserver_expand_id_keyword(swh_storage, datadir, tmp_path): """Visit to CVS pserver with file with an RCS Id keyword""" archive_name = "greek-repository4" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT4.id, ) stats = get_stats(loader.storage) assert stats == { "content": 12, "directory": 31, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 11, } check_snapshot(GREEK_SNAPSHOT4, loader.storage) GREEK_SNAPSHOT5 = Snapshot( id=hash_to_bytes("ee6faeaf50aa513c53c8ba29194116a5ef88add6"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("4320f152cc61ed660d25fdeebc787b3099e55a96"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_with_file_deleted_and_readded(swh_storage, datadir, tmp_path): """Eventful conversion of history with file deletion and re-addition""" archive_name = "greek-repository5" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT5.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 22, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 8, } check_snapshot(GREEK_SNAPSHOT5, loader.storage) def test_loader_cvs_pserver_with_file_deleted_and_readded( swh_storage, datadir, tmp_path ): """Eventful pserver conversion with file deletion and re-addition""" archive_name = "greek-repository5" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT5.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 22, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 8, } check_snapshot(GREEK_SNAPSHOT5, 
loader.storage) DINO_SNAPSHOT = Snapshot( id=hash_to_bytes("417021c16e17c5e0038cf0e73dbf48a6142c8304"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("df61a776c401a178cc796545849fc87bdadb2001"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_readded_file_in_attic(swh_storage, datadir, tmp_path): """Conversion of history with RCS files in the Attic""" # This repository has some file revisions marked "dead" in the Attic only. # This is different to the re-added file tests above, where the RCS file # was moved out of the Attic again as soon as the corresponding deleted # file was re-added. Failure to detect the "dead" file revisions in the # Attic would result in errors in our converted history. archive_name = "dino-readded-file" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 38, "directory": 105, "origin": 1, "origin_visit": 1, "release": 0, "revision": 35, "skipped_content": 0, "snapshot": 35, } check_snapshot(DINO_SNAPSHOT, loader.storage) def test_loader_cvs_pserver_readded_file_in_attic(swh_storage, datadir, tmp_path): """Conversion over pserver with RCS files in the Attic""" # This repository has some file revisions marked "dead" in the Attic only. # This is different to the re-added file tests above, where the RCS file # was moved out of the Attic again as soon as the corresponding deleted # file was re-added. Failure to detect the "dead" file revisions in the # Attic would result in errors in our converted history. # This has special implications for the pserver case, because the "dead" # revisions will not appear in the output of 'cvs rlog' by default. archive_name = "dino-readded-file" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 38, "directory": 105, "origin": 1, "origin_visit": 1, "release": 0, "revision": 35, "skipped_content": 0, "snapshot": 35, } check_snapshot(DINO_SNAPSHOT, loader.storage) DINO_SNAPSHOT2 = Snapshot( id=hash_to_bytes("a9d6ce0b4f22dc4fd752ad4c25ec9ea71ed568d7"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("150616a2a3206f00a73f2d6a017dde22c52e4a83"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_split_commits_by_commitid(swh_storage, datadir, tmp_path): """Conversion of RCS history which needs to be split by commit ID""" # This repository has some file revisions which use the same log message # and can only be told apart by commit IDs. Without commit IDs, these commits # would get merged into a single commit in our conversion result.
archive_name = "dino-commitid" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/dino" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id, ) check_snapshot(DINO_SNAPSHOT2, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 18, "directory": 36, "origin": 1, "origin_visit": 1, "release": 0, "revision": 18, "skipped_content": 0, "snapshot": 18, } def test_loader_cvs_pserver_split_commits_by_commitid(swh_storage, datadir, tmp_path): """Conversion via pserver which needs to be split by commit ID""" # This repository has some file revisions which use the same log message # and can only be told apart by commit IDs. Without commit IDs, these commits # would get merged into a single commit in our conversion result. archive_name = "dino-commitid" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/dino" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id, ) check_snapshot(DINO_SNAPSHOT2, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 18, "directory": 36, "origin": 1, "origin_visit": 1, "release": 0, "revision": 18, "skipped_content": 0, "snapshot": 18, } GREEK_SNAPSHOT6 = Snapshot( id=hash_to_bytes("b4c9423b2711c181251deb458d4ab4a3172948ac"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("f317c720e1929fec0afce10e6a8cfd24ef76dfc7"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_empty_lines_in_log_message(swh_storage, datadir, tmp_path): """Conversion of RCS history with empty lines in a log message""" archive_name = "greek-repository6" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT6.id, ) check_snapshot(GREEK_SNAPSHOT6, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 22, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 8, } def test_loader_cvs_pserver_empty_lines_in_log_message(swh_storage, datadir, tmp_path): """Conversion via pserver with empty lines in a log message""" archive_name = "greek-repository6" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, 
cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT6.id, ) check_snapshot(GREEK_SNAPSHOT6, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 22, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 8, } def get_head_revision_paths_info(loader: CvsLoader) -> Dict[bytes, Dict[str, Any]]: assert loader.snapshot is not None root_dir = loader.snapshot.branches[b"HEAD"].target revision = loader.storage.revision_get([root_dir])[0] assert revision is not None paths = {} for entry in loader.storage.directory_ls(revision.directory, recursive=True): paths[entry["name"]] = entry return paths def test_loader_cvs_with_header_keyword(swh_storage, datadir, tmp_path): """Eventful conversion of history with Header keyword in a file""" archive_name = "greek-repository7" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} repo_url = f"fake://{repo_url[7:]}" loader2 = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader2.load() == {"status": "eventful"} # We cannot verify the snapshot ID. It is unpredictable due to the use of the # $Header$ RCS keyword, which contains the temporary directory where the # repository is stored. expected_stats = { "content": 9, "directory": 22, "origin": 2, "origin_visit": 2, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 8, } stats = get_stats(loader.storage) assert stats == expected_stats stats = get_stats(loader2.storage) assert stats == expected_stats # Ensure that file 'alpha', which contains a $Header$ keyword, # was imported with equal content via file:// and fake:// URLs.
paths = get_head_revision_paths_info(loader) paths2 = get_head_revision_paths_info(loader2) alpha = paths[b"greek-tree/alpha"] alpha2 = paths2[b"greek-tree/alpha"] assert alpha["sha1"] == alpha2["sha1"] + + +GREEK_SNAPSHOT8 = Snapshot( + id=hash_to_bytes("b98a2744199723be827d48bad2f65ee1c2df7513"), + branches={ + b"HEAD": SnapshotBranch( + target=hash_to_bytes("ee8be88b458b7fbca3037ab05e56552578e66faa"), + target_type=TargetType.REVISION, + ) + }, +) + + +def test_loader_cvs_expand_log_keyword(swh_storage, datadir, tmp_path): + """Conversion of RCS history with Log keyword in files""" + archive_name = "greek-repository8" + extracted_name = "greek-repository" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) + repo_url += "/greek-tree" # CVS module name + + loader = CvsLoader( + swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) + ) + + assert loader.load() == {"status": "eventful"} + + assert_last_visit_matches( + loader.storage, + repo_url, + status="full", + type="cvs", + snapshot=GREEK_SNAPSHOT8.id, + ) + + check_snapshot(GREEK_SNAPSHOT8, loader.storage) + + stats = get_stats(loader.storage) + assert stats == { + "content": 14, + "directory": 31, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 11, + "skipped_content": 0, + "snapshot": 11, + } + + +def test_loader_cvs_pserver_expand_log_keyword(swh_storage, datadir, tmp_path): + """Conversion via pserver of RCS history with Log keyword in files""" + archive_name = "greek-repository8" + extracted_name = "greek-repository" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) + repo_url += "/greek-tree" # CVS module name + + # Ask our cvsclient to connect via the 'cvs server' command + repo_url = f"fake://{repo_url[7:]}" + + loader = CvsLoader( + swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) + ) + + assert loader.load() == {"status": "eventful"} + + assert_last_visit_matches( + loader.storage, + repo_url, + status="full", + type="cvs", + snapshot=GREEK_SNAPSHOT8.id, + ) + + check_snapshot(GREEK_SNAPSHOT8, loader.storage) + + stats = get_stats(loader.storage) + assert stats == { + "content": 14, + "directory": 31, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 11, + "skipped_content": 0, + "snapshot": 11, + }
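Both tests above pin the snapshot produced when $Log$ keywords are expanded. For local debugging, the expansion code can also be driven directly on a single RCS file; a minimal sketch, assuming a hypothetical path to an extracted test repository:

```python
# Hypothetical debugging snippet (not part of the test suite): expand RCS
# keywords for one revision of one file, using the loader's own machinery.
import swh.loader.cvs.rcsparse as rcsparse
from swh.loader.cvs.cvs2gitdump.cvs2gitdump import RcsKeywords

rcs_file = "/tmp/greek-repository/greek-tree/alpha,v"  # hypothetical path
rcs = RcsKeywords()
expanded = rcs.expand_keyword(rcs_file, rcsparse.rcsfile(rcs_file), "1.1")
print(expanded.decode("utf-8", "replace"))
```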