diff --git a/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py b/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py
index 572a89d..8bfa321 100644
--- a/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py
+++ b/swh/loader/cvs/cvs2gitdump/cvs2gitdump.py
@@ -1,645 +1,648 @@
#!/usr/local/bin/python
#
# Copyright (c) 2012 YASUOKA Masahiko
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

# Usage
#
# First import:
#   % git init --bare /git/openbsd.git
#   % python cvs2gitdump.py -k OpenBSD -e openbsd.org /cvs/openbsd/src \
#       > openbsd.dump
#   % git --git-dir /git/openbsd.git fast-import < openbsd.dump
#
# Periodic import:
#   % sudo cvsync
#   % python cvs2gitdump.py -k OpenBSD -e openbsd.org /cvs/openbsd/src \
#       /git/openbsd.git > openbsd2.dump
#   % git --git-dir /git/openbsd.git fast-import < openbsd2.dump
#

import getopt
import os
import re
import subprocess
import sys
import time

import swh.loader.cvs.rcsparse as rcsparse

CHANGESET_FUZZ_SEC = 300


def usage():
    print('usage: cvs2gitdump [-ah] [-z fuzz] [-e email_domain] '
          '[-E log_encodings]\n'
          '\t[-k rcs_keywords] [-b branch] [-m module] [-l last_revision]\n'
          '\tcvsroot [git_dir]', file=sys.stderr)


def main():
    email_domain = None
    do_incremental = False
    git_tip = None
    git_branch = 'master'
    dump_all = False
    log_encoding = 'utf-8,iso-8859-1'
    rcs = RcsKeywords()
    modules = []
    last_revision = None
    fuzzsec = CHANGESET_FUZZ_SEC

    try:
        opts, args = getopt.getopt(sys.argv[1:], 'ab:hm:z:e:E:k:t:l:')
        for opt, v in opts:
            if opt == '-z':
                fuzzsec = int(v)
            elif opt == '-e':
                email_domain = v
            elif opt == '-a':
                dump_all = True
            elif opt == '-b':
                git_branch = v
            elif opt == '-E':
                log_encoding = v
            elif opt == '-k':
                rcs.add_id_keyword(v)
            elif opt == '-m':
                if v == '.git':
                    print('Cannot handle the path named \'.git\'',
                          file=sys.stderr)
                    sys.exit(1)
                modules.append(v)
            elif opt == '-l':
                last_revision = v
            elif opt == '-h':
                usage()
                sys.exit(1)
    except getopt.GetoptError as msg:
        print(msg, file=sys.stderr)
        usage()
        sys.exit(1)

    if len(args) == 0 or len(args) > 2:
        usage()
        sys.exit(1)

    log_encodings = log_encoding.split(',')

    cvsroot = args[0]
    while cvsroot[-1] == '/':
        cvsroot = cvsroot[:-1]

    if len(args) == 2:
        do_incremental = True
        git = subprocess.Popen(
            ['git', '--git-dir=' + args[1], '-c',
             'i18n.logOutputEncoding=UTF-8', 'log', '--max-count', '1',
             '--date=raw', '--format=%ae%n%ad%n%H', git_branch],
            encoding='utf-8', stdout=subprocess.PIPE)
        outs = git.stdout.readlines()
        git.wait()
        if git.returncode != 0:
            print("Couldn't exec git", file=sys.stderr)
            sys.exit(git.returncode)
        git_tip = outs[2].strip()

        if last_revision is not None:
            git = subprocess.Popen(
                ['git', '--git-dir=' + args[1], '-c',
                 'i18n.logOutputEncoding=UTF-8', 'log', '--max-count', '1',
                 '--date=raw', '--format=%ae%n%ad%n%H', last_revision],
                encoding='utf-8', stdout=subprocess.PIPE)
            outs = git.stdout.readlines()
            git.wait()
            if git.returncode != 0:
                print("Couldn't exec git", file=sys.stderr)
sys.exit(git.returncode) last_author = outs[0].strip() last_ctime = float(outs[1].split()[0]) # strip off the domain part from the last author since cvs doesn't have # the domain part. if do_incremental and email_domain is not None and \ last_author.lower().endswith(('@' + email_domain).lower()): last_author = last_author[:-1 * (1 + len(email_domain))] cvs = CvsConv(cvsroot, rcs, not do_incremental, fuzzsec) print('** walk cvs tree', file=sys.stderr) if len(modules) == 0: cvs.walk() else: for module in modules: cvs.walk(module) changesets = sorted(cvs.changesets) nchangesets = len(changesets) print('** cvs has %d changeset' % (nchangesets), file=sys.stderr) if nchangesets <= 0: sys.exit(0) if not dump_all: # don't use last 10 minutes for safety max_time_max = changesets[-1].max_time - 600 else: max_time_max = changesets[-1].max_time found_last_revision = False markseq = cvs.markseq extags = set() for k in changesets: if do_incremental and not found_last_revision: if k.min_time == last_ctime and k.author == last_author: found_last_revision = True for tag in k.tags: extags.add(tag) continue if k.max_time > max_time_max: break marks = {} for f in k.revs: if not do_incremental: marks[f.markseq] = f else: markseq = markseq + 1 git_dump_file(f.path, f.rev, rcs, markseq) marks[markseq] = f log = rcsparse.rcsfile(k.revs[0].path).getlog(k.revs[0].rev) for i, e in enumerate(log_encodings): try: how = 'ignore' if i == len(log_encodings) - 1 else 'strict' log = log.decode(e, how) break except UnicodeError: pass log = log.encode('utf-8', 'ignore') output('commit refs/heads/' + git_branch) markseq = markseq + 1 output('mark :%d' % (markseq)) email = k.author if email_domain is None \ else k.author + '@' + email_domain output('author %s <%s> %d +0000' % (k.author, email, k.min_time)) output('committer %s <%s> %d +0000' % (k.author, email, k.min_time)) output('data', len(log)) output(log, end='') if do_incremental and git_tip is not None: output('from', git_tip) git_tip = None for m in marks: f = marks[m] mode = 0o100755 if os.access(f.path, os.X_OK) else 0o100644 fn = file_path(cvs.cvsroot, f.path) if f.state == 'dead': output('D', fn) else: output('M %o :%d %s' % (mode, m, fn)) output('') for tag in k.tags: if tag in extags: continue output('reset refs/tags/%s' % (tag)) output('from :%d' % (markseq)) output('') if do_incremental and not found_last_revision: raise Exception('could not find the last revision') print('** dumped', file=sys.stderr) # # Encode by UTF-8 always for string objects since encoding for git-fast-import # is UTF-8. 
Also write without conversion for a bytes object (file bodies # might be various encodings) # def output(*args, end='\n'): if len(args) == 0: pass elif len(args) > 1 or isinstance(args[0], str): lines = ' '.join( [arg if isinstance(arg, str) else str(arg) for arg in args]) sys.stdout.buffer.write(lines.encode('utf-8')) else: sys.stdout.buffer.write(args[0]) if len(end) > 0: sys.stdout.buffer.write(end.encode('utf-8')) class FileRevision: def __init__(self, path, rev, state, markseq): self.path = path self.rev = rev self.state = state self.markseq = markseq class ChangeSetKey: def __init__(self, branch, author, timestamp, log, commitid, fuzzsec): self.branch = branch self.author = author self.min_time = timestamp self.max_time = timestamp self.commitid = commitid self.fuzzsec = fuzzsec self.revs = [] self.tags = [] self.log_hash = 0 h = 0 for c in log: h = 31 * h + c self.log_hash = h def __lt__(self, other): return self._cmp(other) < 0 def __gt__(self, other): return self._cmp(other) > 0 def __eq__(self, other): return self._cmp(other) == 0 def __le__(self, other): return self._cmp(other) <= 0 def __ge__(self, other): return self._cmp(other) >= 0 def __ne__(self, other): return self._cmp(other) != 0 def _cmp(self, anon): # compare by the commitid cid = _cmp2(self.commitid, anon.commitid) if cid == 0 and self.commitid is not None: # both have commitid and they are same return 0 # compare by the time ma = anon.min_time - self.max_time mi = self.min_time - anon.max_time ct = self.min_time - anon.min_time if ma > self.fuzzsec or mi > self.fuzzsec: return ct if cid != 0: # only one has the commitid, this means different commit return cid if ct == 0 else ct # compare by log, branch and author c = _cmp2(self.log_hash, anon.log_hash) if c == 0: c = _cmp2(self.branch, anon.branch) if c == 0: c = _cmp2(self.author, anon.author) if c == 0: return 0 return ct if ct != 0 else c def merge(self, anot): self.max_time = max(self.max_time, anot.max_time) self.min_time = min(self.min_time, anot.min_time) self.revs.extend(anot.revs) def __hash__(self): return hash(self.branch + '/' + self.author) * 31 + self.log_hash def put_file(self, path, rev, state, markseq): self.revs.append(FileRevision(path, rev, state, markseq)) def _cmp2(a, b): _a = a is not None _b = b is not None return (a > b) - (a < b) if _a and _b else (_a > _b) - (_a < _b) class CvsConv: def __init__(self, cvsroot, rcs, dumpfile, fuzzsec): self.cvsroot = cvsroot self.rcs = rcs self.changesets = dict() self.dumpfile = dumpfile self.markseq = 0 self.tags = dict() self.fuzzsec = fuzzsec def walk(self, module=None): p = [self.cvsroot] if module is not None: p.append(module) path = os.path.join(*p) for root, dirs, files in os.walk(path): if '.git' in dirs: print('Ignore %s: cannot handle the path named \'.git\'' % ( root + os.sep + '.git'), file=sys.stderr) dirs.remove('.git') if '.git' in files: print('Ignore %s: cannot handle the path named \'.git\'' % ( root + os.sep + '.git'), file=sys.stderr) files.remove('.git') for f in files: if not f[-2:] == ',v': continue self.parse_file(root + os.sep + f) for t, c in list(self.tags.items()): c.tags.append(t) def parse_file(self, path): rtags = dict() rcsfile = rcsparse.rcsfile(path) branches = {'1': 'HEAD', '1.1.1': 'VENDOR'} for k, v in list(rcsfile.symbols.items()): r = v.split('.') if len(r) == 3: branches[v] = 'VENDOR' elif len(r) >= 3 and r[-2] == '0': branches['.'.join(r[:-2] + r[-1:])] = k if len(r) == 2 and branches[r[0]] == 'HEAD': if v not in rtags: rtags[v] = list() rtags[v].append(k) revs = 
rcsfile.revs.items() # sort by revision descending to priorize 1.1.1.1 than 1.1 revs = sorted(revs, key=lambda a: a[1][0], reverse=True) # sort by time revs = sorted(revs, key=lambda a: a[1][1]) novendor = False have_initial_revision = False last_vendor_status = None for k, v in revs: r = k.split('.') if len(r) == 4 and r[0] == '1' and r[1] == '1' and r[2] == '1' \ and r[3] == '1': if have_initial_revision: continue if v[3] == 'dead': continue last_vendor_status = v[3] have_initial_revision = True elif len(r) == 4 and r[0] == '1' and r[1] == '1' and r[2] == '1': if novendor: continue last_vendor_status = v[3] elif len(r) == 2: if r[0] == '1' and r[1] == '1': if have_initial_revision: continue if v[3] == 'dead': continue have_initial_revision = True elif r[0] == '1' and r[1] != '1': novendor = True if last_vendor_status == 'dead' and v[3] == 'dead': last_vendor_status = None continue last_vendor_status = None else: # trunk only continue if self.dumpfile: self.markseq = self.markseq + 1 git_dump_file(path, k, self.rcs, self.markseq) b = '.'.join(r[:-1]) try: a = ChangeSetKey( branches[b], v[2], v[1], rcsfile.getlog(v[0]), v[6], self.fuzzsec) except Exception as e: print('Aborted at %s %s' % (path, v[0]), file=sys.stderr) raise e a.put_file(path, k, v[3], self.markseq) while a in self.changesets: c = self.changesets[a] del self.changesets[a] c.merge(a) a = c self.changesets[a] = a if k in rtags: for t in rtags[k]: if t not in self.tags or \ self.tags[t].max_time < a.max_time: self.tags[t] = a def file_path(r, p): if r.endswith('/'): r = r[:-1] - path = p[:-2] # drop ",v" + if p[-2:] == ',v': + path = p[:-2] # drop ",v" + else: + path = p p = path.split('/') if len(p) > 0 and p[-2] == 'Attic': path = '/'.join(p[:-2] + [p[-1]]) if path.startswith(r): path = path[len(r) + 1:] return path def git_dump_file(path, k, rcs, markseq): try: cont = rcs.expand_keyword(path, rcsparse.rcsfile(path), k) except RuntimeError as msg: print('Unexpected runtime error on parsing', path, k, ':', msg, file=sys.stderr) print('unlimit the resource limit may fix this problem.', file=sys.stderr) sys.exit(1) output('blob') output('mark :%d' % markseq) output('data', len(cont)) output(cont) class RcsKeywords: RCS_KW_AUTHOR = (1 << 0) RCS_KW_DATE = (1 << 1) RCS_KW_LOG = (1 << 2) RCS_KW_NAME = (1 << 3) RCS_KW_RCSFILE = (1 << 4) RCS_KW_REVISION = (1 << 5) RCS_KW_SOURCE = (1 << 6) RCS_KW_STATE = (1 << 7) RCS_KW_FULLPATH = (1 << 8) RCS_KW_MDOCDATE = (1 << 9) RCS_KW_LOCKER = (1 << 10) RCS_KW_ID = (RCS_KW_RCSFILE | RCS_KW_REVISION | RCS_KW_DATE | RCS_KW_AUTHOR | RCS_KW_STATE) RCS_KW_HEADER = (RCS_KW_ID | RCS_KW_FULLPATH) rcs_expkw = { b"Author": RCS_KW_AUTHOR, b"Date": RCS_KW_DATE, b"Header": RCS_KW_HEADER, b"Id": RCS_KW_ID, b"Log": RCS_KW_LOG, b"Name": RCS_KW_NAME, b"RCSfile": RCS_KW_RCSFILE, b"Revision": RCS_KW_REVISION, b"Source": RCS_KW_SOURCE, b"State": RCS_KW_STATE, b"Mdocdate": RCS_KW_MDOCDATE, b"Locker": RCS_KW_LOCKER } RCS_KWEXP_NONE = (1 << 0) RCS_KWEXP_NAME = (1 << 1) # include keyword name RCS_KWEXP_VAL = (1 << 2) # include keyword value RCS_KWEXP_LKR = (1 << 3) # include name of locker RCS_KWEXP_OLD = (1 << 4) # generate old keyword string RCS_KWEXP_ERR = (1 << 5) # mode has an error RCS_KWEXP_DEFAULT = (RCS_KWEXP_NAME | RCS_KWEXP_VAL) RCS_KWEXP_KVL = (RCS_KWEXP_NAME | RCS_KWEXP_VAL | RCS_KWEXP_LKR) def __init__(self): self.rerecomple() def rerecomple(self): pat = b'|'.join(list(self.rcs_expkw.keys())) self.re_kw = re.compile(b".*?\\$(" + pat + b")[\\$:]") def add_id_keyword(self, keyword): 
        self.rcs_expkw[keyword.encode('ascii')] = self.RCS_KW_ID
        self.rerecomple()

    def kflag_get(self, flags):
        if flags is None:
            return self.RCS_KWEXP_DEFAULT
        fl = 0
        for fc in flags:
            if fc == 'k':
                fl |= self.RCS_KWEXP_NAME
            elif fc == 'v':
                fl |= self.RCS_KWEXP_VAL
            elif fc == 'l':
                fl |= self.RCS_KWEXP_LKR
            elif fc == 'o':
                if len(flags) != 1:
                    fl |= self.RCS_KWEXP_ERR
                fl |= self.RCS_KWEXP_OLD
            elif fc == 'b':
                if len(flags) != 1:
                    fl |= self.RCS_KWEXP_ERR
                fl |= self.RCS_KWEXP_NONE
            else:
                fl |= self.RCS_KWEXP_ERR
        return fl

    def expand_keyword(self, filename, rcs, r):
        rev = rcs.revs[r]

        mode = self.kflag_get(rcs.expand)
        if (mode & (self.RCS_KWEXP_NONE | self.RCS_KWEXP_OLD)) != 0:
            return rcs.checkout(rev[0])

        ret = []
        for line in rcs.checkout(rev[0]).split(b'\n'):
            logbuf = None
            m = self.re_kw.match(line)
            if m is None:
                # No RCS Keywords, use it as it is
                ret += [line]
                continue

            line0 = b''
            while m is not None:
                try:
                    dsign = m.end(1) + line[m.end(1):].index(b'$')
                except ValueError:
                    break
                prefix = line[:m.start(1) - 1]
                line = line[dsign + 1:]
                line0 += prefix
                expbuf = ''
                if (mode & self.RCS_KWEXP_NAME) != 0:
                    expbuf += '$'
                    expbuf += m.group(1).decode('ascii')
                    if (mode & self.RCS_KWEXP_VAL) != 0:
                        expbuf += ': '
                if (mode & self.RCS_KWEXP_VAL) != 0:
                    expkw = self.rcs_expkw[m.group(1)]
                    if (expkw & self.RCS_KW_RCSFILE) != 0:
                        expbuf += filename \
                            if (expkw & self.RCS_KW_FULLPATH) != 0 \
                            else os.path.basename(filename)
                        expbuf += " "
                    if (expkw & self.RCS_KW_REVISION) != 0:
                        expbuf += rev[0]
                        expbuf += " "
                    if (expkw & self.RCS_KW_DATE) != 0:
                        expbuf += time.strftime(
                            "%Y/%m/%d %H:%M:%S ", time.gmtime(rev[1]))
                    if (expkw & self.RCS_KW_MDOCDATE) != 0:
                        d = time.gmtime(rev[1])
                        expbuf += time.strftime(
                            "%B%e %Y " if (d.tm_mday < 10) else "%B %e %Y ", d)
                    if (expkw & self.RCS_KW_AUTHOR) != 0:
                        expbuf += rev[2]
                        expbuf += " "
                    if (expkw & self.RCS_KW_STATE) != 0:
                        expbuf += rev[3]
                        expbuf += " "
                    if (expkw & self.RCS_KW_LOG) != 0:
                        p = prefix
                        expbuf += filename \
                            if (expkw & self.RCS_KW_FULLPATH) != 0 \
                            else os.path.basename(filename)
                        expbuf += " "
                        logbuf = p + (
                            'Revision %s %s %s\n' % (
                                rev[0], time.strftime(
                                    "%Y/%m/%d %H:%M:%S", time.gmtime(rev[1])),
                                rev[2])).encode('ascii')
                        for lline in rcs.getlog(rev[0]).rstrip().split(b'\n'):
                            if len(lline) == 0:
                                logbuf += p.rstrip() + b'\n'
                            else:
                                logbuf += p + lline.lstrip() + b'\n'
                        if len(line) == 0:
                            logbuf += p.rstrip()
                        else:
                            logbuf += p + line.lstrip()
                        line = b''
                    if (expkw & self.RCS_KW_SOURCE) != 0:
                        expbuf += filename
                        expbuf += " "
                    if (expkw & (self.RCS_KW_NAME | self.RCS_KW_LOCKER)) != 0:
                        expbuf += " "
                if (mode & self.RCS_KWEXP_NAME) != 0:
                    expbuf += '$'
                line0 += expbuf[:255].encode('ascii')
                m = self.re_kw.match(line)

            ret += [line0 + line]
            if logbuf is not None:
                ret += [logbuf]
        return b'\n'.join(ret)


# ----------------------------------------------------------------------
# entry point
# ----------------------------------------------------------------------

if __name__ == '__main__':
    main()
diff --git a/swh/loader/cvs/cvsclient.py b/swh/loader/cvs/cvsclient.py
new file mode 100644
index 0000000..e670f96
--- /dev/null
+++ b/swh/loader/cvs/cvsclient.py
@@ -0,0 +1,334 @@
+# Copyright (C) 2015-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Minimal CVS client implementation
+
+"""
+
+import socket
+import subprocess
+import os.path
+import tempfile
+import re
+
+from swh.loader.exception import NotFound
+
+CVS_PSERVER_PORT = 2401
+CVS_PROTOCOL_BUFFER_SIZE = 8192
+EXAMPLE_PSERVER_URL = "pserver://user:password@cvs.example.com/cvsroot/repository"
+EXAMPLE_SSH_URL = "ssh://user@cvs.example.com/cvsroot/repository"
+
+VALID_RESPONSES = [ "ok", "error", "Valid-requests", "Checked-in",
+    "New-entry", "Checksum", "Copy-file", "Updated", "Created",
+    "Update-existing", "Merged", "Patched", "Rcs-diff", "Mode",
+    "Removed", "Remove-entry", "Template", "Notified", "Module-expansion",
+    "Wrapper-rcsOption", "M", "Mbinary", "E", "F", "MT" ]
+
+# Trivially encode strings to protect them from innocent eyes (i.e.,
+# inadvertent password compromises, like a network administrator
+# who's watching packets for legitimate reasons and accidentally sees
+# the password protocol go by).
+#
+# This is NOT secure encryption.
+def scramble_password(password):
+    s = ['A']  # scramble scheme version number
+    scramble_shifts = [
+        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
+        16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
+        114,120, 53, 79, 96,109, 72,108, 70, 64, 76, 67,116, 74, 68, 87,
+        111, 52, 75,119, 49, 34, 82, 81, 95, 65,112, 86,118,110,122,105,
+        41, 57, 83, 43, 46,102, 40, 89, 38,103, 45, 50, 42,123, 91, 35,
+        125, 55, 54, 66,124,126, 59, 47, 92, 71,115, 78, 88,107,106, 56,
+        36,121,117,104,101,100, 69, 73, 99, 63, 94, 93, 39, 37, 61, 48,
+        58,113, 32, 90, 44, 98, 60, 51, 33, 97, 62, 77, 84, 80, 85,223,
+        225,216,187,166,229,189,222,188,141,249,148,200,184,136,248,190,
+        199,170,181,204,138,232,218,183,255,234,220,247,213,203,226,193,
+        174,172,228,252,217,201,131,230,197,211,145,238,161,179,160,212,
+        207,221,254,173,202,146,224,151,140,196,205,130,135,133,143,246,
+        192,159,244,239,185,168,215,144,139,165,180,157,147,186,214,176,
+        227,231,219,169,175,156,206,198,129,164,150,210,154,177,134,127,
+        182,128,158,208,162,132,167,209,149,241,153,251,237,236,171,195,
+        243,233,253,240,194,250,191,155,142,137,245,235,163,242,178,152 ]
+    for c in password:
+        s.append('%c' % scramble_shifts[ord(c)])
+    return "".join(s)
+
+
+class CVSProtocolError(Exception):
+    pass
+
+
+_re_kb_opt = re.compile(b'\/-kb\/')
+
+
+class CVSClient:
+
+    def connect_pserver(self, hostname, port, auth):
+        if port is None:
+            port = CVS_PSERVER_PORT
+        if auth is None:
+            raise NotFound("Username and password are required for "
+                           "a pserver connection: %s" % EXAMPLE_PSERVER_URL)
+        try:
+            user = auth.split(':')[0]
+            password = auth.split(':')[1]
+        except IndexError:
+            raise NotFound("Username and password are required for "
+                           "a pserver connection: %s" % EXAMPLE_PSERVER_URL)
+
+        try:
+            self.socket = socket.create_connection((hostname, port))
+        except ConnectionRefusedError:
+            raise NotFound("Could not connect to %s:%s" % (hostname, port))
+
+        scrambled_password = scramble_password(password)
+        request = "BEGIN AUTH REQUEST\n%s/%s\n%s\n%s\nEND AUTH REQUEST\n" \
+            % (self.cvsroot_path, self.cvs_module_name, user,
+               scrambled_password)
+        self.socket.sendall(request.encode('UTF-8'))
+
+        response = self.socket.recv(11)
+        if response != b"I LOVE YOU\n":
+            raise NotFound("pserver authentication failed for %s:%s" %
+                           (hostname, port))
+
+    def connect_ssh(self, hostname, port, auth):
+        command = ['ssh']
+        if auth is not None:
+            # Assume 'auth' contains only a user name.
+            # We do not support password authentication with SSH since the
+            # anoncvs user is usually granted access without a password.
+            command += ['-l', '%s' % auth]
+        if port is not None:
+            command += ['-p', '%d' % port]
+
+        # accept new SSH host keys upon first use; changed host keys will
+        # require intervention
+        command += ['-o', "StrictHostKeyChecking=accept-new"]
+
+        # disable interactive prompting
+        command += ['-o', "BatchMode=yes"]
+
+        # disable further option processing by adding '--'
+        command += ['--']
+
+        command += ['%s' % hostname, 'cvs', 'server']
+        self.ssh = subprocess.Popen(
+            command,
+            bufsize=0,  # use non-buffered I/O to match behaviour of self.socket
+            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+
+    def connect_fake(self, hostname, port, auth):
+        command = ['cvs', 'server']
+        self.ssh = subprocess.Popen(
+            command,
+            bufsize=0,  # use non-buffered I/O to match behaviour of self.socket
+            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+
+    def conn_read_line(self, require_newline=True):
+        if len(self.linebuffer) != 0:
+            return self.linebuffer.pop(0)
+        buf = b''
+        idx = -1
+        while idx == -1:
+            if len(buf) >= CVS_PROTOCOL_BUFFER_SIZE:
+                if require_newline:
+                    raise CVSProtocolError(
+                        "Overlong response from CVS server: %s" % buf)
+                else:
+                    break
+            if self.socket:
+                buf += self.socket.recv(CVS_PROTOCOL_BUFFER_SIZE)
+            elif self.ssh:
+                buf += self.ssh.stdout.read(CVS_PROTOCOL_BUFFER_SIZE)
+            else:
+                raise Exception("No valid connection")
+            if not buf:
+                return None
+            idx = buf.rfind(b'\n')
+        if idx != -1:
+            self.linebuffer = buf[:idx + 1].splitlines(keepends=True)
+        else:
+            if require_newline:
+                raise CVSProtocolError(
+                    "Invalid response from CVS server: %s" % buf)
+            else:
+                self.linebuffer.append(buf)
+        if len(self.incomplete_line) > 0:
+            self.linebuffer[0] = self.incomplete_line + self.linebuffer[0]
+        if idx != -1:
+            self.incomplete_line = buf[idx + 1:]
+        else:
+            self.incomplete_line = b''
+        return self.linebuffer.pop(0)
+
+    def conn_write(self, data):
+        if self.socket:
+            return self.socket.sendall(data)
+        if self.ssh:
+            self.ssh.stdin.write(data)
+            return self.ssh.stdin.flush()
+        raise Exception("No valid connection")
+
+    def conn_write_str(self, s):
+        return self.conn_write(s.encode('UTF-8'))
+
+    def conn_close(self):
+        if self.socket:
+            self.socket.close()
+        if self.ssh:
+            self.ssh.kill()
+            try:
+                self.ssh.wait(timeout=10)
+            except subprocess.TimeoutExpired as e:
+                raise RuntimeError("Could not terminate ssh program: %s" % e)
+
+    def __init__(self, url):
+        """
+        Connect to a CVS server at the specified URL and perform the initial
+        CVS protocol handshake.
+        """
+        self.hostname = url.host
+        self.cvsroot_path = os.path.dirname(url.path)
+        self.cvs_module_name = os.path.basename(url.path)
+        self.socket = None
+        self.ssh = None
+        self.linebuffer = list()
+        self.incomplete_line = b''
+
+        if url.scheme == 'pserver':
+            self.connect_pserver(url.host, url.port, url.auth)
+        elif url.scheme == 'ssh':
+            self.connect_ssh(url.host, url.port, url.auth)
+        elif url.scheme == 'fake':
+            self.connect_fake(url.host, url.port, url.auth)
+        else:
+            raise NotFound("Invalid CVS origin URL '%s'" % url)
+
+        # we should have a connection now
+        assert self.socket or self.ssh
+
+        self.conn_write_str(
+            "Root %s\nValid-responses %s\nvalid-requests\nUseUnchanged\n" %
+            (self.cvsroot_path, ' '.join(VALID_RESPONSES)))
+        response = self.conn_read_line()
+        if not response:
+            raise CVSProtocolError("No response from CVS server")
+        try:
+            if response[0:15] != b"Valid-requests ":
+                raise CVSProtocolError(
+                    "Invalid response from CVS server: %s" % response)
+        except IndexError:
+            raise CVSProtocolError(
+                "Invalid response from CVS server: %s" % response)
+        response = self.conn_read_line()
+        if response != b"ok\n":
+            raise CVSProtocolError(
+                "Invalid response from CVS server: %s" % response)
+
+    def __del__(self):
+        self.conn_close()
+
+    def _parse_rlog_response(self, fp):
+        rlog_output = tempfile.TemporaryFile()
+        expect_error = False
+        for line in fp.readlines():
+            if expect_error:
+                raise CVSProtocolError('CVS server error: %s' % line)
+            if line == b'ok\n':
+                break
+            elif line == b'M \n':
+                continue
+            elif line[0:2] == b'M ':
+                rlog_output.write(line[2:])
+            elif line[0:8] == b'MT text ':
+                rlog_output.write(line[8:-1])
+            elif line[0:8] == b'MT date ':
+                rlog_output.write(line[8:-1])
+            elif line[0:10] == b'MT newline':
+                rlog_output.write(line[10:])
+            elif line[0:6] == b'error ':
+                expect_error = True
+                continue
+            else:
+                raise CVSProtocolError('Bad CVS protocol response: %s' % line)
+        rlog_output.seek(0)
+        return rlog_output
+
+    def fetch_rlog(self):
+        fp = tempfile.TemporaryFile()
+        self.conn_write_str(
+            "Global_option -q\nArgument --\nArgument %s\nrlog\n" %
+            self.cvs_module_name)
+        while True:
+            response = self.conn_read_line()
+            if response is None:
+                raise CVSProtocolError("No response from CVS server")
+            if response[0:2] == b"E ":
+                raise CVSProtocolError(
+                    "Error response from CVS server: %s" % response)
+            fp.write(response)
+            if response == b"ok\n":
+                break
+        fp.seek(0)
+        return self._parse_rlog_response(fp)
+
+    def checkout(self, path, rev, dest_dir):
+        skip_line = False
+        expect_modeline = False
+        expect_bytecount = False
+        have_bytecount = False
+        bytecount = 0
+        dirname = os.path.dirname(path)
+        if dirname:
+            self.conn_write_str("Directory %s\n%s\n" % (dirname, dirname))
+        filename = os.path.basename(path)
+        co_output = tempfile.NamedTemporaryFile(
+            dir=dest_dir, delete=True,
+            prefix='cvsclient-checkout-%s-r%s-' % (filename, rev))
+        # TODO: cvs <= 1.10 servers expect to be given every Directory along the path.
+ self.conn_write_str("Directory %s\n%s\n" + "Global_option -q\n" + "Argument -r%s\n" + "Argument -kb\n" + "Argument --\nArgument %s\nco \n" % (self.cvs_module_name, + self.cvs_module_name, rev, path)) + while True: + if have_bytecount and bytecount > 0: + response = self.conn_read_line(require_newline=False) + if response == None: + raise CVSProtocolError("Incomplete response from CVS server") + co_output.write(response) + bytecount -= len(response) + if bytecount < 0: + raise CVSProtocolError("Overlong response from CVS server: %s" % response) + continue + else: + response = self.conn_read_line() + if response[0:2] == b'E ': + raise CVSProtocolError('Error from CVS server: %s' % response) + if have_bytecount and bytecount == 0 and response == b'ok\n': + break + if skip_line: + skip_line = False + continue + elif expect_bytecount: + try: + bytecount = int(response[0:-1]) # strip trailing \n + except ValueError: + raise CVSProtocolError('Bad CVS protocol response: %s' % response) + have_bytecount = True + continue + elif response == b'M \n': + continue + elif response == b'MT +updated\n': + continue + elif response == b'MT -updated\n': + continue + elif response[0:9] == b'MT fname ': + continue + elif response[0:8] == b'Created ': + skip_line = True + continue + elif response[0:1] == b'/' and _re_kb_opt.search(response): + expect_modeline = True + continue + elif expect_modeline and response[0:2] == b'u=': + expect_modeline = False + expect_bytecount = True + continue + elif response[0:2] == b'M ': + continue + elif response[0:8] == b'MT text ': + continue + elif response[0:10] == b'MT newline': + continue + else: + raise CVSProtocolError('Bad CVS protocol response: %s' % response) + co_output.seek(0) + return co_output diff --git a/swh/loader/cvs/loader.py b/swh/loader/cvs/loader.py index d5c7ae2..e7a187f 100644 --- a/swh/loader/cvs/loader.py +++ b/swh/loader/cvs/loader.py @@ -1,372 +1,466 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Loader in charge of injecting either new or existing cvs repositories to swh-storage. """ from datetime import datetime import os import subprocess import tempfile import time from typing import Iterator, List, Optional, Sequence, Tuple from urllib3.util import parse_url from swh.loader.core.loader import BaseLoader from swh.loader.core.utils import clean_dangling_folders from swh.loader.exception import NotFound import swh.loader.cvs.rcsparse as rcsparse +import swh.loader.cvs.cvsclient as cvsclient +from swh.loader.cvs.rlog import RlogConv from swh.loader.cvs.cvs2gitdump.cvs2gitdump import CvsConv, RcsKeywords, CHANGESET_FUZZ_SEC, file_path, ChangeSetKey from swh.model import from_disk, hashutil from swh.model.model import Person, Revision, RevisionType, TimestampWithTimezone from swh.model.model import ( Content, Directory, Origin, Revision, SkippedContent, Snapshot, SnapshotBranch, TargetType, ) from swh.storage.interface import StorageInterface DEFAULT_BRANCH = b"HEAD" TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs." class CvsLoader(BaseLoader): """Swh cvs loader. The repository is local. The loader deals with update on an already previously loaded repository. 
""" visit_type = "cvs" def __init__( self, storage: StorageInterface, url: str, origin_url: Optional[str] = None, visit_date: Optional[datetime] = None, cvsroot_path: Optional[str] = None, temp_directory: str = "/tmp", max_content_size: Optional[int] = None, ): super().__init__( storage=storage, logging_class="swh.loader.cvs.CvsLoader", max_content_size=max_content_size, ) self.cvsroot_url = url # origin url as unique identifier for origin in swh archive self.origin_url = origin_url if origin_url else self.cvsroot_url self.temp_directory = temp_directory self.done = False + self.cvs_module_name = None - self.cvs_module_path = None + + # XXX At present changeset IDs are recomputed on the fly during every visit. + # If we were able to maintain a cached somewhere which can be indexed by a + # cvs2gitdump.ChangeSetKey and yields an SWH revision hash we could avoid + # doing a lot of redundant work during every visit. + self.cvs_changesets = None + + # remote CVS repository access (history is parsed from CVS rlog): + self.cvsclient = None + self.rlog_file = None + # internal state used to store swh objects self._contents: List[Content] = [] self._skipped_contents: List[SkippedContent] = [] self._directories: List[Directory] = [] self._revisions: List[Revision] = [] self.swh_revision_gen = None # internal state, current visit self._last_revision = None self._visit_status = "full" self._load_status = "uneventful" self.visit_date = visit_date self.cvsroot_path = cvsroot_path self.snapshot = None + def compute_swh_revision(self, k, logmsg): + """Compute swh hash data per CVS changeset. + + Returns: + tuple (rev, swh_directory) + - rev: current SWH revision computed from checked out work tree + - swh_directory: dictionary of path, swh hash data with type + + """ + # Compute SWH revision from the on-disk state + swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path)) + if self._last_revision: + parents = tuple([bytes(self._last_revision.id)]) + else: + parents = () + revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents) + self.log.debug("SWH revision ID: %s" % hashutil.hash_to_hex(revision.id)) + self._last_revision = revision + if self._load_status == "uneventful": + # We have an eventful load if this revision is not already present in the archive + if not self.storage.revision_get([revision.id])[0]: + self._load_status = "eventful" + return (revision, swh_dir) + def swh_hash_data_per_cvs_changeset(self): """Compute swh hash data per CVS changeset. Yields: tuple (rev, swh_directory) - rev: current SWH revision computed from checked out work tree - swh_directory: dictionary of path, swh hash data with type """ - # XXX At present changeset IDs are recomputed on the fly during every visit. - # If we were able to maintain a cached somewhere which can be indexed by a - # cvs2gitdump.ChangeSetKey and yields an SWH revision hash we could avoid - # doing a lot of redundant work during every visit. 
for k in self.cvs_changesets: tstr = time.strftime('%c', time.gmtime(k.max_time)) self.log.info("changeset from %s by %s on branch %s", tstr, k.author, k.branch); logmsg = "" # Check out the on-disk state of this revision for f in k.revs: rcsfile = None path = file_path(self.cvsroot_path, f.path) wtpath = os.path.join(self.worktree_path, path) self.log.info("rev %s of file %s" % (f.rev, f.path)); if not logmsg: rcsfile = rcsparse.rcsfile(f.path) logmsg = rcsfile.getlog(k.revs[0].rev) if f.state == 'dead': # remove this file from work tree try: os.remove(wtpath) except FileNotFoundError: pass else: # create, or update, this file in the work tree if not rcsfile: rcsfile = rcsparse.rcsfile(f.path) rcs = RcsKeywords() contents = rcs.expand_keyword(f.path, rcsfile, f.rev) try: outfile = open(wtpath, mode='wb') except FileNotFoundError: os.makedirs(os.path.dirname(wtpath)) outfile = open(wtpath, mode='wb') outfile.write(contents) outfile.close() - # Compute SWH revision from the on-disk state - swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path)) - if self._last_revision: - parents = tuple([bytes(self._last_revision.id)]) - else: - parents = () - revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents) - self.log.debug("SWH revision ID: %s" % hashutil.hash_to_hex(revision.id)) - self._last_revision = revision - if self._load_status == "uneventful": - # We have an eventful load if this revision is not already present in the archive - if not self.storage.revision_get([revision.id])[0]: - self._load_status = "eventful" - + (revision, swh_dir) = self.compute_swh_revision(k, logmsg) yield revision, swh_dir + def swh_hash_data_per_cvs_rlog_changeset(self): + """Compute swh hash data per CVS rlog changeset. + + Yields: + tuple (rev, swh_directory) + - rev: current SWH revision computed from checked out work tree + - swh_directory: dictionary of path, swh hash data with type + + """ + for k in self.cvs_changesets: + tstr = time.strftime('%c', time.gmtime(k.max_time)) + self.log.info("changeset from %s by %s on branch %s", tstr, k.author, k.branch); + logmsg = "" + # Check out the on-disk state of this revision + for f in k.revs: + path = file_path(self.cvsroot_path, f.path) + wtpath = os.path.join(self.worktree_path, path) + self.log.info("rev %s of file %s" % (f.rev, f.path)); + if not logmsg: + logmsg = self.rlog.getlog(self.rlog_file, f.path, k.revs[0].rev) + self.log.debug("f.state is %s\n" % f.state) + if f.state == 'dead': + # remove this file from work tree + try: + os.remove(wtpath) + except FileNotFoundError: + pass + else: + dirname = os.path.dirname(wtpath) + try: + os.makedirs(dirname) + except FileExistsError: + pass + self.log.debug("checkout to %s\n" % wtpath) + fp = self.cvsclient.checkout(f.path, f.rev, dirname) + os.rename(fp.name, wtpath) + try: + fp.close() + except FileNotFoundError: + # Well, we have just renamed the file... + pass + + # TODO: prune empty directories? + (revision, swh_dir) = self.compute_swh_revision(k, logmsg) + yield revision, swh_dir def process_cvs_changesets(self) -> Iterator[ Tuple[List[Content], List[SkippedContent], List[Directory], Revision] ]: """Process CVS revisions. At each CVS revision, check out contents and compute swh hashes. Yields: tuple (contents, skipped-contents, directories, revision) of dict as a dictionary with keys, sha1_git, sha1, etc... 
""" for swh_revision, swh_dir in self.swh_hash_data_per_cvs_changeset(): # Send the associated contents/directories (_contents, _skipped_contents, _directories) = from_disk.iter_directory(swh_dir) yield _contents, _skipped_contents, _directories, swh_revision + def process_cvs_rlog_changesets(self) -> Iterator[ + Tuple[List[Content], List[SkippedContent], List[Directory], Revision] + ]: + """Process CVS rlog revisions. + + At each CVS revision, check out contents and compute swh hashes. + + Yields: + tuple (contents, skipped-contents, directories, revision) of dict as a + dictionary with keys, sha1_git, sha1, etc... + + """ + for swh_revision, swh_dir in self.swh_hash_data_per_cvs_rlog_changeset(): + # Send the associated contents/directories + (_contents, _skipped_contents, _directories) = from_disk.iter_directory(swh_dir) + yield _contents, _skipped_contents, _directories, swh_revision def prepare_origin_visit(self): self.origin = Origin(url=self.origin_url if self.origin_url else self.cvsroot_url) def pre_cleanup(self): """Cleanup potential dangling files from prior runs (e.g. OOM killed tasks) """ clean_dangling_folders( self.temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) def cleanup(self): self.log.info("cleanup") def fetch_cvs_repo_with_rsync(self, host, path): # URL *must* end with a trailing slash in order to get CVSROOT listed url = 'rsync://%s%s/' % (host, os.path.dirname(path)) rsync = subprocess.run(['rsync', url], capture_output=True, encoding='ascii') rsync.check_returncode() have_cvsroot = False have_module = False for line in rsync.stdout.split('\n'): self.log.debug("rsync server: %s" % line) if line.endswith(' CVSROOT'): have_cvsroot = True elif line.endswith(' %s' % self.cvs_module_name): have_module = True if have_module and have_cvsroot: break if not have_module: raise NotFound("CVS module %s not found at %s" \ % (self.cvs_module_name, host, url)) if not have_cvsroot: raise NotFound("No CVSROOT directory found at %s" % url) rsync = subprocess.run(['rsync', '-a', url, self.cvsroot_path]) rsync.check_returncode() def prepare(self): self._last_revision = None self._load_status = "uneventful" self.swh_revision_gen = None if not self.cvsroot_path: self.cvsroot_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) self.worktree_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) url = parse_url(self.origin_url) self.log.debug("prepare; origin_url=%s scheme=%s path=%s" % (self.origin_url, url.scheme, url.path)) if not url.path: raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url) self.cvs_module_name = os.path.basename(url.path) os.mkdir(os.path.join(self.worktree_path, self.cvs_module_name)); - self.cvs_module_path = os.path.join(self.cvsroot_path, self.cvs_module_name) if url.scheme == 'file': if not os.path.exists(url.path): raise NotFound elif url.scheme == 'rsync': - self.fetch_cvs_repo_with_rsync(url.host, url.path) + self.fetch_cvs_repo_with_rsync(url.host, url.path) + + if url.scheme == 'file' or url.scheme == 'rsync': + # local CVS repository conversion + have_rcsfile = False + have_cvsroot = False + for root, dirs, files in os.walk(self.cvsroot_path): + if 'CVSROOT' in dirs: + have_cvsroot = True + dirs.remove('CVSROOT') + continue; + for f in files: + filepath = os.path.join(root, f) + if f[-2:] == ',v': + try: + rcsfile = rcsparse.rcsfile(filepath) + except(Exception): + raise + else: + 
self.log.debug("Looks like we have data to convert; " + "found a valid RCS file at %s" % filepath) + have_rcsfile = True + break + if have_rcsfile: + break; + + if not have_rcsfile: + raise NotFound("Directory %s does not contain any valid RCS files %s" % self.cvsroot_path) + if not have_cvsroot: + self.log.warn("The CVS repository at '%s' lacks a CVSROOT directory; " + "we might be ingesting an incomplete copy of the repository" % self.cvsroot_path) + + # Unfortunately, there is no way to convert CVS history in an iterative fashion + # because the data is not indexed by any kind of changeset ID. We need to walk + # the history of each and every RCS file in the repository during every visit, + # even if no new changes will be added to the SWH archive afterwards. + # "CVS’s repository is the software equivalent of a telephone book sorted by telephone number." + # https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/ + # + # An implicit assumption made here is that self.cvs_changesets will fit into + # memory in its entirety. If it won't fit then the CVS walker will need to + # be modified such that it spools the list of changesets to disk instead. + cvs = CvsConv(self.cvsroot_path, RcsKeywords(), False, CHANGESET_FUZZ_SEC) + self.log.info("Walking CVS module %s", self.cvs_module_name) + cvs.walk(self.cvs_module_name) + self.cvs_changesets = sorted(cvs.changesets) + self.log.info('CVS changesets found in %s: %d' % (self.cvs_module_name, len(self.cvs_changesets))) + self.swh_revision_gen = self.process_cvs_changesets() + elif url.scheme == 'pserver' or url.scheme == 'fake': + # remote CVS repository conversion + self.cvsclient = cvsclient.CVSClient(url) + cvsroot_path = os.path.dirname(url.path) + self.log.info("Fetching CVS rlog from %s:%s/%s", url.host, cvsroot_path, self.cvs_module_name) + self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC) + self.rlog_file = self.cvsclient.fetch_rlog() + self.rlog.parse_rlog(self.rlog_file) + self.cvs_changesets = sorted(self.rlog.changesets) + self.log.info('CVS changesets found for %s: %d' % (self.cvs_module_name, len(self.cvs_changesets))) + self.swh_revision_gen = self.process_cvs_rlog_changesets() else: raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url) - have_rcsfile = False - have_cvsroot = False - for root, dirs, files in os.walk(self.cvsroot_path): - if 'CVSROOT' in dirs: - have_cvsroot = True - dirs.remove('CVSROOT') - continue; - for f in files: - filepath = os.path.join(root, f) - if f[-2:] == ',v': - try: - rcsfile = rcsparse.rcsfile(filepath) - except(Exception): - raise - else: - self.log.debug("Looks like we have data to convert; " - "found a valid RCS file at %s" % filepath) - have_rcsfile = True - break - if have_rcsfile: - break; - - if not have_rcsfile: - raise NotFound("Directory %s does not contain any valid RCS files %s" % self.cvsroot_path) - if not have_cvsroot: - self.log.warn("The CVS repository at '%s' lacks a CVSROOT directory; " - "we might be ingesting an incomplete copy of the repository" % self.cvsroot_path) - - # Unfortunately, there is no way to convert CVS history in an iterative fashion - # because the data is not indexed by any kind of changeset ID. We need to walk - # the history of each and every RCS file in the repository during every visit, - # even if no new changes will be added to the SWH archive afterwards. - # "CVS’s repository is the software equivalent of a telephone book sorted by telephone number." 
- # https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/ - # - # An implicit assumption made here is that self.cvs_changesets will fit into - # memory in its entirety. If it won't fit then the CVS walker will need to - # be modified such that it spools the list of changesets to disk instead. - cvs = CvsConv(self.cvsroot_path, RcsKeywords(), False, CHANGESET_FUZZ_SEC) - self.log.info("Walking CVS module %s", self.cvs_module_name) - cvs.walk(self.cvs_module_name) - self.cvs_changesets = sorted(cvs.changesets) - self.log.info('CVS changesets found in %s: %d' % (self.cvs_module_name, len(self.cvs_changesets))) - # SWH revisions are generated and stored iteratively to avoid high memory consumption - self.swh_revision_gen = self.process_cvs_changesets() + def fetch_data(self): """Fetch the next CVS revision.""" try: data = next(self.swh_revision_gen) except StopIteration: return False except Exception as e: self.log.exception(e) return False # Stopping iteration self._contents, self._skipped_contents, self._directories, rev = data self._revisions = [rev] return True def build_swh_revision(self, k: ChangeSetKey, logmsg: bytes, dir_id: bytes, parents: Sequence[bytes] ) -> Revision: """Given a CVS revision, build a swh revision. Args: k: changeset data logmsg: the changeset's log message dir_id: the tree's hash identifier parents: the revision's parents identifier Returns: The swh revision dictionary. """ author = Person.from_fullname(k.author.encode('UTF-8')) date = TimestampWithTimezone.from_datetime(k.max_time) return Revision( type=RevisionType.CVS, date=date, committer_date=date, directory=dir_id, message=logmsg, author=author, committer=author, synthetic=True, extra_headers=[], parents=tuple(parents)) def generate_and_load_snapshot(self, revision) -> Snapshot: """Create the snapshot either from existing revision. Args: revision (dict): Last revision seen if any (None by default) Returns: Optional[Snapshot] The newly created snapshot """ snap = Snapshot( branches={ DEFAULT_BRANCH: SnapshotBranch( target=revision.id, target_type=TargetType.REVISION ) } ) self.log.debug("snapshot: %s" % snap) self.storage.snapshot_add([snap]) return snap def store_data(self): "Add our current CVS changeset to the archive." self.storage.skipped_content_add(self._skipped_contents) self.storage.content_add(self._contents) self.storage.directory_add(self._directories) self.storage.revision_add(self._revisions) self.snapshot = self.generate_and_load_snapshot(self._last_revision) self.log.debug("SWH snapshot ID: %s" % hashutil.hash_to_hex(self.snapshot.id)) self.flush() self.loaded_snapshot_id = self.snapshot.id self._skipped_contents = [] self._contents = [] self._directories = [] self._revisions = [] def load_status(self): return { "status": self._load_status, } def visit_status(self): return self._visit_status diff --git a/swh/loader/cvs/rlog.py b/swh/loader/cvs/rlog.py new file mode 100644 index 0000000..1a046c3 --- /dev/null +++ b/swh/loader/cvs/rlog.py @@ -0,0 +1,391 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +""" RCS/CVS rlog parser, derived from viewvc and cvs2gitdump.py """ + +# Copyright (C) 1999-2021 The ViewCVS Group. All Rights Reserved. 
+# +# By using ViewVC, you agree to the terms and conditions set forth +# below: +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following +# disclaimer. +# +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS +# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN +# IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# Copyright (c) 2012 YASUOKA Masahiko +# +# Permission to use, copy, modify, and distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+ +import re +import calendar +import time + +from swh.loader.cvs.cvs2gitdump.cvs2gitdump import CHANGESET_FUZZ_SEC, file_path, ChangeSetKey + +# TODO: actual path encoding should be specified as a parameter +path_encodings = [ 'ascii', 'utf-8' ] + +class RlogConv: + def __init__(self, cvsroot_path, fuzzsec): + self.cvsroot_path = cvsroot_path + self.fuzzsec = fuzzsec + self.changesets = dict() + self.tags = dict() + self.offsets = dict() + + def _process_rlog_entry(self, path, taginfo, revisions, logmsgs): + """ Convert an rlog entry into an item in self.changesets """ + rtags = dict() + branches = {'1': 'HEAD', '1.1.1': 'VENDOR'} + for k, v in list(taginfo.items()): + r = v.split('.') + if len(r) == 3: + branches[v] = 'VENDOR' + elif len(r) >= 3 and r[-2] == '0': + branches['.'.join(r[:-2] + r[-1:])] = k + if len(r) == 2 and branches[r[0]] == 'HEAD': + if v not in rtags: + rtags[v] = list() + rtags[v].append(k) + + revs = revisions.items() + # sort by revision descending to priorize 1.1.1.1 than 1.1 + revs = sorted(revs, key=lambda a: a[1][0], reverse=True) + # sort by time + revs = sorted(revs, key=lambda a: a[1][1]) + novendor = False + have_initial_revision = False + last_vendor_status = None + for k, v in revs: + r = k.split('.') + if len(r) == 4 and r[0] == '1' and r[1] == '1' and r[2] == '1' \ + and r[3] == '1': + if have_initial_revision: + continue + if v[3] == 'dead': + continue + last_vendor_status = v[3] + have_initial_revision = True + elif len(r) == 4 and r[0] == '1' and r[1] == '1' and r[2] == '1': + if novendor: + continue + last_vendor_status = v[3] + elif len(r) == 2: + if r[0] == '1' and r[1] == '1': + if have_initial_revision: + continue + if v[3] == 'dead': + continue + have_initial_revision = True + elif r[0] == '1' and r[1] != '1': + novendor = True + if last_vendor_status == 'dead' and v[3] == 'dead': + last_vendor_status = None + continue + last_vendor_status = None + else: + # trunk only + continue + + b = '.'.join(r[:-1]) + # decode author name in a potentially lossy way; + # it is only used for internal hashing in this case + author = v[2].decode('utf-8', 'ignore') + a = ChangeSetKey( + branches[b], author, v[1], logmsgs[k], v[6], + self.fuzzsec) + + a.put_file(path, k, v[3], 0) + while a in self.changesets: + c = self.changesets[a] + del self.changesets[a] + c.merge(a) + a = c + self.changesets[a] = a + if k in rtags: + for t in rtags[k]: + if t not in self.tags or \ + self.tags[t].max_time < a.max_time: + self.tags[t] = a + + def parse_rlog(self, fp): + eof = None + while eof != _EOF_LOG and eof != _EOF_ERROR: + filename, branch, taginfo, lockinfo, errmsg, eof = _parse_log_header(fp) + revisions = {} + logmsgs = {} + if filename: + for i, e in enumerate(path_encodings): + try: + how = 'ignore' if i == len(path_encodings) - 1 else 'strict' + fname = filename.decode(e, how) + break + except UnicodeError: + pass + while not eof: + off = fp.tell() + rev, logmsg, eof = _parse_log_entry(fp) + if rev: + revisions[rev[0]] = rev + logmsgs[rev[0]] = logmsg + if eof != _EOF_LOG and eof != _EOF_ERROR: + path = file_path(self.cvsroot_path, fname) + if not path in self.offsets.keys(): + self.offsets[path] = dict() + if rev: + self.offsets[path][rev[0]] = off + self._process_rlog_entry(path, taginfo, revisions, logmsgs) + + def getlog(self, fp, path, rev): + off = self.offsets[path][rev] + fp.seek(off) + rev, logmsg, eof = _parse_log_entry(fp) + return logmsg + +# if your rlog doesn't use 77 '=' characters, then this must change +LOG_END_MARKER = b'=' * 77 + b'\n' 
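+
+# Illustrative rlog layout assumed by the parser below (abridged and
+# hypothetical, not output captured from a real repository): per-file header
+# lines, then one entry per revision separated by ENTRY_END_MARKER, with
+# LOG_END_MARKER closing each RCS file's log:
+#
+#   RCS file: /cvsroot/module/foo.c,v
+#   symbolic names:
+#           RELEASE_1_0: 1.2
+#   ----------------------------
+#   revision 1.2
+#   date: 2005/05/01 12:00:00;  author: jrandom;  state: Exp;  lines: +1 -1
+#   log message text
+#   ----------------------------
+#   revision 1.1
+#   ...
+#   =============================================================================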
+ENTRY_END_MARKER = b'-' * 28 + b'\n'
+
+_EOF_FILE = b'end of file entries'  # no more entries for this RCS file
+_EOF_LOG = b'end of log'  # hit the true EOF on the pipe
+_EOF_ERROR = b'error message found'  # rlog issued an error
+
+# rlog error messages look like
+#
+#   rlog: filename/goes/here,v: error message
+#   rlog: filename/goes/here,v:123: error message
+#
+# so we should be able to match them with a regex like
+#
+#   ^rlog\: (.*)(?:\:\d+)?\: (.*)$
+#
+# But for some reason the windows version of rlog omits the "rlog: " prefix
+# for the first error message when the standard error stream has been
+# redirected to a file or pipe. (the prefix is present in subsequent errors
+# and when rlog is run from the console). So the expression below is more
+# complicated
+_re_log_error = re.compile(b'^(?:rlog\: )*(.*,v)(?:\:\d+)?\: (.*)$')
+
+# CVSNT error messages look like:
+# cvs rcsfile: `C:/path/to/file,v' does not appear to be a valid rcs file
+# cvs [rcsfile aborted]: C:/path/to/file,v: No such file or directory
+# cvs [rcsfile aborted]: cannot open C:/path/to/file,v: Permission denied
+_re_cvsnt_error = re.compile(b'^(?:cvs rcsfile\: |cvs \[rcsfile aborted\]: )'
+                             b'(?:\`(.*,v)\' |'
+                             b'cannot open (.*,v)\: |(.*,v)\: |)'
+                             b'(.*)$')
+
+
+def _parse_log_header(fp):
+    """Parse an RCS/CVS log header.
+
+    fp is a file (pipe) opened for reading the log information.
+
+    On entry, fp should point to the start of a log entry.
+    On exit, fp will have consumed the separator line between the header and
+    the first revision log.
+
+    If there is no revision information (e.g. the "-h" switch was passed to
+    rlog), then fp will have consumed the file separator line on exit.
+
+    Returns: filename, default branch, tag dictionary, lock dictionary,
+    rlog error message, and eof flag
+    """
+
+    filename = branch = msg = b""
+    taginfo = {}  # tag name => number
+    lockinfo = {}  # revision => locker
+    state = 0  # 0 = base, 1 = parsing symbols, 2 = parsing locks
+    eof = None
+
+    while 1:
+        line = fp.readline()
+        if not line:
+            # the true end-of-file
+            eof = _EOF_LOG
+            break
+
+        if state == 1:
+            if line[0:1] == b'\t':
+                [tag, rev] = [x.strip() for x in line.split(b':')]
+                taginfo[tag] = rev
+            else:
+                # oops. this line isn't tag info. stop parsing tags.
+                state = 0
+
+        if state == 2:
+            if line[0:1] == b'\t':
+                [locker, rev] = [x.strip() for x in line.split(b':')]
+                lockinfo[rev] = locker
+            else:
+                # oops. this line isn't lock info. stop parsing locks.
+                state = 0
+
+        if state == 0:
+            if line[:9] == b'RCS file:':
+                filename = line[10:-1]
+            elif line[:5] == b'head:':
+                # head = line[6:-1]
+                pass
+            elif line[:7] == b'branch:':
+                branch = line[8:-1]
+            elif line[:6] == b'locks:':
+                # start parsing the lock information
+                state = 2
+            elif line[:14] == b'symbolic names':
+                # start parsing the tag information
+                state = 1
+            elif line == ENTRY_END_MARKER:
+                # end of the headers
+                break
+            elif line == LOG_END_MARKER:
+                # end of this file's log information
+                eof = _EOF_FILE
+                break
+            else:
+                error = _re_cvsnt_error.match(line)
+                if error:
+                    p1, p2, p3, msg = error.groups()
+                    filename = p1 or p2 or p3
+                    if not filename:
+                        raise ValueError("Could not get filename "
+                                         "from CVSNT error:\n%s" % line)
+                    eof = _EOF_ERROR
+                    break
+
+                error = _re_log_error.match(line)
+                if error:
+                    filename, msg = error.groups()
+                    if msg[:30] == b'warning: Unknown phrases like ':
+                        # don't worry about this warning. it can happen with
+                        # some RCS files that have unknown fields in them
+                        # (e.g. "permissions 644;").
+                        continue
+                    eof = _EOF_ERROR
+                    break
+
+    return filename, branch, taginfo, lockinfo, msg, eof
+
+
+_re_log_info = re.compile(b'^date:\s+([^;]+);'
+                          b'\s+author:\s+([^;]+);'
+                          b'\s+state:\s+([^;]+);'
+                          b'(\s+lines:\s+([0-9\s+-]+);?)?'
+                          b'(\s+commitid:\s+([a-zA-Z0-9]+))?\n$')
+
+# TODO: _re_rev should be updated to extract the "locked" flag
+_re_rev = re.compile(b'^revision\s+([0-9.]+).*')
+
+
+def cvs_strptime(timestr):
+    try:
+        return time.strptime(timestr, '%Y/%m/%d %H:%M:%S')[:-1] + (0,)
+    except ValueError:
+        return time.strptime(timestr, '%Y-%m-%d %H:%M:%S %z')[:-1] + (0,)
+
+
+def _parse_log_entry(fp):
+    """Parse a single log entry.
+
+    On entry, fp should point to the first line of the entry (the "revision"
+    line).
+    On exit, fp will have consumed the log separator line (dashes) or the
+    end-of-file marker (equals).
+
+    Returns: Revision data tuple, and eof flag (see _EOF_*)
+    """
+    rev = None
+    line = fp.readline()
+    if not line:
+        return None, None, _EOF_LOG
+    if line == LOG_END_MARKER:
+        # Needed because some versions of RCS precede LOG_END_MARKER
+        # with ENTRY_END_MARKER
+        return None, None, _EOF_FILE
+    if line[:8] == b'revision':
+        match = _re_rev.match(line)
+        if not match:
+            return None, None, _EOF_LOG
+        rev = match.group(1)
+
+    line = fp.readline()
+    if not line:
+        return None, None, _EOF_LOG
+    match = _re_log_info.match(line)
+
+    eof = None
+    log = b''
+    while 1:
+        line = fp.readline()
+        if not line:
+            # true end-of-file
+            eof = _EOF_LOG
+            break
+        if line[:9] == b'branches:':
+            continue
+        if line == ENTRY_END_MARKER:
+            break
+        if line == LOG_END_MARKER:
+            # end of this file's log information
+            eof = _EOF_FILE
+            break
+
+        log = log + line
+
+    if not rev or not match:
+        # there was a parsing error
+        return None, None, eof
+
+    # parse out a time tuple for the local time
+    tm = cvs_strptime(match.group(1).decode('UTF-8'))
+
+    # rlog seems to assume that two-digit years are 1900-based (so, "04"
+    # comes out as "1904", not "2004").
+
+
+def _parse_log_entry(fp):
+    """Parse a single log entry.
+
+    On entry, fp should point to the first line of the entry (the "revision"
+    line).
+    On exit, fp will have consumed the log separator line (dashes) or the
+    end-of-file marker (equals).
+
+    Returns: revision data tuple, log message, and eof flag (see _EOF_*)
+    """
+    rev = None
+    line = fp.readline()
+    if not line:
+        return None, None, _EOF_LOG
+    if line == LOG_END_MARKER:
+        # Needed because some versions of RCS precede LOG_END_MARKER
+        # with ENTRY_END_MARKER
+        return None, None, _EOF_FILE
+    if line[:8] == b'revision':
+        match = _re_rev.match(line)
+        if not match:
+            return None, None, _EOF_LOG
+        rev = match.group(1)
+
+    line = fp.readline()
+    if not line:
+        return None, None, _EOF_LOG
+    match = _re_log_info.match(line)
+
+    eof = None
+    log = b''
+    while True:
+        line = fp.readline()
+        if not line:
+            # true end-of-file
+            eof = _EOF_LOG
+            break
+        if line[:9] == b'branches:':
+            continue
+        if line == ENTRY_END_MARKER:
+            break
+        if line == LOG_END_MARKER:
+            # end of this file's log information
+            eof = _EOF_FILE
+            break
+
+        log = log + line
+
+    if not rev or not match:
+        # there was a parsing error
+        return None, None, eof
+
+    # parse out a time tuple for the local time
+    tm = cvs_strptime(match.group(1).decode('UTF-8'))
+
+    # rlog seems to assume that two-digit years are 1900-based (so, "04"
+    # comes out as "1904", not "2004"); window such years forward by a
+    # century (e.g. 1904 becomes 2004) and reject anything still before
+    # the epoch.
+    EPOCH = 1970
+    if tm[0] < EPOCH:
+        tm = list(tm)
+        if (tm[0] - 1900) < 70:
+            tm[0] = tm[0] + 100
+        if tm[0] < EPOCH:
+            raise ValueError('invalid year')
+    date = calendar.timegm(tm)
+
+    # Return a revision tuple compatible with 'rcsparse', the log message,
+    # and the EOF marker.
+    return (rev.decode('ascii'),  # revision number string
+            date,
+            match.group(2),       # author (encoding is arbitrary; don't
+                                  # attempt to decode)
+            match.group(3).decode('ascii'),  # state, usually "Exp" or
+                                             # "dead"; non-ASCII data here
+                                             # would be weird
+            None,                 # TODO: branches of this rev
+            None,                 # TODO: revnumstr of previous rev
+            None,                 # TODO: commitid
+            ), log, eof
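+
+
+# Minimal sketch of the intended calling pattern (an assumption; the actual
+# driver lives elsewhere in the loader): the header parser runs once per RCS
+# file, then the entry parser is looped until an _EOF_* flag comes back:
+#
+#   filename, branch, taginfo, lockinfo, msg, eof = _parse_log_header(fp)
+#   while not eof:
+#       revdata, log, eof = _parse_log_entry(fp)
+#       if revdata:
+#           rev, date, author, state, _, _, _ = revdata
+#           ...  # record one revision of this RCS file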
diff --git a/swh/loader/cvs/tests/data/nano.rlog.tgz b/swh/loader/cvs/tests/data/nano.rlog.tgz
new file mode 100644
index 0000000..ffffa87
Binary files /dev/null and b/swh/loader/cvs/tests/data/nano.rlog.tgz differ
diff --git a/swh/loader/cvs/tests/data/runbaby.tgz b/swh/loader/cvs/tests/data/runbaby.tgz
index c2256f3..354845d 100644
Binary files a/swh/loader/cvs/tests/data/runbaby.tgz and b/swh/loader/cvs/tests/data/runbaby.tgz differ
diff --git a/swh/loader/cvs/tests/test_loader.py b/swh/loader/cvs/tests/test_loader.py
index c4abe57..d899a9a 100644
--- a/swh/loader/cvs/tests/test_loader.py
+++ b/swh/loader/cvs/tests/test_loader.py
@@ -1,223 +1,259 @@
 # Copyright (C) 2016-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os

 import pytest

 from swh.loader.cvs.loader import CvsLoader
 from swh.loader.tests import (
     assert_last_visit_matches,
     check_snapshot,
     get_stats,
     prepare_repository_from_archive,
 )
 from swh.model.hashutil import hash_to_bytes
 from swh.model.model import Snapshot, SnapshotBranch, TargetType

 RUNBABY_SNAPSHOT = Snapshot(
     id=hash_to_bytes("1cff69ab9bd70822d5e3006092f943ccaafdcf57"),
     branches={
         b"HEAD": SnapshotBranch(
             target=hash_to_bytes("ef511d258fa55035c2bc2a5b05cad233cee1d328"),
             target_type=TargetType.REVISION,
         )
     },
 )


 def test_loader_cvs_not_found_no_mock(swh_storage, tmp_path):
     """Given an unknown repository, the loader visit ends up in status not_found"""
     unknown_repo_url = "unknown-repository"
     loader = CvsLoader(swh_storage, unknown_repo_url, cvsroot_path=tmp_path)

     assert loader.load() == {"status": "uneventful"}

     assert_last_visit_matches(
         swh_storage,
         unknown_repo_url,
         status="not_found",
         type="cvs",
     )


 def test_loader_cvs_visit(swh_storage, datadir, tmp_path):
     """Eventful visit should yield 1 snapshot"""
     archive_name = "runbaby"
     archive_path = os.path.join(datadir, f"{archive_name}.tgz")
     repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)

     loader = CvsLoader(
         swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
     )

     assert loader.load() == {"status": "eventful"}

     assert_last_visit_matches(
         loader.storage,
         repo_url,
         status="full",
         type="cvs",
         snapshot=RUNBABY_SNAPSHOT.id,
     )

     stats = get_stats(loader.storage)
     assert stats == {
         "content": 5,
         "directory": 2,
         "origin": 1,
         "origin_visit": 1,
         "release": 0,
         "revision": 1,
         "skipped_content": 0,
         "snapshot": 1,
     }

     check_snapshot(RUNBABY_SNAPSHOT, loader.storage)


 def test_loader_cvs_2_visits_no_change(swh_storage, datadir, tmp_path):
     """Eventful visit followed by uneventful visit should yield the same
     snapshot
     """
     archive_name = "runbaby"
     archive_path = os.path.join(datadir, f"{archive_name}.tgz")
     repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)

     loader = CvsLoader(
         swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
     )

     assert loader.load() == {"status": "eventful"}
     visit_status1 = assert_last_visit_matches(
         loader.storage,
         repo_url,
         status="full",
         type="cvs",
         snapshot=RUNBABY_SNAPSHOT.id,
     )

     assert loader.load() == {"status": "uneventful"}
     visit_status2 = assert_last_visit_matches(
         loader.storage,
         repo_url,
         status="full",
         type="cvs",
         snapshot=RUNBABY_SNAPSHOT.id,
     )

     assert visit_status1.date < visit_status2.date
     assert visit_status1.snapshot == visit_status2.snapshot

     stats = get_stats(loader.storage)
     assert stats["origin_visit"] == 1 + 1  # computed twice the same snapshot
     assert stats["snapshot"] == 1


 GREEK_SNAPSHOT = Snapshot(
     id=hash_to_bytes("5e74af67d69dfd7aea0eb118154d062f71f50120"),
     branches={
         b"HEAD": SnapshotBranch(
             target=hash_to_bytes("e18b92f14cd5b3efb3fcb4ea46cfaf97f25f301b"),
             target_type=TargetType.REVISION,
         )
     },
 )


 def test_loader_cvs_with_file_additions_and_deletions(swh_storage, datadir, tmp_path):
     """Eventful conversion of history with file additions and deletions"""
     archive_name = "greek-repository"
     archive_path = os.path.join(datadir, f"{archive_name}.tgz")
     repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
     repo_url += '/greek-tree'  # CVS module name

     loader = CvsLoader(
         swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
     )

     assert loader.load() == {"status": "eventful"}

     assert_last_visit_matches(
         loader.storage,
         repo_url,
         status="full",
         type="cvs",
         snapshot=GREEK_SNAPSHOT.id,
     )

     stats = get_stats(loader.storage)
     assert stats == {
         "content": 8,
         "directory": 20,
         "origin": 1,
         "origin_visit": 1,
         "release": 0,
         "revision": 7,
         "skipped_content": 0,
         "snapshot": 7,
     }

     check_snapshot(GREEK_SNAPSHOT, loader.storage)


 GREEK_SNAPSHOT2 = Snapshot(
     id=hash_to_bytes("048885ae2145ffe81588aea95dcf75c536ecdf26"),
     branches={
         b"HEAD": SnapshotBranch(
             target=hash_to_bytes("55eb1438c03588607ce4b8db8f45e8e23075951b"),
             target_type=TargetType.REVISION,
         )
     },
 )


 def test_loader_cvs_2_visits_with_change(swh_storage, datadir, tmp_path):
     """Eventful visit followed by eventful visit should yield two snapshots"""
     archive_name = "greek-repository"
     archive_path = os.path.join(datadir, f"{archive_name}.tgz")
     repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
     repo_url += '/greek-tree'  # CVS module name

     loader = CvsLoader(
         swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
     )

     assert loader.load() == {"status": "eventful"}
     visit_status1 = assert_last_visit_matches(
         loader.storage,
         repo_url,
         status="full",
         type="cvs",
         snapshot=GREEK_SNAPSHOT.id,
     )

     stats = get_stats(loader.storage)
     assert stats == {
         "content": 8,
         "directory": 20,
         "origin": 1,
         "origin_visit": 1,
         "release": 0,
         "revision": 7,
         "skipped_content": 0,
         "snapshot": 7,
     }

     archive_name2 = "greek-repository2"
     archive_path2 = os.path.join(datadir, f"{archive_name2}.tgz")
     repo_url = prepare_repository_from_archive(archive_path2, archive_name, tmp_path)
     repo_url += '/greek-tree'  # CVS module name

     loader = CvsLoader(
         swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
     )

     assert loader.load() == {"status": "eventful"}
     visit_status2 = assert_last_visit_matches(
         loader.storage,
         repo_url,
         status="full",
         type="cvs",
         snapshot=GREEK_SNAPSHOT2.id,
     )

     stats = get_stats(loader.storage)
     assert stats == {
         "content": 10,
         "directory": 23,
         "origin": 1,
         "origin_visit": 2,
         "release": 0,
         "revision": 8,
         "skipped_content": 0,
         "snapshot": 8,
     }

     check_snapshot(GREEK_SNAPSHOT2, loader.storage)

     assert visit_status1.date < visit_status2.date
     assert visit_status1.snapshot != visit_status2.snapshot
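+
+# The test below exercises the loader's CVS protocol client instead of the
+# direct RCS parsing path. The scheme swap it performs is plain string
+# surgery, e.g. 'file:///tmp/repo/runbaby' -> 'fake:///tmp/repo/runbaby'
+# ('fake://' presumably being the scheme the loader's cvsclient maps to
+# spawning a local 'cvs server'; the example URL is illustrative only).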
+
+def test_loader_cvs_visit_pserver(swh_storage, datadir, tmp_path):
+    """Eventful visit to CVS pserver should yield 1 snapshot"""
+    archive_name = "runbaby"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    repo_url += '/runbaby'  # CVS module name
+
+    # Ask our cvsclient to connect via the 'cvs server' command
+    repo_url = 'fake://' + repo_url[7:]
+
+    loader = CvsLoader(
+        swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
+    )
+
+    assert loader.load() == {"status": "eventful"}
+
+    assert_last_visit_matches(
+        loader.storage,
+        repo_url,
+        status="full",
+        type="cvs",
+        snapshot=RUNBABY_SNAPSHOT.id,
+    )
+
+    stats = get_stats(loader.storage)
+    assert stats == {
+        "content": 5,
+        "directory": 2,
+        "origin": 1,
+        "origin_visit": 1,
+        "release": 0,
+        "revision": 1,
+        "skipped_content": 0,
+        "snapshot": 1,
+    }
+
+    check_snapshot(RUNBABY_SNAPSHOT, loader.storage)