diff --git a/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py b/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py index ece4ac8..eb7e18d 100644 --- a/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py +++ b/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py @@ -1,712 +1,726 @@ #!/usr/local/bin/python # # Copyright (c) 2012 YASUOKA Masahiko # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # Usage # # First import: # % git init --bare /git/openbsd.git # % python cvs2gitdump.py -k OpenBSD -e openbsd.org /cvs/openbsd/src \ # > openbsd.dump # % git --git-dir /git/openbsd.git fast-import < openbsd.dump # # Periodic import: # % sudo cvsync # % python cvs2gitdump.py -k OpenBSD -e openbsd.org /cvs/openbsd/src \ # /git/openbsd.git > openbsd2.dump # % git --git-dir /git/openbsd.git fast-import < openbsd2.dump # import copy import getopt import os import re import subprocess import sys import time from typing import Dict, List, Optional, Tuple, TypeVar import swh.loader.cvs.rcsparse as rcsparse CHANGESET_FUZZ_SEC = 300 def usage(): print('usage: cvs2gitdump [-ah] [-z fuzz] [-e email_domain] ' '[-E log_encodings]\n' '\t[-k rcs_keywords] [-b branch] [-m module] [-l last_revision]\n' '\tcvsroot [git_dir]', file=sys.stderr) def main() -> None: email_domain = None do_incremental = False git_tip = None git_branch = 'master' dump_all = False log_encoding = 'utf-8,iso-8859-1' rcs = RcsKeywords() modules = [] last_revision = None fuzzsec = CHANGESET_FUZZ_SEC try: opts, args = getopt.getopt(sys.argv[1:], 'ab:hm:z:e:E:k:t:l:') for opt, v in opts: if opt == '-z': fuzzsec = int(v) elif opt == '-e': email_domain = v elif opt == '-a': dump_all = True elif opt == '-b': git_branch = v elif opt == '-E': log_encoding = v elif opt == '-k': rcs.add_id_keyword(v) elif opt == '-m': if v == '.git': print('Cannot handle the path named \'.git\'', file=sys.stderr) sys.exit(1) modules.append(v) elif opt == '-l': last_revision = v elif opt == '-h': usage() sys.exit(1) except getopt.GetoptError as msg: print(msg, file=sys.stderr) usage() sys.exit(1) if len(args) == 0 or len(args) > 2: usage() sys.exit(1) log_encodings = log_encoding.split(',') cvsroot = args[0] while cvsroot[-1] == '/': cvsroot = cvsroot[:-1] if len(args) == 2: do_incremental = True git = subprocess.Popen( ['git', '--git-dir=' + args[1], '-c', 'i18n.logOutputEncoding=UTF-8', 'log', '--max-count', '1', '--date=raw', '--format=%ae%n%ad%n%H', git_branch], encoding='utf-8', stdout=subprocess.PIPE) assert git.stdout is not None outs = git.stdout.readlines() git.wait() if git.returncode != 0: print("Couldn't exec git", file=sys.stderr) sys.exit(git.returncode) git_tip = outs[2].strip() if last_revision is not None: git = subprocess.Popen( ['git', '--git-dir=' + args[1], '-c', 'i18n.logOutputEncoding=UTF-8', 'log', '--max-count', '1', '--date=raw', '--format=%ae%n%ad%n%H', last_revision], encoding='utf-8', stdout=subprocess.PIPE) assert git.stdout 
is not None outs = git.stdout.readlines() git.wait() if git.returncode != 0: print("Couldn't exec git", file=sys.stderr) sys.exit(git.returncode) last_author = outs[0].strip() last_ctime = float(outs[1].split()[0]) # strip off the domain part from the last author since cvs doesn't have # the domain part. if do_incremental and email_domain is not None and \ last_author.lower().endswith(('@' + email_domain).lower()): last_author = last_author[:-1 * (1 + len(email_domain))] cvs = CvsConv(cvsroot, rcs, not do_incremental, fuzzsec) print('** walk cvs tree', file=sys.stderr) if len(modules) == 0: cvs.walk() else: for module in modules: cvs.walk(module) changesets = sorted(cvs.changesets) nchangesets = len(changesets) print('** cvs has %d changesets' % (nchangesets), file=sys.stderr) if nchangesets <= 0: sys.exit(0) if not dump_all: # don't use last 10 minutes for safety max_time_max = changesets[-1].max_time - 600 else: max_time_max = changesets[-1].max_time found_last_revision = False markseq = cvs.markseq extags = set() for k in changesets: if do_incremental and not found_last_revision: if k.min_time == last_ctime and k.author == last_author: found_last_revision = True for tag in k.tags: extags.add(tag) continue if k.max_time > max_time_max: break marks = {} for f in k.revs: if not do_incremental: marks[f.markseq] = f else: markseq = markseq + 1 git_dump_file(f.path, f.rev, rcs, markseq) marks[markseq] = f log = rcsparse.rcsfile(k.revs[0].path).getlog(k.revs[0].rev) for i, e in enumerate(log_encodings): try: how = 'ignore' if i == len(log_encodings) - 1 else 'strict' log_str = log.decode(e, how) break except UnicodeError: pass log = log_str.encode('utf-8', 'ignore') output('commit refs/heads/' + git_branch) markseq = markseq + 1 output('mark :%d' % (markseq)) email = k.author if email_domain is None \ else k.author + '@' + email_domain output('author %s <%s> %d +0000' % (k.author, email, k.min_time)) output('committer %s <%s> %d +0000' % (k.author, email, k.min_time)) output('data', len(log)) output(log, end='') if do_incremental and git_tip is not None: output('from', git_tip) git_tip = None for m in marks: f = marks[m] mode = 0o100755 if os.access(f.path, os.X_OK) else 0o100644 fn = file_path(cvs.cvsroot, f.path) if f.state == 'dead': output('D', fn) else: output('M %o :%d %s' % (mode, m, fn)) output('') for tag in k.tags: if tag in extags: continue output('reset refs/tags/%s' % (tag)) output('from :%d' % (markseq)) output('') if do_incremental and not found_last_revision: raise Exception('could not find the last revision') print('** dumped', file=sys.stderr) # # Encode by UTF-8 always for string objects since encoding for git-fast-import # is UTF-8.
Also write without conversion for a bytes object (file bodies # might be various encodings) # def output(*args, end='\n') -> None: if len(args) == 0: pass elif len(args) > 1 or isinstance(args[0], str): lines = ' '.join( [arg if isinstance(arg, str) else str(arg) for arg in args]) sys.stdout.buffer.write(lines.encode('utf-8')) else: sys.stdout.buffer.write(args[0]) if len(end) > 0: sys.stdout.buffer.write(end.encode('utf-8')) class FileRevision: def __init__(self, path: str, rev: str, state: str, markseq: int) -> None: self.path = path self.rev = rev self.state = state self.markseq = markseq class ChangeSetKey: def __init__( self, branch: str, author, timestamp: int, log: bytes, commitid: Optional[str], fuzzsec: int ) -> None: self.branch = branch self.author = author self.min_time = timestamp self.max_time = timestamp self.commitid = commitid self.fuzzsec = fuzzsec self.revs: List[FileRevision] = [] self.tags: List[str] = [] self.log_hash = 0 h = 0 for c in log: h = 31 * h + c self.log_hash = h def __lt__(self, other) -> bool: return self._cmp(other) < 0 def __gt__(self, other) -> bool: return self._cmp(other) > 0 def __eq__(self, other) -> bool: return self._cmp(other) == 0 def __le__(self, other) -> bool: return self._cmp(other) <= 0 def __ge__(self, other) -> bool: return self._cmp(other) >= 0 def __ne__(self, other) -> bool: return self._cmp(other) != 0 def _cmp(self, anon) -> int: if not isinstance(anon, ChangeSetKey): raise TypeError() # compare by the commitid cid = _cmp2(self.commitid, anon.commitid) if cid == 0 and self.commitid is not None: # both have commitid and they are same return 0 # compare by the time ma = anon.min_time - self.max_time mi = self.min_time - anon.max_time ct = self.min_time - anon.min_time if ma > self.fuzzsec or mi > self.fuzzsec: return ct if cid != 0: # only one has the commitid, this means different commit return cid if ct == 0 else ct # compare by log, branch and author c = _cmp2(self.log_hash, anon.log_hash) if c == 0: c = _cmp2(self.branch, anon.branch) if c == 0: c = _cmp2(self.author, anon.author) if c == 0: return 0 return ct if ct != 0 else c def merge(self, anot: "ChangeSetKey") -> None: self.max_time = max(self.max_time, anot.max_time) self.min_time = min(self.min_time, anot.min_time) self.revs.extend(anot.revs) def __hash__(self) -> int: return hash(self.branch + '/' + self.author) * 31 + self.log_hash def put_file(self, path: str, rev: str, state: str, markseq: int): self.revs.append(FileRevision(path, rev, state, markseq)) TCmp = TypeVar("TCmp", int, str) def _cmp2(a: Optional[TCmp], b: Optional[TCmp]) -> int: _a = a is not None _b = b is not None return (a > b) - (a < b) if _a and _b else (_a > _b) - (_a < _b) # type: ignore class CvsConv: def __init__(self, cvsroot: str, rcs: "RcsKeywords", dumpfile: bool, fuzzsec: int) -> None: self.cvsroot = cvsroot self.rcs = rcs self.changesets: Dict[ChangeSetKey, ChangeSetKey] = dict() self.dumpfile = dumpfile self.markseq = 0 self.tags: Dict[str, ChangeSetKey] = dict() self.fuzzsec = fuzzsec def walk(self, module: Optional[str] =None) -> None: p = [self.cvsroot] if module is not None: p.append(module) path = os.path.join(*p) for root, dirs, files in os.walk(path): if '.git' in dirs: print('Ignore %s: cannot handle the path named \'.git\'' % ( root + os.sep + '.git'), file=sys.stderr) dirs.remove('.git') if '.git' in files: print('Ignore %s: cannot handle the path named \'.git\'' % ( root + os.sep + '.git'), file=sys.stderr) files.remove('.git') for f in files: if not f[-2:] == ',v': continue 
self.parse_file(root + os.sep + f) for t, c in list(self.tags.items()): c.tags.append(t) def parse_file(self, path: str) -> None: rtags: Dict[str, List[str]] = dict() rcsfile = rcsparse.rcsfile(path) branches = {'1': 'HEAD', '1.1.1': 'VENDOR'} for k, v_ in list(rcsfile.symbols.items()): r = v_.split('.') if len(r) == 3: branches[v_] = 'VENDOR' elif len(r) >= 3 and r[-2] == '0': branches['.'.join(r[:-2] + r[-1:])] = k if len(r) == 2 and branches[r[0]] == 'HEAD': if v_ not in rtags: rtags[v_] = list() rtags[v_].append(k) revs: List[Tuple[str, Tuple[str, int, str, str, List[str], str, str]]] = list(rcsfile.revs.items()) # sort by revision descending to prioritize 1.1.1.1 over 1.1 revs.sort(key=lambda a: a[1][0], reverse=True) # sort by time revs.sort(key=lambda a: a[1][1]) novendor = False have_initial_revision = False last_vendor_status = None for k, v in revs: r = k.split('.') if len(r) == 4 and r[0] == '1' and r[1] == '1' and r[2] == '1' \ and r[3] == '1': if have_initial_revision: continue if v[3] == 'dead': continue last_vendor_status = v[3] have_initial_revision = True elif len(r) == 4 and r[0] == '1' and r[1] == '1' and r[2] == '1': if novendor: continue last_vendor_status = v[3] elif len(r) == 2: if r[0] == '1' and r[1] == '1': if have_initial_revision: continue if v[3] == 'dead': continue have_initial_revision = True elif r[0] == '1' and r[1] != '1': novendor = True if last_vendor_status == 'dead' and v[3] == 'dead': last_vendor_status = None continue last_vendor_status = None else: # trunk only continue if self.dumpfile: self.markseq = self.markseq + 1 git_dump_file(path, k, self.rcs, self.markseq) b = '.'.join(r[:-1]) try: a = ChangeSetKey( branches[b], v[2], v[1], rcsfile.getlog(v[0]), v[6], self.fuzzsec) except Exception as e: print('Aborted at %s %s' % (path, v[0]), file=sys.stderr) raise e a.put_file(path, k, v[3], self.markseq) while a in self.changesets: c = self.changesets[a] del self.changesets[a] c.merge(a) a = c self.changesets[a] = a if k in rtags: for t in rtags[k]: if t not in self.tags or \ self.tags[t].max_time < a.max_time: self.tags[t] = a def file_path(r: str, p: str) -> str: if r.endswith('/'): r = r[:-1] if p[-2:] == ',v': path = p[:-2] # drop ",v" else: path = p p_ = path.split('/') if len(p_) > 0 and p_[-2] == 'Attic': path = '/'.join(p_[:-2] + [p_[-1]]) if path.startswith(r): path = path[len(r) + 1:] return path def git_dump_file(path: str, k, rcs, markseq) -> None: try: - cont = rcs.expand_keyword(path, rcsparse.rcsfile(path), k) + cont = rcs.expand_keyword(path, rcsparse.rcsfile(path), k, []) except RuntimeError as msg: print('Unexpected runtime error on parsing', path, k, ':', msg, file=sys.stderr) print('Raising the resource limit may fix this problem.', file=sys.stderr) sys.exit(1) output('blob') output('mark :%d' % markseq) output('data', len(cont)) output(cont) class RcsKeywords: RCS_KW_AUTHOR = (1 << 0) RCS_KW_DATE = (1 << 1) RCS_KW_LOG = (1 << 2) RCS_KW_NAME = (1 << 3) RCS_KW_RCSFILE = (1 << 4) RCS_KW_REVISION = (1 << 5) RCS_KW_SOURCE = (1 << 6) RCS_KW_STATE = (1 << 7) RCS_KW_FULLPATH = (1 << 8) RCS_KW_MDOCDATE = (1 << 9) RCS_KW_LOCKER = (1 << 10) RCS_KW_ID = (RCS_KW_RCSFILE | RCS_KW_REVISION | RCS_KW_DATE | RCS_KW_AUTHOR | RCS_KW_STATE) RCS_KW_HEADER = (RCS_KW_ID | RCS_KW_FULLPATH) rcs_expkw = { b"Author": RCS_KW_AUTHOR, b"Date": RCS_KW_DATE, b"Header": RCS_KW_HEADER, b"Id": RCS_KW_ID, b"Log": RCS_KW_LOG, b"Name": RCS_KW_NAME, b"RCSfile": RCS_KW_RCSFILE, b"Revision": RCS_KW_REVISION, b"Source": RCS_KW_SOURCE, b"State": RCS_KW_STATE, b"Mdocdate":
RCS_KW_MDOCDATE, b"Locker": RCS_KW_LOCKER } RCS_KWEXP_NONE = (1 << 0) RCS_KWEXP_NAME = (1 << 1) # include keyword name RCS_KWEXP_VAL = (1 << 2) # include keyword value RCS_KWEXP_LKR = (1 << 3) # include name of locker RCS_KWEXP_OLD = (1 << 4) # generate old keyword string RCS_KWEXP_ERR = (1 << 5) # mode has an error RCS_KWEXP_DEFAULT = (RCS_KWEXP_NAME | RCS_KWEXP_VAL) RCS_KWEXP_KVL = (RCS_KWEXP_NAME | RCS_KWEXP_VAL | RCS_KWEXP_LKR) def __init__(self) -> None: self.rerecomple() def rerecomple(self) -> None: pat = b'|'.join(list(self.rcs_expkw.keys())) self.re_kw = re.compile(b".*?\\$(" + pat + b")[\\$:]") def add_id_keyword(self, keyword) -> None: self.rcs_expkw[keyword.encode('ascii')] = self.RCS_KW_ID self.rerecomple() def kflag_get(self, flags: Optional[str]) -> int: if flags is None: return self.RCS_KWEXP_DEFAULT fl = 0 for fc in flags: if fc == 'k': fl |= self.RCS_KWEXP_NAME elif fc == 'v': fl |= self.RCS_KWEXP_VAL elif fc == 'l': fl |= self.RCS_KWEXP_LKR elif fc == 'o': if len(flags) != 1: fl |= self.RCS_KWEXP_ERR fl |= self.RCS_KWEXP_OLD elif fc == 'b': if len(flags) != 1: fl |= self.RCS_KWEXP_ERR fl |= self.RCS_KWEXP_NONE else: fl |= self.RCS_KWEXP_ERR return fl - def expand_keyword(self, filename: str, rcs: rcsparse.rcsfile, r: str) -> bytes: + def expand_keyword(self, filename: str, rcs: rcsparse.rcsfile, r: str, excluded_keywords: List[str]) -> bytes: """ Check out a file with keywords expanded. Expansion rules are specific to each keyword, and some cases specific to undocumented behaviour of CVS. Our implementation does not expand some keywords (see comments in the code). For a list of keywords and their expansion rules, see: https://www.gnu.org/software/trans-coord/manual/cvs/cvs.html#Keyword-list (also available in 'info cvs' if cvs is installed) """ rev = rcs.revs[r] mode = self.kflag_get(rcs.expand) if (mode & (self.RCS_KWEXP_NONE | self.RCS_KWEXP_OLD)) != 0: return rcs.checkout(rev[0]) ret = [] for line in rcs.checkout(rev[0]).splitlines(keepends=True): logbuf = None m = self.re_kw.match(line) if m is None: # No RCS Keywords, use it as it is ret.append(line) continue line0 = b'' while m is not None: logbuf = None try: dsign = m.end(1) + line[m.end(1):].index(b'$') except ValueError: # No RCS Keywords, use it as it is ret.append(line) break prefix = line[:m.start(1) - 1] next_match_segment = copy.deepcopy(line[dsign:]) - line = line[dsign + 1:] expbuf = '' + try: + kwname = m.group(1).decode('ascii') + except UnicodeDecodeError: + # Not a valid RCS keyword, use it as it is + ret.append(line) + break + if kwname in excluded_keywords: + line0 += prefix + m.group(1) + m = self.re_kw.match(next_match_segment) + if m: + line = next_match_segment + continue + else: + ret.append(line0 + line[dsign + 1:]) + break + line = line[dsign + 1:] if (mode & self.RCS_KWEXP_NAME) != 0: - expbuf += '$' - expbuf += m.group(1).decode('ascii') + expbuf += '$%s' % kwname if (mode & self.RCS_KWEXP_VAL) != 0: expbuf += ': ' if (mode & self.RCS_KWEXP_VAL) != 0: expkw = self.rcs_expkw[m.group(1)] if (expkw & self.RCS_KW_RCSFILE) != 0: expbuf += filename \ if (expkw & self.RCS_KW_FULLPATH) != 0 \ else os.path.basename(filename) expbuf += " " if (expkw & self.RCS_KW_REVISION) != 0: expbuf += rev[0] expbuf += " " if (expkw & self.RCS_KW_DATE) != 0: expbuf += time.strftime( "%Y/%m/%d %H:%M:%S ", time.gmtime(rev[1])) if (expkw & self.RCS_KW_MDOCDATE) != 0: d = time.gmtime(rev[1]) expbuf += time.strftime( "%B%e %Y " if (d.tm_mday < 10) else "%B %e %Y ", d) if (expkw & self.RCS_KW_AUTHOR) != 0: expbuf += 
rev[2] expbuf += " " if (expkw & self.RCS_KW_STATE) != 0: expbuf += rev[3] expbuf += " " if (expkw & self.RCS_KW_LOG) != 0: # Unlike other keywords, the Log keyword expands over multiple lines. # The terminating '$' of the Log keyword appears on the line which # contains the log keyword itself. Then follow all log message lines, # and those lines are followed by content which follows the Log keyword. # For example, the line: # # $Log$ content which follows # # must be expanded like this: # # $Log: delta,v $ # Revision 1.2 2021/11/29 14:24:18 stsp # log message line 1 # log message line 2 # content which follows # # If we did not trim the Log keyword's trailing "$" here then # the last line would read instead: # # $ content which follows assert(next_match_segment[0] == ord('$')) next_match_segment = next_match_segment[1:] p = prefix expbuf += filename \ if (expkw & self.RCS_KW_FULLPATH) != 0 \ else os.path.basename(filename) expbuf += " " logbuf = p + ( 'Revision %s %s %s\n' % ( rev[0], time.strftime( "%Y/%m/%d %H:%M:%S", time.gmtime(rev[1])), rev[2])).encode('ascii') for lline in rcs.getlog(rev[0]).splitlines(keepends=True): logbuf += p + lline if (expkw & self.RCS_KW_SOURCE) != 0: expbuf += filename expbuf += " " if (expkw & (self.RCS_KW_NAME | self.RCS_KW_LOCKER)) != 0: # We do not expand Name and Locker keywords. # The Name keyword is only expanded when a file is checked # out with an explicit tag name. Perhaps this will be needed # if the loader learns about CVS tags some day. # The Locker keyword only expands if the file is currently # locked via 'cvs admin -l', which is not part of the # information we want to preserve about source code. expbuf += " " if (mode & self.RCS_KWEXP_NAME) != 0: expbuf += '$' if logbuf is not None: ret.append(prefix + expbuf.encode('ascii') + b'\n' + logbuf) else: line0 += prefix + expbuf[:255].encode('ascii') m = self.re_kw.match(next_match_segment) if m: line = next_match_segment if (mode & self.RCS_KWEXP_NAME) != 0 and (expkw & self.RCS_KW_LOG) == 0 and line0[-1] == ord('$'): # There is another keyword on this line that needs expansion. # Avoid a double "$$" in the expanded string. This $ terminates # the previous keyword and marks the beginning of the next one. line0 = line0[:-1] elif logbuf is not None: # Trim whitespace from the beginning of text following the Log keyword. # But leave a lone trailing empty line as-is. This seems inconsistent, # but testing suggests that it matches CVS's behaviour. if len(line) == 1 and line[0] == ord('\n'): ret.append(line0 + prefix + line) else: ret.append(line0 + prefix + line.lstrip()) else: ret.append(line0 + line) return b''.join(ret) # ---------------------------------------------------------------------- # entry point # ---------------------------------------------------------------------- if __name__ == '__main__': main() diff --git a/swh/loader/cvs/loader.py b/swh/loader/cvs/loader.py index 2e1ae68..b752247 100644 --- a/swh/loader/cvs/loader.py +++ b/swh/loader/cvs/loader.py @@ -1,555 +1,615 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """Loader in charge of injecting either new or existing cvs repositories to swh-storage.
""" from datetime import datetime import os import os.path import subprocess import tempfile import time from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Sequence, Tuple, cast from urllib3.util import parse_url from swh.loader.core.loader import BaseLoader from swh.loader.core.utils import clean_dangling_folders from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ( CHANGESET_FUZZ_SEC, ChangeSetKey, CvsConv, FileRevision, RcsKeywords, file_path, ) from swh.loader.cvs.cvsclient import CVSClient import swh.loader.cvs.rcsparse as rcsparse from swh.loader.cvs.rlog import RlogConv from swh.loader.exception import NotFound from swh.model import from_disk, hashutil from swh.model.model import ( Content, Directory, Origin, Person, Revision, RevisionType, Sha1Git, SkippedContent, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.storage.algos.snapshot import snapshot_get_latest from swh.storage.interface import StorageInterface DEFAULT_BRANCH = b"HEAD" TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs." class CvsLoader(BaseLoader): """Swh cvs loader. The repository is local. The loader deals with update on an already previously loaded repository. """ visit_type = "cvs" cvs_module_name: str cvsclient: CVSClient # remote CVS repository access (history is parsed from CVS rlog): rlog_file: BinaryIO swh_revision_gen: Iterator[ Tuple[List[Content], List[SkippedContent], List[Directory], Revision] ] def __init__( self, storage: StorageInterface, url: str, origin_url: Optional[str] = None, visit_date: Optional[datetime] = None, cvsroot_path: Optional[str] = None, temp_directory: str = "/tmp", max_content_size: Optional[int] = None, ): super().__init__( storage=storage, logging_class="swh.loader.cvs.CvsLoader", max_content_size=max_content_size, ) self.cvsroot_url = url # origin url as unique identifier for origin in swh archive self.origin_url = origin_url if origin_url else self.cvsroot_url self.temp_directory = temp_directory # internal state used to store swh objects self._contents: List[Content] = [] self._skipped_contents: List[SkippedContent] = [] self._directories: List[Directory] = [] self._revisions: List[Revision] = [] # internal state, current visit self._last_revision: Optional[Revision] = None self._visit_status = "full" self.visit_date = visit_date self.cvsroot_path = cvsroot_path + self.custom_id_keyword = None + self.excluded_keywords: List[str] = [] self.snapshot: Optional[Snapshot] = None self.last_snapshot: Optional[Snapshot] = snapshot_get_latest( self.storage, self.origin_url ) def compute_swh_revision( self, k: ChangeSetKey, logmsg: Optional[bytes] ) -> Tuple[Revision, from_disk.Directory]: """Compute swh hash data per CVS changeset. Returns: tuple (rev, swh_directory) - rev: current SWH revision computed from checked out work tree - swh_directory: dictionary of path, swh hash data with type """ # Compute SWH revision from the on-disk state swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path)) parents: Tuple[Sha1Git, ...] 
if self._last_revision: parents = (self._last_revision.id,) else: parents = () revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents) self.log.info("SWH revision ID: %s", hashutil.hash_to_hex(revision.id)) self._last_revision = revision return (revision, swh_dir) def checkout_file_with_rcsparse( self, k: ChangeSetKey, f: FileRevision, rcsfile: rcsparse.rcsfile ) -> None: assert self.cvsroot_path assert self.server_style_cvsroot path = file_path(self.cvsroot_path, f.path) wtpath = os.path.join(self.tempdir_path, path) self.log.info("rev %s state %s file %s" % (f.rev, f.state, f.path)) if f.state == "dead": # remove this file from work tree try: os.remove(wtpath) except FileNotFoundError: pass else: # create, or update, this file in the work tree if not rcsfile: rcsfile = rcsparse.rcsfile(f.path) rcs = RcsKeywords() # We try our best to generate the same commit hashes over both pserver # and rsync. To avoid differences in file content due to expansion of # RCS keywords which contain absolute file paths (such as "Header"), # attempt to expand such paths in the same way as a regular CVS server # would expand them. # Whether this will avoid content differences depends on pserver and # rsync servers exposing the same server-side path to the CVS repository. # However, this is the best we can do, and only matters if an origin can # be fetched over both pserver and rsync. Each will still be treated as # a distinct origin, but will hopefully point at the same SWH snapshot. # In any case, an absolute path based on the origin URL looks nicer than # an absolute path based on a temporary directory used by the CVS loader. server_style_path = f.path.replace( self.cvsroot_path, self.server_style_cvsroot ) if server_style_path[0] != "/": server_style_path = "/" + server_style_path - contents = rcs.expand_keyword(server_style_path, rcsfile, f.rev) + if self.custom_id_keyword is not None: + rcs.add_id_keyword(self.custom_id_keyword) + contents = rcs.expand_keyword( + server_style_path, rcsfile, f.rev, self.excluded_keywords + ) os.makedirs(os.path.dirname(wtpath), exist_ok=True) outfile = open(wtpath, mode="wb") outfile.write(contents) outfile.close() def checkout_file_with_cvsclient( self, k: ChangeSetKey, f: FileRevision, cvsclient: CVSClient ): assert self.cvsroot_path path = file_path(self.cvsroot_path, f.path) wtpath = os.path.join(self.tempdir_path, path) self.log.info("rev %s state %s file %s" % (f.rev, f.state, f.path)) if f.state == "dead": # remove this file from work tree try: os.remove(wtpath) except FileNotFoundError: pass else: dirname = os.path.dirname(wtpath) os.makedirs(dirname, exist_ok=True) self.log.debug("checkout to %s\n" % wtpath) fp = cvsclient.checkout(path, f.rev, dirname, expand_keywords=True) os.rename(fp.name, wtpath) try: fp.close() except FileNotFoundError: # Well, we have just renamed the file... pass def process_cvs_changesets( self, cvs_changesets: List[ChangeSetKey], use_rcsparse: bool, ) -> Iterator[ Tuple[List[Content], List[SkippedContent], List[Directory], Revision] ]: """Process CVS revisions. At each CVS revision, check out contents and compute swh hashes. Yields: tuple (contents, skipped-contents, directories, revision) of dict as a dictionary with keys, sha1_git, sha1, etc... """ for k in cvs_changesets: tstr = time.strftime("%c", time.gmtime(k.max_time)) self.log.info( "changeset from %s by %s on branch %s", tstr, k.author, k.branch ) logmsg: Optional[bytes] = b"" # Check out all files of this revision and get a log message. 
# # The log message is obtained from the first file in the changeset. # The message will usually be the same for all affected files, and # the SWH archive will only store one version of the log message. for f in k.revs: rcsfile = None if use_rcsparse: if rcsfile is None: rcsfile = rcsparse.rcsfile(f.path) if not logmsg: logmsg = rcsfile.getlog(k.revs[0].rev) self.checkout_file_with_rcsparse(k, f, rcsfile) else: if not logmsg: logmsg = self.rlog.getlog(self.rlog_file, f.path, k.revs[0].rev) self.checkout_file_with_cvsclient(k, f, self.cvsclient) # TODO: prune empty directories? (revision, swh_dir) = self.compute_swh_revision(k, logmsg) (contents, skipped_contents, directories) = from_disk.iter_directory( swh_dir ) yield contents, skipped_contents, directories, revision def prepare_origin_visit(self) -> None: self.origin = Origin( url=self.origin_url if self.origin_url else self.cvsroot_url ) def pre_cleanup(self) -> None: """Cleanup potential dangling files from prior runs (e.g. OOM killed tasks) """ clean_dangling_folders( self.temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) def cleanup(self) -> None: self.log.info("cleanup") + def configure_custom_id_keyword(self, cvsconfig): + """Parse CVSROOT/config and look for a custom keyword definition. + There are two different configuration directives in use for this purpose. + + The first variant stems from a patch which was never accepted into + upstream CVS and uses the tag directive: tag=MyName + With this, the "MyName" keyword becomes an alias for the "Id" keyword. + This variant is prevalent in CVS versions shipped on BSD. + + The second variant stems from upstream CVS 1.12 and looks like: + LocalKeyword=MyName=SomeKeyword + KeywordExpand=iMyName + We only support "SomeKeyword" if it specifies "Id" or "CVSHeader", for now. + The KeywordExpand directive can be used to suppress expansion of keywords + by listing keywords after an initial "e" character ("exclude", as opposed + to an "include" list which uses an initial "i" character).
+ For example, this disables expansion of the Date and Name keywords: + KeywordExpand=eDate,Name + """ + for line in cvsconfig.readlines(): + line = line.strip() + try: + (config_key, value) = line.split("=", 1) + except ValueError: + continue + config_key = config_key.strip() + value = value.strip() + if config_key == "tag": + self.custom_id_keyword = value + elif config_key == "LocalKeyword": + try: + (custom_kwname, kwname) = value.split("=", 1) + except ValueError: + continue + if kwname.strip() in ("Id", "CVSHeader"): + self.custom_id_keyword = custom_kwname.strip() + elif config_key == "KeywordExpand" and value.startswith("e"): + excluded_keywords = value[1:].split(",") + for k in excluded_keywords: + self.excluded_keywords.append(k.strip()) + def fetch_cvs_repo_with_rsync(self, host: str, path: str) -> None: # URL *must* end with a trailing slash in order to get CVSROOT listed url = "rsync://%s%s/" % (host, os.path.dirname(path)) rsync = subprocess.run(["rsync", url], capture_output=True, encoding="ascii") rsync.check_returncode() have_cvsroot = False have_module = False for line in rsync.stdout.split("\n"): self.log.debug("rsync server: %s", line) if line.endswith(" CVSROOT"): have_cvsroot = True elif line.endswith(" %s" % self.cvs_module_name): have_module = True if have_module and have_cvsroot: break if not have_module: raise NotFound( "CVS module %s not found at %s" % (self.cvs_module_name, url) ) if not have_cvsroot: raise NotFound("No CVSROOT directory found at %s" % url) + # Fetch the CVSROOT directory and the desired CVS module. assert self.cvsroot_path - subprocess.run( - # Ensure that rsync will place files directly within our cvsroot - # directory by appending a "/" to our cvsroot path. - ["rsync", "-a", url, self.cvsroot_path + "/"] - ).check_returncode() + for d in ("CVSROOT", self.cvs_module_name): + target_dir = os.path.join(self.cvsroot_path, d) + os.makedirs(target_dir, exist_ok=True) + subprocess.run( + # Append trailing path separators ("/" in the URL and os.path.sep in the + # local target directory path) to ensure that rsync will place files + # directly within our target directory.
+ ["rsync", "-a", url + d + "/", target_dir + os.path.sep] + ).check_returncode() def prepare(self) -> None: self._last_revision = None self.tempdir_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) url = parse_url(self.origin_url) self.log.debug( "prepare; origin_url=%s scheme=%s path=%s", self.origin_url, url.scheme, url.path, ) if not url.path: raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url) self.cvs_module_name = os.path.basename(url.path) self.server_style_cvsroot = os.path.dirname(url.path) self.worktree_path = os.path.join(self.tempdir_path, self.cvs_module_name) if url.scheme == "file" or url.scheme == "rsync": # local CVS repository conversion if not self.cvsroot_path: self.cvsroot_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) if url.scheme == "file": if not os.path.exists(url.path): raise NotFound elif url.scheme == "rsync": self.fetch_cvs_repo_with_rsync(url.host, url.path) have_rcsfile = False have_cvsroot = False for root, dirs, files in os.walk(self.cvsroot_path): if "CVSROOT" in dirs: have_cvsroot = True dirs.remove("CVSROOT") continue for f in files: filepath = os.path.join(root, f) if f[-2:] == ",v": rcsfile = rcsparse.rcsfile(filepath) # noqa: F841 self.log.debug( "Looks like we have data to convert; " "found a valid RCS file at %s", filepath, ) have_rcsfile = True break if have_rcsfile: break if not have_rcsfile: raise NotFound( "Directory %s does not contain any valid RCS files %s", self.cvsroot_path, ) if not have_cvsroot: self.log.warn( "The CVS repository at '%s' lacks a CVSROOT directory; " "we might be ingesting an incomplete copy of the repository", self.cvsroot_path, ) + # The file CVSROOT/config will usually contain ASCII data only. + # We allow UTF-8 just in case. Other encodings may result in an + # error and will require manual intervention, for now. + cvsconfig_path = os.path.join(self.cvsroot_path, "CVSROOT", "config") + cvsconfig = open(cvsconfig_path, mode="r", encoding="utf-8") + self.configure_custom_id_keyword(cvsconfig) + cvsconfig.close() + # Unfortunately, there is no way to convert CVS history in an # iterative fashion because the data is not indexed by any kind # of changeset ID. We need to walk the history of each and every # RCS file in the repository during every visit, even if no new # changes will be added to the SWH archive afterwards. # "CVS’s repository is the software equivalent of a telephone book # sorted by telephone number." # https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/ # # An implicit assumption made here is that self.cvs_changesets will # fit into memory in its entirety. If it won't fit then the CVS walker # will need to be modified such that it spools the list of changesets # to disk instead. 
cvs = CvsConv(self.cvsroot_path, RcsKeywords(), False, CHANGESET_FUZZ_SEC) self.log.info("Walking CVS module %s", self.cvs_module_name) cvs.walk(self.cvs_module_name) cvs_changesets = sorted(cvs.changesets) self.log.info( "CVS changesets found in %s: %d", self.cvs_module_name, len(cvs_changesets), ) self.swh_revision_gen = self.process_cvs_changesets( cvs_changesets, use_rcsparse=True ) elif url.scheme == "pserver" or url.scheme == "fake" or url.scheme == "ssh": # remote CVS repository conversion if not self.cvsroot_path: self.cvsroot_path = os.path.dirname(url.path) self.cvsclient = CVSClient(url) cvsroot_path = os.path.dirname(url.path) self.log.info( "Fetching CVS rlog from %s:%s/%s", url.host, cvsroot_path, self.cvs_module_name, ) self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC) main_rlog_file = self.cvsclient.fetch_rlog() self.rlog.parse_rlog(main_rlog_file) # Find file deletion events only visible in Attic directories. main_changesets = self.rlog.changesets attic_paths = [] attic_rlog_files = [] assert self.cvsroot_path for k in main_changesets: for changed_file in k.revs: path = file_path(self.cvsroot_path, changed_file.path) if path.startswith(self.cvsroot_path): path = path[ len(os.path.commonpath([self.cvsroot_path, path])) + 1 : ] parent_path = os.path.dirname(path) if parent_path.split("/")[-1] == "Attic": continue attic_path = parent_path + "/Attic" if attic_path in attic_paths: continue attic_paths.append(attic_path) # avoid multiple visits # Try to fetch more rlog data from this Attic directory. attic_rlog_file = self.cvsclient.fetch_rlog( path=attic_path, state="dead", ) if attic_rlog_file: attic_rlog_files.append(attic_rlog_file) if len(attic_rlog_files) == 0: self.rlog_file = main_rlog_file else: # Combine all the rlog pieces we found and re-parse. fp = tempfile.TemporaryFile() for attic_rlog_file in attic_rlog_files: for line in attic_rlog_file.readlines(): fp.write(line) attic_rlog_file.close() main_rlog_file.seek(0) for line in main_rlog_file.readlines(): fp.write(line) main_rlog_file.close() fp.seek(0) self.rlog.parse_rlog(cast(BinaryIO, fp)) self.rlog_file = cast(BinaryIO, fp) cvs_changesets = sorted(self.rlog.changesets) self.log.info( "CVS changesets found for %s: %d", self.cvs_module_name, len(cvs_changesets), ) self.swh_revision_gen = self.process_cvs_changesets( cvs_changesets, use_rcsparse=False ) else: raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url) def fetch_data(self) -> bool: """Fetch the next CVS revision.""" try: data = next(self.swh_revision_gen) except StopIteration: assert self._last_revision is not None self.snapshot = self.generate_and_load_snapshot(self._last_revision) self.log.info("SWH snapshot ID: %s", hashutil.hash_to_hex(self.snapshot.id)) self.flush() self.loaded_snapshot_id = self.snapshot.id return False except Exception: self.log.exception("Exception in fetch_data:") return False # Stopping iteration self._contents, self._skipped_contents, self._directories, rev = data self._revisions = [rev] return True def build_swh_revision( self, k: ChangeSetKey, logmsg: Optional[bytes], dir_id: bytes, parents: Sequence[bytes], ) -> Revision: """Given a CVS revision, build a swh revision. Args: k: changeset data logmsg: the changeset's log message dir_id: the tree's hash identifier parents: the revision's parents identifier Returns: The swh revision dictionary. 
""" author = Person.from_fullname(k.author.encode("UTF-8")) date = TimestampWithTimezone.from_dict(k.max_time) return Revision( type=RevisionType.CVS, date=date, committer_date=date, directory=dir_id, message=logmsg, author=author, committer=author, synthetic=True, extra_headers=[], parents=tuple(parents), ) def generate_and_load_snapshot(self, revision: Revision) -> Snapshot: """Create the snapshot either from existing revision. Args: revision (dict): Last revision seen if any (None by default) Returns: Optional[Snapshot] The newly created snapshot """ snap = Snapshot( branches={ DEFAULT_BRANCH: SnapshotBranch( target=revision.id, target_type=TargetType.REVISION ) } ) self.log.debug("snapshot: %s", snap) self.storage.snapshot_add([snap]) return snap def store_data(self) -> None: "Add our current CVS changeset to the archive." self.storage.skipped_content_add(self._skipped_contents) self.storage.content_add(self._contents) self.storage.directory_add(self._directories) self.storage.revision_add(self._revisions) self.flush() self._skipped_contents = [] self._contents = [] self._directories = [] self._revisions = [] def load_status(self) -> Dict[str, Any]: assert self.snapshot is not None if self.last_snapshot == self.snapshot: load_status = "uneventful" else: load_status = "eventful" return { "status": load_status, } def visit_status(self) -> str: return self._visit_status diff --git a/swh/loader/cvs/tests/data/greek-repository9.tgz b/swh/loader/cvs/tests/data/greek-repository9.tgz new file mode 100644 index 0000000..9348754 Binary files /dev/null and b/swh/loader/cvs/tests/data/greek-repository9.tgz differ diff --git a/swh/loader/cvs/tests/test_loader.py b/swh/loader/cvs/tests/test_loader.py index c0b3f70..50ecc9e 100644 --- a/swh/loader/cvs/tests/test_loader.py +++ b/swh/loader/cvs/tests/test_loader.py @@ -1,949 +1,997 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Any, Dict from swh.loader.cvs.loader import CvsLoader from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.hashutil import hash_to_bytes from swh.model.model import Snapshot, SnapshotBranch, TargetType RUNBABY_SNAPSHOT = Snapshot( id=hash_to_bytes("e64667c400049f560a3856580e0d9e511ffa66c9"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("0f6db8ce49472d7829ddd6141f71c68c0d563f0e"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_not_found_no_mock(swh_storage, tmp_path): """Given an unknown repository, the loader visit ends up in status not_found""" unknown_repo_url = "unknown-repository" loader = CvsLoader(swh_storage, unknown_repo_url, cvsroot_path=tmp_path) assert loader.load() == {"status": "uneventful"} assert_last_visit_matches( swh_storage, unknown_repo_url, status="not_found", type="cvs", ) def test_loader_cvs_visit(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", 
snapshot=RUNBABY_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 5, "directory": 1, "origin": 1, "origin_visit": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } check_snapshot(RUNBABY_SNAPSHOT, loader.storage) def test_loader_cvs_2_visits_no_change(swh_storage, datadir, tmp_path): """Eventful visit followed by uneventful visit should yield the same snapshot """ archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status1 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "uneventful"} visit_status2 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) assert visit_status1.date < visit_status2.date assert visit_status1.snapshot == visit_status2.snapshot stats = get_stats(loader.storage) assert stats["origin_visit"] == 1 + 1 # computed twice the same snapshot assert stats["snapshot"] == 1 GREEK_SNAPSHOT = Snapshot( id=hash_to_bytes("c76f8b58a6dfbe6fccb9a85b695f914aa5c4a95a"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("e138207ddd5e1965b5ab9a522bfc2e0ecd233b67"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_with_file_additions_and_deletions(swh_storage, datadir, tmp_path): """Eventful conversion of history with file additions and deletions""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT, loader.storage) def test_loader_cvs_pserver_with_file_additions_and_deletions( swh_storage, datadir, tmp_path ): """Eventful CVS pserver conversion with file additions and deletions""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT, loader.storage) GREEK_SNAPSHOT2 = Snapshot( id=hash_to_bytes("e3d2e8860286000f546c01aa2a3e1630170eb3b6"), branches={ b"HEAD": SnapshotBranch( 
target=hash_to_bytes("f1ff9a3c7624b1be5e5d51f9ec0abf7dcddbf0b2"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_2_visits_with_change(swh_storage, datadir, tmp_path): """Eventful visit followed by eventful visit should yield two snapshots""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status1 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } archive_name2 = "greek-repository2" archive_path2 = os.path.join(datadir, f"{archive_name2}.tgz") repo_url = prepare_repository_from_archive(archive_path2, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status2 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT2.id, ) stats = get_stats(loader.storage) assert stats == { "content": 10, "directory": 15, "origin": 1, "origin_visit": 2, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 2, } check_snapshot(GREEK_SNAPSHOT2, loader.storage) assert visit_status1.date < visit_status2.date assert visit_status1.snapshot != visit_status2.snapshot def test_loader_cvs_visit_pserver(swh_storage, datadir, tmp_path): """Eventful visit to CVS pserver should yield 1 snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/runbaby" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 5, "directory": 1, "origin": 1, "origin_visit": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } check_snapshot(RUNBABY_SNAPSHOT, loader.storage) GREEK_SNAPSHOT3 = Snapshot( id=hash_to_bytes("6e9910ed072662cb482d9017cbf5e1973e6dc09f"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("d9f4837dc55a87d83730c6e277c88b67dae80272"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_visit_pserver_no_eol(swh_storage, datadir, tmp_path): """Visit to CVS pserver with file that lacks trailing eol""" archive_name = "greek-repository3" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} 
assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT3.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 15, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT3, loader.storage) GREEK_SNAPSHOT4 = Snapshot( id=hash_to_bytes("a8593e9233601b31e012d36975f817d2c993d04b"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("51bb99655225c810ee259087fcae505899725360"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_visit_expand_id_keyword(swh_storage, datadir, tmp_path): """Visit to CVS repository with file with an RCS Id keyword""" archive_name = "greek-repository4" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT4.id, ) stats = get_stats(loader.storage) assert stats == { "content": 12, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT4, loader.storage) def test_loader_cvs_visit_pserver_expand_id_keyword(swh_storage, datadir, tmp_path): """Visit to CVS pserver with file with an RCS Id keyword""" archive_name = "greek-repository4" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT4.id, ) stats = get_stats(loader.storage) assert stats == { "content": 12, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT4, loader.storage) GREEK_SNAPSHOT5 = Snapshot( id=hash_to_bytes("6484ec9bfff677731cbb6d2bd5058dabfae952ed"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("514b3bef07d56e393588ceda18cc1dfa2dc4e04a"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_with_file_deleted_and_readded(swh_storage, datadir, tmp_path): """Eventful conversion of history with file deletion and re-addition""" archive_name = "greek-repository5" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT5.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } 
check_snapshot(GREEK_SNAPSHOT5, loader.storage) def test_loader_cvs_pserver_with_file_deleted_and_readded( swh_storage, datadir, tmp_path ): """Eventful pserver conversion with file deletion and re-addition""" archive_name = "greek-repository5" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT5.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT5, loader.storage) DINO_SNAPSHOT = Snapshot( id=hash_to_bytes("6cf774cec1030ff3e9a301681303adb537855d09"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("b7d3ea1fa878d51323b5200ad2c6ee9d5b656f10"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_readded_file_in_attic(swh_storage, datadir, tmp_path): """Conversion of history with RCS files in the Attic""" # This repository has some file revisions marked "dead" in the Attic only. # This is different to the re-added file tests above, where the RCS file # was moved out of the Attic again as soon as the corresponding deleted # file was re-added. Failure to detect the "dead" file revisions in the # Attic would result in errors in our converted history. archive_name = "dino-readded-file" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 38, "directory": 70, "origin": 1, "origin_visit": 1, "release": 0, "revision": 35, "skipped_content": 0, "snapshot": 1, } check_snapshot(DINO_SNAPSHOT, loader.storage) def test_loader_cvs_pserver_readded_file_in_attic(swh_storage, datadir, tmp_path): """Conversion over pserver with RCS files in the Attic""" # This repository has some file revisions marked "dead" in the Attic only. # This is different to the re-added file tests above, where the RCS file # was moved out of the Attic again as soon as the corresponding deleted # file was re-added. Failure to detect the "dead" file revisions in the # Attic would result in errors in our converted history. # This has special implications for the pserver case, because the "dead" # revisions will not appear in the output of 'cvs rlog' by default.
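# As a sketch of the mechanism being tested: for each changed file, the
# loader derives a sibling Attic path and fetches additional rlog data for
# "dead" revisions (this mirrors the logic in CvsLoader.prepare(); the
# paths below are invented for illustration):
#
#   path = file_path("/cvsroot", "/cvsroot/src/foo.c,v")  # -> "src/foo.c"
#   attic_path = os.path.dirname(path) + "/Attic"         # -> "src/Attic"
#   attic_rlog_file = cvsclient.fetch_rlog(path=attic_path, state="dead")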
archive_name = "dino-readded-file" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 38, "directory": 70, "origin": 1, "origin_visit": 1, "release": 0, "revision": 35, "skipped_content": 0, "snapshot": 1, } check_snapshot(DINO_SNAPSHOT, loader.storage) DINO_SNAPSHOT2 = Snapshot( id=hash_to_bytes("afdeca6b8ec8f58367b4e014e2210233f1c5bf3d"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("84e428103d42b84713c77afb9420d667062f8676"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_split_commits_by_commitid(swh_storage, datadir, tmp_path): """Conversion of RCS history which needs to be split by commit ID""" # This repository has some file revisions which use the same log message # and can only be told apart by commit IDs. Without commit IDs, these commits # would get merged into a single commit in our conversion result. archive_name = "dino-commitid" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/dino" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id, ) check_snapshot(DINO_SNAPSHOT2, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 18, "directory": 18, "origin": 1, "origin_visit": 1, "release": 0, "revision": 18, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_split_commits_by_commitid(swh_storage, datadir, tmp_path): """Conversion via pserver which needs to be split by commit ID""" # This repository has some file revisions which use the same log message # and can only be told apart by commit IDs. Without commit IDs, these commits # would get merged into a single commit in our conversion result. 
archive_name = "dino-commitid" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/dino" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id, ) check_snapshot(DINO_SNAPSHOT2, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 18, "directory": 18, "origin": 1, "origin_visit": 1, "release": 0, "revision": 18, "skipped_content": 0, "snapshot": 1, } GREEK_SNAPSHOT6 = Snapshot( id=hash_to_bytes("859ae7ca5b31fee594c98abecdd41eff17cae079"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("fa48fb4551898cd8d3305cace971b3b95639e83e"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_empty_lines_in_log_message(swh_storage, datadir, tmp_path): """Conversion of RCS history with empty lines in a log message""" archive_name = "greek-repository6" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT6.id, ) check_snapshot(GREEK_SNAPSHOT6, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_empty_lines_in_log_message(swh_storage, datadir, tmp_path): """Conversion via pserver with empty lines in a log message""" archive_name = "greek-repository6" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT6.id, ) check_snapshot(GREEK_SNAPSHOT6, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } def get_head_revision_paths_info(loader: CvsLoader) -> Dict[bytes, Dict[str, Any]]: assert loader.snapshot is not None root_dir = loader.snapshot.branches[b"HEAD"].target revision = loader.storage.revision_get([root_dir])[0] assert revision is not None paths = {} for entry in loader.storage.directory_ls(revision.directory, recursive=True): paths[entry["name"]] = entry return paths def test_loader_cvs_with_header_keyword(swh_storage, datadir, tmp_path): """Eventful conversion of history with Header keyword in a file""" archive_name = "greek-repository7" extracted_name = "greek-repository" archive_path = os.path.join(datadir, 
f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} repo_url = f"fake://{repo_url[7:]}" loader2 = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader2.load() == {"status": "eventful"} # We cannot verify the snapshot ID. It is unpredicable due to use of the $Header$ # RCS keyword which contains the temporary directory where the repository is stored. expected_stats = { "content": 9, "directory": 14, "origin": 2, "origin_visit": 2, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } stats = get_stats(loader.storage) assert stats == expected_stats stats = get_stats(loader2.storage) assert stats == expected_stats # Ensure that file 'alpha', which contains a $Header$ keyword, # was imported with equal content via file:// and fake:// URLs. paths = get_head_revision_paths_info(loader) paths2 = get_head_revision_paths_info(loader2) alpha = paths[b"alpha"] alpha2 = paths2[b"alpha"] assert alpha["sha1"] == alpha2["sha1"] GREEK_SNAPSHOT8 = Snapshot( id=hash_to_bytes("5278a1f73ed0f804c68f72614a5f78ca5074ab9c"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("b389258fec8151d719e79da80b5e5355a48ec8bc"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_expand_log_keyword(swh_storage, datadir, tmp_path): """Conversion of RCS history with Log keyword in files""" archive_name = "greek-repository8" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT8.id, ) check_snapshot(GREEK_SNAPSHOT8, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 14, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_expand_log_keyword(swh_storage, datadir, tmp_path): """Conversion of RCS history with Log keyword in files""" archive_name = "greek-repository8" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT8.id, ) check_snapshot(GREEK_SNAPSHOT8, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 14, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } + + +GREEK_SNAPSHOT9 = Snapshot( + id=hash_to_bytes("3d08834666df7a589abea07ac409771ebe7e8fe4"), + branches={ + b"HEAD": SnapshotBranch( + target=hash_to_bytes("9971cbb3b540dfe75f3bcce5021cb73d63b47df3"), + 
+
+
+GREEK_SNAPSHOT9 = Snapshot(
+    id=hash_to_bytes("3d08834666df7a589abea07ac409771ebe7e8fe4"),
+    branches={
+        b"HEAD": SnapshotBranch(
+            target=hash_to_bytes("9971cbb3b540dfe75f3bcce5021cb73d63b47df3"),
+            target_type=TargetType.REVISION,
+        )
+    },
+)
+
+
+def test_loader_cvs_visit_expand_custom_keyword(swh_storage, datadir, tmp_path):
+    """Visit a CVS repository containing a file with a custom RCS keyword"""
+    archive_name = "greek-repository9"
+    extracted_name = "greek-repository"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
+    repo_url += "/greek-tree"  # CVS module name
+
+    loader = CvsLoader(
+        swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
+    )
+
+    assert loader.load() == {"status": "eventful"}
+
+    assert_last_visit_matches(
+        loader.storage,
+        repo_url,
+        status="full",
+        type="cvs",
+        snapshot=GREEK_SNAPSHOT9.id,
+    )
+
+    stats = get_stats(loader.storage)
+    assert stats == {
+        "content": 9,
+        "directory": 14,
+        "origin": 1,
+        "origin_visit": 1,
+        "release": 0,
+        "revision": 8,
+        "skipped_content": 0,
+        "snapshot": 1,
+    }
+
+    check_snapshot(GREEK_SNAPSHOT9, loader.storage)
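+
+
+# Hedged note on "custom" keywords: these are locally defined $Id$-style
+# keywords such as OpenBSD's $OpenBSD$. With the bundled keyword expander
+# they can be registered like so (cf. the -k option of cvs2gitdump):
+#
+#   rcs = RcsKeywords()
+#   rcs.add_id_keyword("OpenBSD")  # expand $OpenBSD$ the same way as $Id$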