diff --git a/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py b/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py index e139b84..70717af 100644 --- a/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py +++ b/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py @@ -1,726 +1,729 @@ #!/usr/local/bin/python # # Copyright (c) 2012 YASUOKA Masahiko # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # Usage # # First import: # % git init --bare /git/openbsd.git # % python cvs2gitdump.py -k OpenBSD -e openbsd.org /cvs/openbsd/src \ # > openbsd.dump # % git --git-dir /git/openbsd.git fast-import < openbsd.dump # # Periodic import: # % sudo cvsync # % python cvs2gitdump.py -k OpenBSD -e openbsd.org /cvs/openbsd/src \ # /git/openbsd.git > openbsd2.dump # % git --git-dir /git/openbsd.git fast-import < openbsd2.dump # +from collections import defaultdict import copy import getopt import os import re import subprocess import sys import time from typing import Dict, List, Optional, Tuple, TypeVar import swh.loader.cvs.rcsparse as rcsparse CHANGESET_FUZZ_SEC = 300 def usage(): print('usage: cvs2gitdump [-ah] [-z fuzz] [-e email_domain] ' '[-E log_encodings]\n' '\t[-k rcs_keywords] [-b branch] [-m module] [-l last_revision]\n' '\tcvsroot [git_dir]', file=sys.stderr) def main() -> None: email_domain = None do_incremental = False git_tip = None git_branch = 'master' dump_all = False log_encoding = 'utf-8,iso-8859-1' rcs = RcsKeywords() modules = [] last_revision = None fuzzsec = CHANGESET_FUZZ_SEC try: opts, args = getopt.getopt(sys.argv[1:], 'ab:hm:z:e:E:k:t:l:') for opt, v in opts: if opt == '-z': fuzzsec = int(v) elif opt == '-e': email_domain = v elif opt == '-a': dump_all = True elif opt == '-b': git_branch = v elif opt == '-E': log_encoding = v elif opt == '-k': rcs.add_id_keyword(v) elif opt == '-m': if v == '.git': print('Cannot handle the path named \'.git\'', file=sys.stderr) sys.exit(1) modules.append(v) elif opt == '-l': last_revision = v elif opt == '-h': usage() sys.exit(1) except getopt.GetoptError as msg: print(msg, file=sys.stderr) usage() sys.exit(1) if len(args) == 0 or len(args) > 2: usage() sys.exit(1) log_encodings = log_encoding.split(',') cvsroot = args[0] while cvsroot[-1] == '/': cvsroot = cvsroot[:-1] if len(args) == 2: do_incremental = True git = subprocess.Popen( ['git', '--git-dir=' + args[1], '-c', 'i18n.logOutputEncoding=UTF-8', 'log', '--max-count', '1', '--date=raw', '--format=%ae%n%ad%n%H', git_branch], encoding='utf-8', stdout=subprocess.PIPE) assert git.stdout is not None outs = git.stdout.readlines() git.wait() if git.returncode != 0: print("Couldn't exec git", file=sys.stderr) sys.exit(git.returncode) git_tip = outs[2].strip() if last_revision is not None: git = subprocess.Popen( ['git', '--git-dir=' + args[1], '-c', 'i18n.logOutputEncoding=UTF-8', 'log', '--max-count', '1', '--date=raw', '--format=%ae%n%ad%n%H', last_revision], encoding='utf-8', 
stdout=subprocess.PIPE) assert git.stdout is not None outs = git.stdout.readlines() git.wait() if git.returncode != 0: print("Couldn't exec git", file=sys.stderr) sys.exit(git.returncode) last_author = outs[0].strip() last_ctime = float(outs[1].split()[0]) # strip off the domain part from the last author since cvs doesn't have # the domain part. if do_incremental and email_domain is not None and \ last_author.lower().endswith(('@' + email_domain).lower()): last_author = last_author[:-1 * (1 + len(email_domain))] cvs = CvsConv(cvsroot, rcs, not do_incremental, fuzzsec) print('** walk cvs tree', file=sys.stderr) if len(modules) == 0: cvs.walk() else: for module in modules: cvs.walk(module) changesets = sorted(cvs.changesets) nchangesets = len(changesets) print('** cvs has %d changesets' % (nchangesets), file=sys.stderr) if nchangesets <= 0: sys.exit(0) if not dump_all: # don't use the last 10 minutes for safety max_time_max = changesets[-1].max_time - 600 else: max_time_max = changesets[-1].max_time found_last_revision = False markseq = cvs.markseq extags = set() for k in changesets: if do_incremental and not found_last_revision: if k.min_time == last_ctime and k.author == last_author: found_last_revision = True for tag in k.tags: extags.add(tag) continue if k.max_time > max_time_max: break marks = {} for f in k.revs: if not do_incremental: marks[f.markseq] = f else: markseq = markseq + 1 git_dump_file(f.path, f.rev, rcs, markseq) marks[markseq] = f log = rcsparse.rcsfile(k.revs[0].path).getlog(k.revs[0].rev) for i, e in enumerate(log_encodings): try: how = 'ignore' if i == len(log_encodings) - 1 else 'strict' log_str = log.decode(e, how) break except UnicodeError: pass log = log_str.encode('utf-8', 'ignore') output('commit refs/heads/' + git_branch) markseq = markseq + 1 output('mark :%d' % (markseq)) email = k.author if email_domain is None \ else k.author + '@' + email_domain output('author %s <%s> %d +0000' % (k.author, email, k.min_time)) output('committer %s <%s> %d +0000' % (k.author, email, k.min_time)) output('data', len(log)) output(log, end='') if do_incremental and git_tip is not None: output('from', git_tip) git_tip = None for m in marks: f = marks[m] mode = 0o100755 if os.access(f.path, os.X_OK) else 0o100644 fn = file_path(cvs.cvsroot, f.path) if f.state == 'dead': output('D', fn) else: output('M %o :%d %s' % (mode, m, fn)) output('') for tag in k.tags: if tag in extags: continue output('reset refs/tags/%s' % (tag)) output('from :%d' % (markseq)) output('') if do_incremental and not found_last_revision: raise Exception('could not find the last revision') print('** dumped', file=sys.stderr) # # Always encode string objects as UTF-8, since the encoding expected by git-fast-import # is UTF-8.
Also write bytes objects without conversion (file bodies # might be in various encodings). # def output(*args, end='\n') -> None: if len(args) == 0: pass elif len(args) > 1 or isinstance(args[0], str): lines = ' '.join( [arg if isinstance(arg, str) else str(arg) for arg in args]) sys.stdout.buffer.write(lines.encode('utf-8')) else: sys.stdout.buffer.write(args[0]) if len(end) > 0: sys.stdout.buffer.write(end.encode('utf-8')) class FileRevision: def __init__(self, path: bytes, rev: str, state: str, markseq: int) -> None: self.path = path self.rev = rev self.state = state self.markseq = markseq class ChangeSetKey: def __init__( self, branch: str, author, timestamp: int, log: bytes, commitid: Optional[str], fuzzsec: int ) -> None: self.branch = branch self.author = author self.min_time = timestamp self.max_time = timestamp self.commitid = commitid self.fuzzsec = fuzzsec self.revs: List[FileRevision] = [] self.tags: List[str] = [] self.log_hash = 0 h = 0 for c in log: h = 31 * h + c self.log_hash = h def __lt__(self, other) -> bool: return self._cmp(other) < 0 def __gt__(self, other) -> bool: return self._cmp(other) > 0 def __eq__(self, other) -> bool: return self._cmp(other) == 0 def __le__(self, other) -> bool: return self._cmp(other) <= 0 def __ge__(self, other) -> bool: return self._cmp(other) >= 0 def __ne__(self, other) -> bool: return self._cmp(other) != 0 def _cmp(self, anon) -> int: if not isinstance(anon, ChangeSetKey): raise TypeError() # compare by the commitid cid = _cmp2(self.commitid, anon.commitid) if cid == 0 and self.commitid is not None: # both have a commitid and they are the same return 0 # compare by the time ma = anon.min_time - self.max_time mi = self.min_time - anon.max_time ct = self.min_time - anon.min_time if ma > self.fuzzsec or mi > self.fuzzsec: return ct if cid != 0: # only one has a commitid, which means a different commit return cid if ct == 0 else ct # compare by log, branch and author c = _cmp2(self.log_hash, anon.log_hash) if c == 0: c = _cmp2(self.branch, anon.branch) if c == 0: c = _cmp2(self.author, anon.author) if c == 0: return 0 return ct if ct != 0 else c def merge(self, anot: "ChangeSetKey") -> None: self.max_time = max(self.max_time, anot.max_time) self.min_time = min(self.min_time, anot.min_time) self.revs.extend(anot.revs) def __hash__(self) -> int: return hash(self.branch + '/' + self.author) * 31 + self.log_hash def put_file(self, path: bytes, rev: str, state: str, markseq: int): self.revs.append(FileRevision(path, rev, state, markseq)) TCmp = TypeVar("TCmp", int, str) def _cmp2(a: Optional[TCmp], b: Optional[TCmp]) -> int: _a = a is not None _b = b is not None return (a > b) - (a < b) if _a and _b else (_a > _b) - (_a < _b) # type: ignore class CvsConv: def __init__(self, cvsroot: str, rcs: "RcsKeywords", dumpfile: bool, fuzzsec: int) -> None: self.cvsroot = cvsroot self.rcs = rcs self.changesets: Dict[ChangeSetKey, ChangeSetKey] = dict() self.dumpfile = dumpfile self.markseq = 0 self.tags: Dict[str, ChangeSetKey] = dict() self.fuzzsec = fuzzsec def walk(self, module: Optional[str] = None) -> None: p = [self.cvsroot] if module is not None: p.append(module) path = os.path.join(*p) for root, dirs, files in os.walk(os.fsencode(path)): if b'.git' in dirs: print('Ignore %s: cannot handle the path named \'.git\'' % ( os.path.join(root, b'.git')), file=sys.stderr) dirs.remove(b'.git') if b'.git' in files: print('Ignore %s: cannot handle the path named \'.git\'' % ( os.path.join(root, b'.git')), file=sys.stderr) files.remove(b'.git') for f in files: if not
f[-2:] == b',v': continue self.parse_file(os.path.join(root, f)) for t, c in list(self.tags.items()): c.tags.append(t) def parse_file(self, path: str) -> None: - rtags: Dict[str, List[str]] = dict() + rtags: Dict[str, List[str]] = defaultdict(list) rcsfile = rcsparse.rcsfile(path) + branches = {'1': 'HEAD', '1.1.1': 'VENDOR'} + for k, v_ in list(rcsfile.symbols.items()): r = v_.split('.') if len(r) == 3: branches[v_] = 'VENDOR' elif len(r) >= 3 and r[-2] == '0': branches['.'.join(r[:-2] + r[-1:])] = k - if len(r) == 2 and branches[r[0]] == 'HEAD': - if v_ not in rtags: - rtags[v_] = list() + elif len(r) == 2 and branches.get(r[0]) == 'HEAD': rtags[v_].append(k) revs: List[Tuple[str, Tuple[str, int, str, str, List[str], str, str]]] = list(rcsfile.revs.items()) # sort by revision in descending order to prioritize 1.1.1.1 over 1.1 revs.sort(key=lambda a: a[1][0], reverse=True) # sort by time revs.sort(key=lambda a: a[1][1]) novendor = False have_initial_revision = False last_vendor_status = None for k, v in revs: r = k.split('.') if len(r) == 4 and r[0] == '1' and r[1] == '1' and r[2] == '1' \ and r[3] == '1': if have_initial_revision: continue if v[3] == 'dead': continue last_vendor_status = v[3] have_initial_revision = True elif len(r) == 4 and r[0] == '1' and r[1] == '1' and r[2] == '1': if novendor: continue last_vendor_status = v[3] elif len(r) == 2: + # ensure revision targets head branch + branches[r[0]] = 'HEAD' if r[0] == '1' and r[1] == '1': if have_initial_revision: continue if v[3] == 'dead': continue have_initial_revision = True elif r[0] == '1' and r[1] != '1': novendor = True if last_vendor_status == 'dead' and v[3] == 'dead': last_vendor_status = None continue last_vendor_status = None else: # trunk only continue if self.dumpfile: self.markseq = self.markseq + 1 git_dump_file(path, k, self.rcs, self.markseq) b = '.'.join(r[:-1]) try: a = ChangeSetKey( branches[b], v[2], v[1], rcsfile.getlog(v[0]), v[6], self.fuzzsec) except Exception as e: print('Aborted at %s %s' % (path, v[0]), file=sys.stderr) raise e a.put_file(path, k, v[3], self.markseq) while a in self.changesets: c = self.changesets[a] del self.changesets[a] c.merge(a) a = c self.changesets[a] = a if k in rtags: for t in rtags[k]: if t not in self.tags or \ self.tags[t].max_time < a.max_time: self.tags[t] = a def file_path(r: bytes, p: bytes) -> bytes: if r.endswith(b'/'): r = r[:-1] if p[-2:] == b',v': path = p[:-2] # drop ",v" else: path = p p_ = path.split(b'/') if len(p_) > 0 and p_[-2] == b'Attic': path = b'/'.join(p_[:-2] + [p_[-1]]) if path.startswith(r): path = path[len(r) + 1:] return path def git_dump_file(path: str, k, rcs, markseq) -> None: try: cont = rcs.expand_keyword(path, rcsparse.rcsfile(path), k, []) except RuntimeError as msg: print('Unexpected runtime error on parsing', path, k, ':', msg, file=sys.stderr) print('raising the resource limit may fix this problem.', file=sys.stderr) sys.exit(1) output('blob') output('mark :%d' % markseq) output('data', len(cont)) output(cont) class RcsKeywords: RCS_KW_AUTHOR = (1 << 0) RCS_KW_DATE = (1 << 1) RCS_KW_LOG = (1 << 2) RCS_KW_NAME = (1 << 3) RCS_KW_RCSFILE = (1 << 4) RCS_KW_REVISION = (1 << 5) RCS_KW_SOURCE = (1 << 6) RCS_KW_STATE = (1 << 7) RCS_KW_FULLPATH = (1 << 8) RCS_KW_MDOCDATE = (1 << 9) RCS_KW_LOCKER = (1 << 10) RCS_KW_ID = (RCS_KW_RCSFILE | RCS_KW_REVISION | RCS_KW_DATE | RCS_KW_AUTHOR | RCS_KW_STATE) RCS_KW_HEADER = (RCS_KW_ID | RCS_KW_FULLPATH) rcs_expkw = { b"Author": RCS_KW_AUTHOR, b"Date": RCS_KW_DATE, b"Header": RCS_KW_HEADER, b"Id": RCS_KW_ID, b"Log":
RCS_KW_LOG, b"Name": RCS_KW_NAME, b"RCSfile": RCS_KW_RCSFILE, b"Revision": RCS_KW_REVISION, b"Source": RCS_KW_SOURCE, b"State": RCS_KW_STATE, b"Mdocdate": RCS_KW_MDOCDATE, b"Locker": RCS_KW_LOCKER } RCS_KWEXP_NONE = (1 << 0) RCS_KWEXP_NAME = (1 << 1) # include keyword name RCS_KWEXP_VAL = (1 << 2) # include keyword value RCS_KWEXP_LKR = (1 << 3) # include name of locker RCS_KWEXP_OLD = (1 << 4) # generate old keyword string RCS_KWEXP_ERR = (1 << 5) # mode has an error RCS_KWEXP_DEFAULT = (RCS_KWEXP_NAME | RCS_KWEXP_VAL) RCS_KWEXP_KVL = (RCS_KWEXP_NAME | RCS_KWEXP_VAL | RCS_KWEXP_LKR) def __init__(self) -> None: self.rerecomple() def rerecomple(self) -> None: pat = b'|'.join(list(self.rcs_expkw.keys())) self.re_kw = re.compile(b".*?\\$(" + pat + b")[\\$:]") def add_id_keyword(self, keyword) -> None: self.rcs_expkw[keyword.encode('ascii')] = self.RCS_KW_ID self.rerecomple() def kflag_get(self, flags: Optional[str]) -> int: if flags is None: return self.RCS_KWEXP_DEFAULT fl = 0 for fc in flags: if fc == 'k': fl |= self.RCS_KWEXP_NAME elif fc == 'v': fl |= self.RCS_KWEXP_VAL elif fc == 'l': fl |= self.RCS_KWEXP_LKR elif fc == 'o': if len(flags) != 1: fl |= self.RCS_KWEXP_ERR fl |= self.RCS_KWEXP_OLD elif fc == 'b': if len(flags) != 1: fl |= self.RCS_KWEXP_ERR fl |= self.RCS_KWEXP_NONE else: fl |= self.RCS_KWEXP_ERR return fl def expand_keyword(self, filename: str, rcs: rcsparse.rcsfile, r: str, excluded_keywords: List[str], filename_encoding="utf-8") -> bytes: """ Check out a file with keywords expanded. Expansion rules are specific to each keyword, and some cases are specific to undocumented behaviour of CVS. Our implementation does not expand some keywords (see comments in the code). For a list of keywords and their expansion rules, see: https://www.gnu.org/software/trans-coord/manual/cvs/cvs.html#Keyword-list (also available in 'info cvs' if cvs is installed) """ rev = rcs.revs[r] mode = self.kflag_get(rcs.expand) if (mode & (self.RCS_KWEXP_NONE | self.RCS_KWEXP_OLD)) != 0: return rcs.checkout(rev[0]) ret = [] for line in rcs.checkout(rev[0]).splitlines(keepends=True): logbuf = None m = self.re_kw.match(line) if m is None: # No RCS Keywords, use it as it is ret.append(line) continue expkw = 0 line0 = b'' while m is not None: logbuf = None try: dsign = m.end(1) + line[m.end(1):].index(b'$') except ValueError: # No RCS Keywords, use it as it is ret.append(line) break prefix = line[:m.start(1) - 1] next_match_segment = copy.deepcopy(line[dsign:]) expbuf = '' try: kwname = m.group(1).decode('ascii') except UnicodeDecodeError: # Not a valid RCS keyword, use it as it is ret.append(line) break if kwname in excluded_keywords: line0 += prefix + m.group(1) m = self.re_kw.match(next_match_segment) if m: line = next_match_segment continue else: ret.append(line0 + line[dsign + 1:]) break line = line[dsign + 1:] if (mode & self.RCS_KWEXP_NAME) != 0: expbuf += '$%s' % kwname if (mode & self.RCS_KWEXP_VAL) != 0: expbuf += ': ' if (mode & self.RCS_KWEXP_VAL) != 0: expkw = self.rcs_expkw[m.group(1)] if (expkw & self.RCS_KW_RCSFILE) != 0: expbuf += filename \ if (expkw & self.RCS_KW_FULLPATH) != 0 \ else os.path.basename(filename) expbuf += " " if (expkw & self.RCS_KW_REVISION) != 0: expbuf += rev[0] expbuf += " " if (expkw & self.RCS_KW_DATE) != 0: expbuf += time.strftime( "%Y/%m/%d %H:%M:%S ", time.gmtime(rev[1])) if (expkw & self.RCS_KW_MDOCDATE) != 0: d = time.gmtime(rev[1]) expbuf += time.strftime( "%B%e %Y " if (d.tm_mday < 10) else "%B %e %Y ", d) if (expkw & self.RCS_KW_AUTHOR) != 0: expbuf += rev[2]
expbuf += " " if (expkw & self.RCS_KW_STATE) != 0: expbuf += rev[3] expbuf += " " if (expkw & self.RCS_KW_LOG) != 0: # Unlike other keywords, the Log keyword expands over multiple lines. # The terminating '$' of the Log keyword appears on the line which # contains the log keyword itself. Then follow all log message lines, # and those lines are followed by content which follows the Log keyword. # For example, the line: # # foo $Log$content which follows # # will be expanded like this by CVS: # # foo $Log: delta,v $ # foo Revision 1.2 2021/11/29 14:24:18 stsp # foo log message line 1 # foo log message line 2 # foocontent which follows # # (Side note: Trailing whitespace is stripped from "foo " when # the content which follows gets written to the output file.) # # If we did not trim the Log keyword's trailing "$" here then # the last line would read instead: # # foo$content which follows assert(next_match_segment[0] == ord('$')) next_match_segment = next_match_segment[1:] expbuf += filename \ if (expkw & self.RCS_KW_FULLPATH) != 0 \ else os.path.basename(filename) expbuf += " " logbuf = prefix + ( 'Revision %s %s %s\n' % ( rev[0], time.strftime( "%Y/%m/%d %H:%M:%S", time.gmtime(rev[1])), rev[2])).encode('ascii') for lline in rcs.getlog(rev[0]).splitlines(keepends=True): logbuf += prefix + lline if (expkw & self.RCS_KW_SOURCE) != 0: expbuf += filename expbuf += " " if (expkw & (self.RCS_KW_NAME | self.RCS_KW_LOCKER)) != 0: # We do not expand Name and Locker keywords. # The Name keyword is only expanded when a file is checked # out with an explicit tag name. Perhaps this will be needed # if the loader learns about CVS tags some day. # The Locker keyword only expands if the file is currently # locked via 'cvs admin -l', which is not part of the # information we want to preserve about source code. expbuf += " " if (mode & self.RCS_KWEXP_NAME) != 0: expbuf += '$' if logbuf is not None: ret.append(prefix + expbuf.encode(filename_encoding) + b'\n' + logbuf) else: line0 += prefix + expbuf[:255].encode(filename_encoding) m = self.re_kw.match(next_match_segment) if m: line = next_match_segment if (mode & self.RCS_KWEXP_NAME) != 0 and expkw and (expkw & self.RCS_KW_LOG) == 0 and line0[-1] == ord('$'): # There is another keyword on this line that needs expansion. # Avoid a double "$$" in the expanded string. This $ terminates # the previous keyword and marks the beginning of the next one. line0 = line0[:-1] elif logbuf is not None: # Trim whitespace from tail of prefix if appending a suffix which # followed the Log keyword on the same line. # Testing suggests that this matches CVS's behaviour. ret.append(line0 + prefix.rstrip() + line) else: ret.append(line0 + line) return b''.join(ret) # ---------------------------------------------------------------------- # entry point # ---------------------------------------------------------------------- if __name__ == '__main__': main() diff --git a/swh/loader/cvs/rlog.py b/swh/loader/cvs/rlog.py index f11e498..c3d39c3 100644 --- a/swh/loader/cvs/rlog.py +++ b/swh/loader/cvs/rlog.py @@ -1,497 +1,498 @@ """ RCS/CVS rlog parser, derived from viewvc and cvs2gitdump.py """ # Copyright (C) 1999-2021 The ViewCVS Group. All Rights Reserved.
# # By using ViewVC, you agree to the terms and conditions set forth # below: # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following # disclaimer. # # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following # disclaimer in the documentation and/or other materials provided # with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS # BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR # BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, # WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN # IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # Copyright (c) 2012 YASUOKA Masahiko # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. import calendar +from collections import defaultdict import re import string import time from typing import BinaryIO, Dict, List, NamedTuple, Optional, Tuple from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ChangeSetKey class revtuple(NamedTuple): number: str date: int author: bytes state: str branches: None revnumstr: None commitid: Optional[str] class RlogConv: def __init__(self, cvsroot_path: str, fuzzsec: int) -> None: self.cvsroot_path = cvsroot_path self.fuzzsec = fuzzsec self.changesets: Dict[ChangeSetKey, ChangeSetKey] = dict() self.tags: Dict[str, ChangeSetKey] = dict() self.offsets: Dict[bytes, Dict[str, int]] = dict() def _process_rlog_revisions( self, path: bytes, taginfo: Dict[bytes, bytes], revisions: Dict[str, revtuple], logmsgs: Dict[str, Optional[bytes]], ) -> None: """Convert RCS revision history of a file into self.changesets items""" - rtags: Dict[str, List[str]] = dict() + rtags: Dict[str, List[str]] = defaultdict(list) # RCS and CVS represent branches by adding digits to revision numbers. # And CVS assigns special meaning to certain revision number ranges. # # Revision numbers on the main branch have only two digits: # # 1.1, 1.2, 1.3, ... # # Branches created with 'cvs tag -b' use even numbers for # the third digit: # # 1.1, 1.2, 1.3, ... main branch history of the file # | # 1.1.2.1, 1.1.2.2 ... 
a branch (2) forked off r1.1 of the file # # Branches are given human-readable names by associating # RCS tag labels with their revision numbers. # Given a file on the above branch which has been changed 10 times # since history was forked, the branch tag would look like this: # # MY_BRANCH: r1.1.2.10 # # Odd branch numbers are reserved for CVS "vendor" branches. # The default vendor branch is 1.1.1. # Vendor branches are populated with 'cvs import'. # Files on the vendor branch are merged to the main branch automatically # unless there are merge conflicts. Such conflicts have to be resolved # manually each time 'cvs import' is used to update the vendor branch. # # See here for details: # https://www.gnu.org/software/trans-coord/manual/cvs/html_node/Branches-and-revisions.html#Branches-and-revisions # # There are also "magic" branch numbers with a zero inserted # at the second-rightmost position: # # 1.1, 1.2, 1.3, ... main branch history of the file # | # 1.1.0.2 magic branch (2) # # This allows CVS to store information about a branch's existence # before any files on this branch have been modified. # Even-numbered branch revisions appear once the file is modified. + branches = {"1": "HEAD", "1.1.1": "VENDOR"} - k: str - v_: str - for k, v_ in list(taginfo.items()): # type: ignore # FIXME, inconsistent types - r = v_.split(".") + for k_, v_ in taginfo.items(): + v_str = v_.decode() + r = v_str.split(".") if len(r) == 3: # vendor branch number - branches[v_] = "VENDOR" + branches[v_str] = "VENDOR" elif len(r) >= 3 and r[-2] == "0": # magic branch number - branches[".".join(r[:-2] + r[-1:])] = k - if len(r) == 2 and branches[r[0]] == "HEAD": + branches[".".join(r[:-2] + r[-1:])] = k_.decode() + elif len(r) == 2 and branches.get(r[0]) == "HEAD": # main branch number - if v_ not in rtags: - rtags[v_] = list() - rtags[v_].append(k) + rtags[v_str].append(k_.decode()) revs: List[Tuple[str, revtuple]] = list(revisions.items()) # sort by revision in descending order to prioritize 1.1.1.1 over 1.1 revs.sort(key=lambda a: a[1][0], reverse=True) # sort by time revs.sort(key=lambda a: a[1][1]) novendor = False have_initial_revision = False last_vendor_status = None for k, v in revs: r = k.split(".") if ( len(r) == 4 and r[0] == "1" and r[1] == "1" and r[2] == "1" and r[3] == "1" ): if have_initial_revision: continue if v[3] == "dead": continue last_vendor_status = v[3] have_initial_revision = True elif len(r) == 4 and r[0] == "1" and r[1] == "1" and r[2] == "1": if novendor: continue last_vendor_status = v[3] elif len(r) == 2: + # ensure revision targets head branch + branches[r[0]] = "HEAD" if r[0] == "1" and r[1] == "1": if have_initial_revision: continue if v[3] == "dead": continue have_initial_revision = True elif r[0] == "1" and r[1] != "1": novendor = True if last_vendor_status == "dead" and v[3] == "dead": last_vendor_status = None continue last_vendor_status = None else: # trunk only continue b = ".".join(r[:-1]) # decode author name in a potentially lossy way; # it is only used for internal hashing in this case author = v[2].decode("utf-8", "ignore") logmsg = logmsgs[k] assert logmsg is not None a = ChangeSetKey(branches[b], author, v[1], logmsg, v[6], self.fuzzsec) a.put_file(path, k, v[3], 0) while a in self.changesets: c = self.changesets[a] del self.changesets[a] c.merge(a) a = c self.changesets[a] = a if k in rtags: for t in rtags[k]: if t not in self.tags or self.tags[t].max_time < a.max_time: self.tags[t] = a def parse_rlog(self, fp: BinaryIO) -> None: self.changesets = dict() self.tags =
dict() self.offsets = dict() eof = None while eof != _EOF_LOG and eof != _EOF_ERROR: filename, branch, taginfo, lockinfo, errmsg, eof = _parse_log_header(fp) revisions: Dict[str, revtuple] = {} logmsgs: Dict[str, Optional[bytes]] = {} path = b"" if filename: path = filename elif not eof: raise ValueError("No filename found in rlog header") while not eof: off = fp.tell() rev, logmsg, eof = _parse_log_entry(fp) if rev: revisions[rev[0]] = rev logmsgs[rev[0]] = logmsg if eof != _EOF_LOG and eof != _EOF_ERROR: if path not in self.offsets.keys(): self.offsets[path] = dict() if rev: self.offsets[path][rev[0]] = off self._process_rlog_revisions(path, taginfo, revisions, logmsgs) def getlog(self, fp: BinaryIO, path: bytes, rev: str) -> Optional[bytes]: off = self.offsets[path][rev] fp.seek(off) _rev, logmsg, eof = _parse_log_entry(fp) return logmsg # if your rlog doesn't use 77 '=' characters, then this must change LOG_END_MARKER = b"=" * 77 + b"\n" ENTRY_END_MARKER = b"-" * 28 + b"\n" _EOF_FILE = b"end of file entries" # no more entries for this RCS file _EOF_LOG = b"end of log" # hit the true EOF on the pipe _EOF_ERROR = b"error message found" # rlog issued an error # rlog error messages look like # # rlog: filename/goes/here,v: error message # rlog: filename/goes/here,v:123: error message # # so we should be able to match them with a regex like # # ^rlog\: (.*)(?:\:\d+)?\: (.*)$ # # But for some reason the Windows version of rlog omits the "rlog: " prefix # for the first error message when the standard error stream has been # redirected to a file or pipe. (The prefix is present in subsequent errors # and when rlog is run from the console.) So the expression below is more # complicated. _re_log_error = re.compile(rb"^(?:rlog\: )*(.*,v)(?:\:\d+)?\: (.*)$") # CVSNT error messages look like: # cvs rcsfile: `C:/path/to/file,v' does not appear to be a valid rcs file # cvs [rcsfile aborted]: C:/path/to/file,v: No such file or directory # cvs [rcsfile aborted]: cannot open C:/path/to/file,v: Permission denied _re_cvsnt_error = re.compile( rb"^(?:cvs rcsfile\: |cvs \[rcsfile aborted\]: )" rb"(?:\`(.*,v)' |" rb"cannot open (.*,v)\: |(.*,v)\: |)" rb"(.*)$" ) def _parse_log_header( fp: BinaryIO, ) -> Tuple[ bytes, bytes, Dict[bytes, bytes], Dict[bytes, bytes], bytes, Optional[bytes] ]: """Parse an RCS/CVS log header. fp is a file (pipe) opened for reading the log information. On entry, fp should point to the start of a log entry. On exit, fp will have consumed the separator line between the header and the first revision log. If there is no revision information (e.g. the "-h" switch was passed to rlog), then fp will have consumed the file separator line on exit. Returns: filename, default branch, tag dictionary, lock dictionary, rlog error message, and eof flag """ filename = branch = msg = b"" taginfo: Dict[bytes, bytes] = {} # tag name => number lockinfo: Dict[bytes, bytes] = {} # revision => locker state = 0 # 0 = base, 1 = parsing symbols, 2 = parsing locks eof = None while 1: line = fp.readline() if not line: # the true end-of-file eof = _EOF_LOG break if state == 1: - if line[0] == b"\t": + if line.startswith(b"\t"): [tag, rev] = [x.strip() for x in line.split(b":")] taginfo[tag] = rev else: # oops. this line isn't tag info. stop parsing tags. state = 0 if state == 2: - if line[0] == b"\t": + if line.startswith(b"\t"): [locker, rev] = [x.strip() for x in line.split(b":")] lockinfo[rev] = locker else: # oops. this line isn't lock info. stop parsing locks.
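# (The header parser is a small state machine: state 0 scans general header # fields, state 1 consumes tab-indented 'symbolic names' entries, and state 2 # consumes tab-indented 'locks' entries. Any line that is not tab-indented, # like this one, drops the parser back to state 0.)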
state = 0 if state == 0: if line == b"\n": continue elif line[:9] == b"RCS file:": filename = line[10:-1] elif line[:5] == b"head:": # head = line[6:-1] pass elif line[:7] == b"branch:": branch = line[8:-1] elif line[:6] == b"locks:": # start parsing the lock information state = 2 elif line[:14] == b"symbolic names": # start parsing the tag information state = 1 elif line == ENTRY_END_MARKER: # end of the headers break elif line == LOG_END_MARKER: # end of this file's log information eof = _EOF_FILE break else: error = _re_cvsnt_error.match(line) if error: p1, p2, p3, msg = error.groups() filename = p1 or p2 or p3 if not filename: raise ValueError( "Could not get filename from CVSNT error:\n%r" % line ) eof = _EOF_ERROR break error = _re_log_error.match(line) if error: filename, msg = error.groups() if msg[:30] == b"warning: Unknown phrases like ": # don't worry about this warning. it can happen with some RCS # files that have unknown fields in them e.g. "permissions 644;" continue eof = _EOF_ERROR break return filename, branch, taginfo, lockinfo, msg, eof _re_log_info = re.compile( rb"^date:\s+([^;]+);" rb"\s+author:\s+([^;]+);" rb"\s+state:\s+([^;]+);" rb"(\s+lines:\s+([0-9\s+-]+);?)?" rb"(\s+commitid:\s+([a-zA-Z0-9]+);)?\n$" ) # TODO: _re_rev should be updated to extract the "locked" flag _re_rev = re.compile(rb"^revision\s+([0-9.]+).*") def cvs_strptime(timestr): try: return time.strptime(timestr, "%Y/%m/%d %H:%M:%S")[:-1] + (0,) except ValueError: return time.strptime(timestr, "%Y-%m-%d %H:%M:%S %z")[:-1] + (0,) def _parse_commitid(commitid: bytes) -> Optional[str]: s = commitid.decode("ascii").strip() # Strip "commitid: " tag and the trailing semicolon. s = s[len("commitid: ") : -len(";")] # The commitid itself contains digits and ASCII letters only: for c in s: if ( c not in string.digits and c not in string.ascii_lowercase and c not in string.ascii_uppercase ): raise ValueError("invalid commitid") return s def _parse_log_entry(fp) -> Tuple[Optional[revtuple], Optional[bytes], Optional[bytes]]: """Parse a single log entry. On entry, fp should point to the first line of the entry (the "revision" line). On exit, fp will have consumed the log separator line (dashes) or the end-of-file marker (equals). Returns: Revision data tuple (number string, date, author, state, branches, revnumstr, commitid) if any, log, and eof flag (see _EOF_*) """ rev = None line = fp.readline() if not line: return None, None, _EOF_LOG if line == LOG_END_MARKER: # Needed because some versions of RCS precede LOG_END_MARKER # with ENTRY_END_MARKER return None, None, _EOF_FILE if line[:8] == b"revision": match = _re_rev.match(line) if not match: return None, None, _EOF_LOG rev = match.group(1) line = fp.readline() if not line: return None, None, _EOF_LOG match = _re_log_info.match(line) eof = None log = b"" while 1: line = fp.readline() if not line: # true end-of-file eof = _EOF_LOG break if line[:9] == b"branches:": continue if line == ENTRY_END_MARKER: break if line == LOG_END_MARKER: # end of this file's log information eof = _EOF_FILE break log = log + line if not rev or not match: # there was a parsing error return None, None, eof # parse out a time tuple for the local time tm = cvs_strptime(match.group(1).decode("UTF-8")) # rlog seems to assume that two-digit years are 1900-based (so, "04" # comes out as "1904", not "2004").
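# A worked example of the correction below: rlog prints "1904" for the # two-digit year "04"; since 1904 < 1970 and 1904 - 1900 < 70 we add 100 and # obtain 2004. A year that is still below 1970 after this correction cannot # be represented and is rejected as invalid.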
EPOCH = 1970 if tm[0] < EPOCH: tm = list(tm) if (tm[0] - 1900) < 70: tm[0] = tm[0] + 100 if tm[0] < EPOCH: raise ValueError("invalid year") date = calendar.timegm(tm) commitid = match.group(6) or None if commitid: parsed_commitid = _parse_commitid(commitid) else: parsed_commitid = None # return a revision tuple compatible with 'rcsparse', the log message, # and the EOF marker return ( revtuple( rev.decode("ascii"), # revision number string date, match.group(2), # author (encoding is arbitrary; don't attempt to decode) match.group(3).decode( "ascii" ), # state, usually "Exp" or "dead"; non-ASCII data here would be weird None, # TODO: branches of this rev None, # TODO: revnumstr of previous rev parsed_commitid, ), log, eof, ) diff --git a/swh/loader/cvs/tests/data/cpmixin.tgz b/swh/loader/cvs/tests/data/cpmixin.tgz new file mode 100644 index 0000000..f1db820 Binary files /dev/null and b/swh/loader/cvs/tests/data/cpmixin.tgz differ diff --git a/swh/loader/cvs/tests/test_loader.py b/swh/loader/cvs/tests/test_loader.py index dc8b915..616d30a 100644 --- a/swh/loader/cvs/tests/test_loader.py +++ b/swh/loader/cvs/tests/test_loader.py @@ -1,1290 +1,1332 @@ # Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os import subprocess import tempfile from typing import Any, Dict from urllib.parse import urlparse import pytest from swh.loader.cvs.cvsclient import CVSClient from swh.loader.cvs.loader import BadPathException, CvsLoader from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.hashutil import hash_to_bytes from swh.model.model import Snapshot, SnapshotBranch, TargetType RUNBABY_SNAPSHOT = Snapshot( id=hash_to_bytes("e64667c400049f560a3856580e0d9e511ffa66c9"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("0f6db8ce49472d7829ddd6141f71c68c0d563f0e"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_not_found_no_mock(swh_storage, tmp_path): """Given an unknown repository, the loader visit ends up in status not_found""" unknown_repo_url = "unknown-repository" loader = CvsLoader(swh_storage, unknown_repo_url, cvsroot_path=tmp_path) assert loader.load() == {"status": "uneventful"} assert_last_visit_matches( swh_storage, unknown_repo_url, status="not_found", type="cvs", ) def test_loader_cvs_visit(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 5, "directory": 1, "origin": 1, "origin_visit": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } check_snapshot(RUNBABY_SNAPSHOT, loader.storage) def test_loader_cvs_2_visits_no_change(swh_storage, datadir, tmp_path): """Eventful visit followed by uneventful visit should yield the same snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = 
prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status1 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "uneventful"} visit_status2 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) assert visit_status1.date < visit_status2.date assert visit_status1.snapshot == visit_status2.snapshot stats = get_stats(loader.storage) assert stats["origin_visit"] == 1 + 1 # the same snapshot was computed twice assert stats["snapshot"] == 1 GREEK_SNAPSHOT = Snapshot( id=hash_to_bytes("c76f8b58a6dfbe6fccb9a85b695f914aa5c4a95a"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("e138207ddd5e1965b5ab9a522bfc2e0ecd233b67"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_with_file_additions_and_deletions(swh_storage, datadir, tmp_path): """Eventful conversion of history with file additions and deletions""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT, loader.storage) def test_loader_cvs_pserver_with_file_additions_and_deletions( swh_storage, datadir, tmp_path ): """Eventful CVS pserver conversion with file additions and deletions""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT, loader.storage) GREEK_SNAPSHOT2 = Snapshot( id=hash_to_bytes("e3d2e8860286000f546c01aa2a3e1630170eb3b6"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("f1ff9a3c7624b1be5e5d51f9ec0abf7dcddbf0b2"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_2_visits_with_change(swh_storage, datadir, tmp_path): """Eventful visit followed by eventful visit should yield two snapshots""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url,
cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status1 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } archive_name2 = "greek-repository2" archive_path2 = os.path.join(datadir, f"{archive_name2}.tgz") repo_url = prepare_repository_from_archive(archive_path2, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status2 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT2.id, ) stats = get_stats(loader.storage) assert stats == { "content": 10, "directory": 15, "origin": 1, "origin_visit": 2, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 2, } check_snapshot(GREEK_SNAPSHOT2, loader.storage) assert visit_status1.date < visit_status2.date assert visit_status1.snapshot != visit_status2.snapshot def test_loader_cvs_visit_pserver(swh_storage, datadir, tmp_path): """Eventful visit to CVS pserver should yield 1 snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/runbaby" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 5, "directory": 1, "origin": 1, "origin_visit": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } check_snapshot(RUNBABY_SNAPSHOT, loader.storage) GREEK_SNAPSHOT3 = Snapshot( id=hash_to_bytes("6e9910ed072662cb482d9017cbf5e1973e6dc09f"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("d9f4837dc55a87d83730c6e277c88b67dae80272"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_visit_pserver_no_eol(swh_storage, datadir, tmp_path): """Visit to CVS pserver with file that lacks trailing eol""" archive_name = "greek-repository3" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT3.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 15, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT3, loader.storage) GREEK_SNAPSHOT4 = Snapshot( id=hash_to_bytes("a8593e9233601b31e012d36975f817d2c993d04b"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("51bb99655225c810ee259087fcae505899725360"), 
target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_visit_expand_id_keyword(swh_storage, datadir, tmp_path): """Visit to CVS repository with file with an RCS Id keyword""" archive_name = "greek-repository4" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT4.id, ) stats = get_stats(loader.storage) assert stats == { "content": 12, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT4, loader.storage) def test_loader_cvs_visit_pserver_expand_id_keyword(swh_storage, datadir, tmp_path): """Visit to CVS pserver with file with an RCS Id keyword""" archive_name = "greek-repository4" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT4.id, ) stats = get_stats(loader.storage) assert stats == { "content": 12, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT4, loader.storage) GREEK_SNAPSHOT5 = Snapshot( id=hash_to_bytes("6484ec9bfff677731cbb6d2bd5058dabfae952ed"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("514b3bef07d56e393588ceda18cc1dfa2dc4e04a"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_with_file_deleted_and_readded(swh_storage, datadir, tmp_path): """Eventful conversion of history with file deletion and re-addition""" archive_name = "greek-repository5" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT5.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT5, loader.storage) def test_loader_cvs_pserver_with_file_deleted_and_readded( swh_storage, datadir, tmp_path ): """Eventful pserver conversion with file deletion and re-addition""" archive_name = "greek-repository5" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = 
f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT5.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT5, loader.storage) DINO_SNAPSHOT = Snapshot( id=hash_to_bytes("6cf774cec1030ff3e9a301681303adb537855d09"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("b7d3ea1fa878d51323b5200ad2c6ee9d5b656f10"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_readded_file_in_attic(swh_storage, datadir, tmp_path): """Conversion of history with RCS files in the Attic""" # This repository has some file revisions marked "dead" in the Attic only. # This is different to the re-added file tests above, where the RCS file # was moved out of the Attic again as soon as the corresponding deleted # file was re-added. Failure to detect the "dead" file revisions in the # Attic would result in errors in our converted history. archive_name = "dino-readded-file" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 38, "directory": 70, "origin": 1, "origin_visit": 1, "release": 0, "revision": 35, "skipped_content": 0, "snapshot": 1, } check_snapshot(DINO_SNAPSHOT, loader.storage) def test_loader_cvs_pserver_readded_file_in_attic(swh_storage, datadir, tmp_path): """Conversion over pserver with RCS files in the Attic""" # This repository has some file revisions marked "dead" in the Attic only. # This is different to the re-added file tests above, where the RCS file # was moved out of the Attic again as soon as the corresponding deleted # file was re-added. Failure to detect the "dead" file revisions in the # Attic would result in errors in our converted history. # This has special implications for the pserver case, because the "dead" # revisions will not appear in the output of 'cvs rlog' by default.
archive_name = "dino-readded-file" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 38, "directory": 70, "origin": 1, "origin_visit": 1, "release": 0, "revision": 35, "skipped_content": 0, "snapshot": 1, } check_snapshot(DINO_SNAPSHOT, loader.storage) DINO_SNAPSHOT2 = Snapshot( id=hash_to_bytes("afdeca6b8ec8f58367b4e014e2210233f1c5bf3d"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("84e428103d42b84713c77afb9420d667062f8676"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_split_commits_by_commitid(swh_storage, datadir, tmp_path): """Conversion of RCS history which needs to be split by commit ID""" # This repository has some file revisions which use the same log message # and can only be told apart by commit IDs. Without commit IDs, these commits # would get merged into a single commit in our conversion result. archive_name = "dino-commitid" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/dino" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id, ) check_snapshot(DINO_SNAPSHOT2, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 18, "directory": 18, "origin": 1, "origin_visit": 1, "release": 0, "revision": 18, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_split_commits_by_commitid(swh_storage, datadir, tmp_path): """Conversion via pserver which needs to be split by commit ID""" # This repository has some file revisions which use the same log message # and can only be told apart by commit IDs. Without commit IDs, these commits # would get merged into a single commit in our conversion result. 
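# Note on the grouping logic under test: ChangeSetKey compares commit IDs # before anything else, so two file revisions sharing a commit ID are always # merged into one changeset, while revisions with different commit IDs stay # separate even when their timestamps fall within the same fuzz window and # their log messages match.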
archive_name = "dino-commitid" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/dino" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id, ) check_snapshot(DINO_SNAPSHOT2, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 18, "directory": 18, "origin": 1, "origin_visit": 1, "release": 0, "revision": 18, "skipped_content": 0, "snapshot": 1, } GREEK_SNAPSHOT6 = Snapshot( id=hash_to_bytes("859ae7ca5b31fee594c98abecdd41eff17cae079"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("fa48fb4551898cd8d3305cace971b3b95639e83e"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_empty_lines_in_log_message(swh_storage, datadir, tmp_path): """Conversion of RCS history with empty lines in a log message""" archive_name = "greek-repository6" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT6.id, ) check_snapshot(GREEK_SNAPSHOT6, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_empty_lines_in_log_message(swh_storage, datadir, tmp_path): """Conversion via pserver with empty lines in a log message""" archive_name = "greek-repository6" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT6.id, ) check_snapshot(GREEK_SNAPSHOT6, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } def get_head_revision_paths_info(loader: CvsLoader) -> Dict[bytes, Dict[str, Any]]: assert loader.snapshot is not None root_dir = loader.snapshot.branches[b"HEAD"].target revision = loader.storage.revision_get([root_dir])[0] assert revision is not None paths = {} for entry in loader.storage.directory_ls(revision.directory, recursive=True): paths[entry["name"]] = entry return paths def test_loader_cvs_with_header_keyword(swh_storage, datadir, tmp_path): """Eventful conversion of history with Header keyword in a file""" archive_name = "greek-repository7" extracted_name = "greek-repository" archive_path = os.path.join(datadir, 
f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} repo_url = f"fake://{repo_url[7:]}" loader2 = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader2.load() == {"status": "eventful"} # We cannot verify the snapshot ID. It is unpredictable due to the use of the $Header$ # RCS keyword which contains the temporary directory where the repository is stored. expected_stats = { "content": 9, "directory": 14, "origin": 2, "origin_visit": 2, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } stats = get_stats(loader.storage) assert stats == expected_stats stats = get_stats(loader2.storage) assert stats == expected_stats # Ensure that file 'alpha', which contains a $Header$ keyword, # was imported with equal content via file:// and fake:// URLs. paths = get_head_revision_paths_info(loader) paths2 = get_head_revision_paths_info(loader2) alpha = paths[b"alpha"] alpha2 = paths2[b"alpha"] assert alpha["sha1"] == alpha2["sha1"] GREEK_SNAPSHOT8 = Snapshot( id=hash_to_bytes("5278a1f73ed0f804c68f72614a5f78ca5074ab9c"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("b389258fec8151d719e79da80b5e5355a48ec8bc"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_expand_log_keyword(swh_storage, datadir, tmp_path): """Conversion of RCS history with Log keyword in files""" archive_name = "greek-repository8" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT8.id, ) check_snapshot(GREEK_SNAPSHOT8, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 14, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_expand_log_keyword(swh_storage, datadir, tmp_path): """Conversion of RCS history with Log keyword in files, via pserver""" archive_name = "greek-repository8" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT8.id, ) check_snapshot(GREEK_SNAPSHOT8, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 14, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } GREEK_SNAPSHOT9 = Snapshot( id=hash_to_bytes("3d08834666df7a589abea07ac409771ebe7e8fe4"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("9971cbb3b540dfe75f3bcce5021cb73d63b47df3"), target_type=TargetType.REVISION, ) }, )
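# Background for the test below, sketched from the '-k' handling in # cvs2gitdump.py: custom keywords are registered via # RcsKeywords.add_id_keyword(), which maps the new keyword to RCS_KW_ID and # recompiles the keyword-matching regex, so a registered keyword expands with # the same fields as $Id$. For example: # # rcs = RcsKeywords() # rcs.add_id_keyword('OpenBSD') # '$OpenBSD$' now expands like '$Id$'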
def test_loader_cvs_visit_expand_custom_keyword(swh_storage, datadir, tmp_path):
    """Visit a CVS repository containing a file with a custom RCS keyword"""
    archive_name = "greek-repository9"
    extracted_name = "greek-repository"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
    repo_url += "/greek-tree"  # CVS module name

    loader = CvsLoader(
        swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
    )

    assert loader.load() == {"status": "eventful"}

    assert_last_visit_matches(
        loader.storage,
        repo_url,
        status="full",
        type="cvs",
        snapshot=GREEK_SNAPSHOT9.id,
    )

    stats = get_stats(loader.storage)
    assert stats == {
        "content": 9,
        "directory": 14,
        "origin": 1,
        "origin_visit": 1,
        "release": 0,
        "revision": 8,
        "skipped_content": 0,
        "snapshot": 1,
    }

    check_snapshot(GREEK_SNAPSHOT9, loader.storage)


RCSBASE_SNAPSHOT = Snapshot(
    id=hash_to_bytes("2c75041ba8868df04349c1c8f4c29f992967b8aa"),
    branches={
        b"HEAD": SnapshotBranch(
            target=hash_to_bytes("46f076387ff170dc3d4da5e43d953c1fc744c821"),
            target_type=TargetType.REVISION,
        )
    },
)


def test_loader_cvs_expand_log_keyword2(swh_storage, datadir, tmp_path):
    """Another conversion of RCS history with Log keyword in files"""
    archive_name = "rcsbase-log-kw-test-repo"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    repo_url += "/src"  # CVS module name

    loader = CvsLoader(
        swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
    )

    assert loader.load() == {"status": "eventful"}

    assert_last_visit_matches(
        loader.storage,
        repo_url,
        status="full",
        type="cvs",
        snapshot=RCSBASE_SNAPSHOT.id,
    )

    check_snapshot(RCSBASE_SNAPSHOT, loader.storage)

    stats = get_stats(loader.storage)
    assert stats == {
        "content": 2,
        "directory": 3,
        "origin": 1,
        "origin_visit": 1,
        "release": 0,
        "revision": 3,
        "skipped_content": 0,
        "snapshot": 1,
    }


def test_loader_cvs_pserver_expand_log_keyword2(swh_storage, datadir, tmp_path):
    """Another conversion via pserver of RCS history with Log keyword in files"""
    archive_name = "rcsbase-log-kw-test-repo"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    repo_url += "/src"  # CVS module name

    # Ask our cvsclient to connect via the 'cvs server' command
    repo_url = f"fake://{repo_url[7:]}"

    loader = CvsLoader(
        swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
    )

    assert loader.load() == {"status": "eventful"}

    assert_last_visit_matches(
        loader.storage,
        repo_url,
        status="full",
        type="cvs",
        snapshot=RCSBASE_SNAPSHOT.id,
    )

    check_snapshot(RCSBASE_SNAPSHOT, loader.storage)

    stats = get_stats(loader.storage)
    assert stats == {
        "content": 2,
        "directory": 3,
        "origin": 1,
        "origin_visit": 1,
        "release": 0,
        "revision": 3,
        "skipped_content": 0,
        "snapshot": 1,
    }


@pytest.mark.parametrize(
    "rlog_unsafe_path",
    [
        # paths that walk up to the parent directory:
        "unsafe_rlog_with_unsafe_relative_path.rlog",
        # absolute path outside the CVS server's root directory:
        "unsafe_rlog_wrong_arborescence.rlog",
    ],
)
def test_loader_cvs_weird_paths_in_rlog(
    swh_storage, datadir, tmp_path, mocker, rlog_unsafe_path
):
    """Handle cvs rlog output which contains unsafe paths"""
    archive_name = "greek-repository"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    repo_url += "/greek-tree"  # CVS module name

    # Ask our cvsclient to
    # connect via the 'cvs server' command
    repo_url = f"fake://{repo_url[7:]}"

    # And let's pretend the server returned this rlog output instead of
    # what it would actually return.
    rlog_file = tempfile.NamedTemporaryFile(
        dir=tmp_path, mode="w+", delete=False, prefix="weird-path-rlog-"
    )
    rlog_file_path = rlog_file.name
    with open(os.path.join(datadir, rlog_unsafe_path)) as rlog_weird_paths:
        for line in rlog_weird_paths:
            rlog_file.write(
                line.replace("{cvsroot_path}", os.path.dirname(repo_url[7:]))
            )
    rlog_file.close()
    rlog_file_override = open(rlog_file_path, "rb")  # re-open as bytes instead of str

    # The mocked fetch_rlog() hands back the crafted rlog file regardless of
    # the arguments the loader passes to it.
    mock_read = mocker.patch("swh.loader.cvs.cvsclient.CVSClient.fetch_rlog")
    mock_read.return_value = rlog_file_override

    try:
        loader = CvsLoader(
            swh_storage,
            repo_url,
            cvsroot_path=os.path.join(tmp_path, archive_name),
        )
        assert loader.load() == {"status": "failed"}
    except BadPathException:
        pass

    assert_last_visit_matches(
        swh_storage,
        repo_url,
        status="failed",
        type="cvs",
    )

    assert mock_read.called

    rlog_file_override.close()
    os.unlink(rlog_file_path)


def test_loader_rsync_retry(swh_storage, mocker, tmp_path):
    module_name = "module"
    host = "example.org"
    path = f"/cvsroot/{module_name}"
    repo_url = f"rsync://{host}{path}/"

    rsync_first_call = ["rsync", repo_url]
    rsync_second_call = [
        "rsync",
        "-az",
        f"{repo_url}CVSROOT/",
        os.path.join(tmp_path, "CVSROOT/"),
    ]
    rsync_third_call = [
        "rsync",
        "-az",
        f"{repo_url}{module_name}/",
        os.path.join(tmp_path, f"{module_name}/"),
    ]

    mock_subprocess = mocker.patch("swh.loader.cvs.loader.subprocess")
    mock_subprocess.run.side_effect = [
        subprocess.CompletedProcess(args=rsync_first_call, returncode=23),
        subprocess.CompletedProcess(
            args=rsync_first_call,
            returncode=0,
            stdout=f"""
drwxr-xr-x 21 2012/11/04 06:58:58 .
drwxr-xr-x 39 2021/01/22 10:21:05 CVSROOT
drwxr-xr-x 15 2020/12/28 00:50:21 {module_name}""",
        ),
        subprocess.CompletedProcess(args=rsync_second_call, returncode=23),
        subprocess.CompletedProcess(args=rsync_second_call, returncode=23),
        subprocess.CompletedProcess(args=rsync_second_call, returncode=0),
        subprocess.CompletedProcess(args=rsync_third_call, returncode=23),
        subprocess.CompletedProcess(args=rsync_third_call, returncode=23),
        subprocess.CompletedProcess(args=rsync_third_call, returncode=0),
    ]

    loader = CvsLoader(swh_storage, repo_url)
    loader.cvs_module_name = module_name
    loader.cvsroot_path = tmp_path
    loader.fetch_cvs_repo_with_rsync(host, path)


@pytest.mark.parametrize(
    "pserver_url",
    [
        "pserver://anonymous:anonymous@cvs.example.org/cvsroot/project/module",
        "pserver://anonymous@cvs.example.org/cvsroot/project/module",
    ],
)
def test_cvs_client_connect_pserver(mocker, pserver_url):
    from swh.loader.cvs.cvsclient import socket

    conn = mocker.MagicMock()
    conn.recv.side_effect = [b"I LOVE YOU\n", b"Valid-requests \n", b"ok\n"]
    mocker.patch.object(socket, "create_connection").return_value = conn

    parsed_url = urlparse(pserver_url)

    # check that the CVS client can be instantiated without errors
    CVSClient(parsed_url)


@pytest.mark.parametrize("protocol", ["rsync", "pserver"])
def test_loader_cvs_with_non_utf8_directory_paths(
    swh_storage, datadir, tmp_path, protocol
):
    archive_name = "greek-repository"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    repo_url += "/greek-tree"  # CVS module name

    protocol_prefix = "file://"
    if protocol == "pserver":
        protocol_prefix = "fake://"
        repo_url = repo_url.replace("file://", protocol_prefix)

    for root, _, files in os.walk(repo_url.replace(protocol_prefix, "")):
        for file in files:
            # clone each existing file in the repository, but make the
            # clone's path non UTF-8 encoded
            filepath = os.path.join(root, file)
            with open(filepath, "rb") as f:
                filecontent = f.read()
            filepath = os.path.join(
                root.encode(), ("é" + file).encode("iso-8859-1")
            )
            with open(filepath, "wb") as f:
                f.write(filecontent)

    loader = CvsLoader(
        swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
    )

    assert loader.load() == {"status": "eventful"}
+
+
+CPMIXIN_SNAPSHOT = Snapshot(
+    id=hash_to_bytes("105b49290a48cc780f5519588ae822e2dd942930"),
+    branches={
+        b"HEAD": SnapshotBranch(
+            target=hash_to_bytes("658f18d145376f0b71716649602752b509cfdbd4"),
+            target_type=TargetType.REVISION,
+        )
+    },
+)
+
+
+@pytest.mark.parametrize("protocol", ["rsync", "pserver"])
+def test_loader_cvs_with_rev_numbers_greater_than_one(
+    swh_storage, datadir, tmp_path, protocol
+):
+    archive_name = "cpmixin"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    repo_url += "/cpmixin"  # CVS module name
+
+    protocol_prefix = "file://"
+    if protocol == "pserver":
+        protocol_prefix = "fake://"
+        repo_url = repo_url.replace("file://", protocol_prefix)
+
+    loader = CvsLoader(
+        swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
+    )
+
+    assert loader.load() == {"status": "eventful"}
+
+    assert_last_visit_matches(
+        loader.storage,
+        repo_url,
+        status="full",
+        type="cvs",
+        snapshot=CPMIXIN_SNAPSHOT.id,
+    )
+
+    check_snapshot(CPMIXIN_SNAPSHOT, loader.storage)
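+# Note (illustrative, not part of the test fixture itself): "revision numbers
+# greater than one" refers to CVS/RCS revision identifiers whose first field
+# exceeds 1, such as 2.1 or 3.4. A committer can produce such history by
+# forcing a new major number, for instance with a command along the lines of
+# (hypothetical example):
+# % cvs commit -r 2.0 -m 'start the 2.x series' file.c
+# The cpmixin test repository contains revisions of this shape, which the
+# loader must convert like any other trunk revision.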