diff --git a/swh/loader/cvs/cvsclient.py b/swh/loader/cvs/cvsclient.py index 4ee7fe1..d05baf5 100644 --- a/swh/loader/cvs/cvsclient.py +++ b/swh/loader/cvs/cvsclient.py @@ -1,480 +1,480 @@ # Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """Minimal CVS client implementation """ import os.path import socket import subprocess import tempfile -from typing import Tuple +from typing import IO, Tuple from tenacity import retry from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_attempt from swh.loader.exception import NotFound CVS_PSERVER_PORT = 2401 CVS_PROTOCOL_BUFFER_SIZE = 8192 EXAMPLE_PSERVER_URL = "pserver://user:password@cvs.example.com/cvsroot/repository" EXAMPLE_SSH_URL = "ssh://user@cvs.example.com/cvsroot/repository" VALID_RESPONSES = [ "ok", "error", "Valid-requests", "Checked-in", "New-entry", "Checksum", "Copy-file", "Updated", "Created", "Update-existing", "Merged", "Patched", "Rcs-diff", "Mode", "Removed", "Remove-entry", "Template", "Notified", "Module-expansion", "Wrapper-rcsOption", "M", "Mbinary", "E", "F", "MT", ] # Trivially encode strings to protect them from innocent eyes (i.e., # inadvertent password compromises, like a network administrator # who's watching packets for legitimate reasons and accidentally sees # the password protocol go by). # # This is NOT secure encryption. def scramble_password(password): s = ["A"] # scramble scheme version number # fmt: off scramble_shifts = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, # noqa: E241 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, # noqa: E241,E131,B950 114,120, 53, 79, 96,109, 72,108, 70, 64, 76, 67,116, 74, 68, 87, # noqa: E241,E131,B950 111, 52, 75,119, 49, 34, 82, 81, 95, 65,112, 86,118,110,122,105, # noqa: E241,E131,B950 41, 57, 83, 43, 46,102, 40, 89, 38,103, 45, 50, 42,123, 91, 35, # noqa: E241,E131,B950 125, 55, 54, 66,124,126, 59, 47, 92, 71,115, 78, 88,107,106, 56, # noqa: E241,E131,B950 36,121,117,104,101,100, 69, 73, 99, 63, 94, 93, 39, 37, 61, 48, # noqa: E241,E131,B950 58,113, 32, 90, 44, 98, 60, 51, 33, 97, 62, 77, 84, 80, 85,223, # noqa: E241,E131,B950 225,216,187,166,229,189,222,188,141,249,148,200,184,136,248,190, # noqa: E241,E131,B950 199,170,181,204,138,232,218,183,255,234,220,247,213,203,226,193, # noqa: E241,E131,B950 174,172,228,252,217,201,131,230,197,211,145,238,161,179,160,212, # noqa: E241,E131,B950 207,221,254,173,202,146,224,151,140,196,205,130,135,133,143,246, # noqa: E241,E131,B950 192,159,244,239,185,168,215,144,139,165,180,157,147,186,214,176, # noqa: E241,E131,B950 227,231,219,169,175,156,206,198,129,164,150,210,154,177,134,127, # noqa: E241,E131,B950 182,128,158,208,162,132,167,209,149,241,153,251,237,236,171,195, # noqa: E241,E131,B950 243,233,253,240,194,250,191,155,142,137,245,235,163,242,178,152] # noqa: E241,E131,B950 # fmt: on for c in password: s.append("%c" % scramble_shifts[ord(c)]) return "".join(s) def decode_path(path: bytes) -> Tuple[str, str]: """Attempt to decode a file path based on encodings known to be used in CVS repositories that can be found in the wild. Args: path: raw bytes path Returns: A tuple (decoded path, encoding) """ path_encodings = ["ascii", "iso-8859-1", "utf-8"] for encoding in path_encodings: try: how = "ignore" if encoding == path_encodings[-1] else "strict" path_str = path.decode(encoding, how) break except UnicodeError: pass return path_str, encoding class CVSProtocolError(Exception): pass class CVSClient: # connection to an existing pserver might sometimes fail, # retrying the operation usually fixes the issue @retry( retry=retry_if_exception_type(NotFound), stop=stop_after_attempt(max_attempt_number=3), reraise=True, ) def connect_pserver(self, hostname, port, username, password): if port is None: port = CVS_PSERVER_PORT if username is None: raise NotFound( "Username is required for " "a pserver connection: %s" % EXAMPLE_PSERVER_URL ) try: self.socket = socket.create_connection((hostname, port)) except ConnectionRefusedError: raise NotFound("Could not connect to %s:%s", hostname, port) # use empty password if it is None scrambled_password = scramble_password(password or "") request = "BEGIN AUTH REQUEST\n%s\n%s\n%s\nEND AUTH REQUEST\n" % ( self.cvsroot_path, username, scrambled_password, ) print("Request: %s\n" % request) self.socket.sendall(request.encode("UTF-8")) response = self.conn_read_line() if response != b"I LOVE YOU\n": raise NotFound( "pserver authentication failed for %s:%s: %s" % (hostname, port, response) ) def connect_ssh(self, hostname, port, username): command = ["ssh"] if username is not None: # Assume 'auth' contains only a user name. # We do not support password authentication with SSH since the # anoncvs user is usually granted access without a password. command += ["-l", "%s" % username] if port is not None: command += ["-p", "%d" % port] # accept new SSH hosts keys upon first use; changed host keys # will require intervention command += ["-o", "StrictHostKeyChecking=accept-new"] # disable interactive prompting command += ["-o", "BatchMode=yes"] # disable further option processing by adding '--' command += ["--"] command += ["%s" % hostname, "cvs", "server"] # use non-buffered I/O to match behaviour of self.socket self.ssh = subprocess.Popen( command, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE ) def connect_fake(self): command = ["cvs", "server"] # use non-buffered I/O to match behaviour of self.socket self.ssh = subprocess.Popen( command, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE ) def conn_read_line(self, require_newline=True): if len(self.linebuffer) != 0: return self.linebuffer.pop(0) buf = b"" idx = -1 while idx == -1: if len(buf) >= CVS_PROTOCOL_BUFFER_SIZE: if require_newline: raise CVSProtocolError( "Overlong response from " "CVS server: %s" % buf ) else: break if self.socket: buf += self.socket.recv(CVS_PROTOCOL_BUFFER_SIZE) elif self.ssh: buf += self.ssh.stdout.read(CVS_PROTOCOL_BUFFER_SIZE) else: raise Exception("No valid connection") if not buf: return None idx = buf.rfind(b"\n") if idx != -1: self.linebuffer = buf[: idx + 1].splitlines(keepends=True) else: if require_newline: raise CVSProtocolError("Invalid response from CVS server: %s" % buf) else: self.linebuffer.append(buf) if len(self.incomplete_line) > 0: self.linebuffer[0] = self.incomplete_line + self.linebuffer[0] if idx != -1: self.incomplete_line = buf[idx + 1 :] else: self.incomplete_line = b"" return self.linebuffer.pop(0) def conn_write(self, data): if self.socket: return self.socket.sendall(data) if self.ssh: self.ssh.stdin.write(data) return self.ssh.stdin.flush() raise Exception("No valid connection") def conn_write_str(self, s, encoding="utf-8"): return self.conn_write(s.encode(encoding)) def conn_close(self): if self.socket: self.socket.close() if self.ssh: self.ssh.kill() try: self.ssh.wait(timeout=10) except subprocess.TimeoutExpired as e: raise subprocess.TimeoutExpired( "Could not terminate " "ssh program: %s" % e ) def __init__(self, url): """ Connect to a CVS server at the specified URL and perform the initial CVS protocol handshake. """ self.hostname = url.hostname self.cvsroot_path = os.path.dirname(url.path) self.cvs_module_name = os.path.basename(url.path) self.socket = None self.ssh = None self.linebuffer = list() self.incomplete_line = b"" if url.scheme == "pserver": self.connect_pserver(url.hostname, url.port, url.username, url.password) elif url.scheme == "ssh": self.connect_ssh(url.hostname, url.port, url.username) elif url.scheme == "fake": self.connect_fake() else: raise NotFound("Invalid CVS origin URL '%s'" % url) # we should have a connection now assert self.socket or self.ssh self.conn_write_str( "Root %s\nValid-responses %s\nvalid-requests\n" "UseUnchanged\n" % (self.cvsroot_path, " ".join(VALID_RESPONSES)) ) response = self.conn_read_line() if not response: raise CVSProtocolError("No response from CVS server") try: if response[0:15] != b"Valid-requests ": raise CVSProtocolError( "Invalid response from " "CVS server: %s" % response ) except IndexError: raise CVSProtocolError("Invalid response from CVS server: %s" % response) response = self.conn_read_line() if response != b"ok\n": raise CVSProtocolError("Invalid response from CVS server: %s" % response) def __del__(self): self.conn_close() - def _parse_rlog_response(self, fp): + def _parse_rlog_response(self, fp: IO[bytes]): rlog_output = tempfile.TemporaryFile() expect_error = False - for line in fp.readlines(): + for line in fp: if expect_error: - raise CVSProtocolError("CVS server error: %s" % line) + raise CVSProtocolError("CVS server error: %r" % line) if line == b"ok\n": break elif line[0:2] == b"M ": rlog_output.write(line[2:]) elif line[0:8] == b"MT text ": rlog_output.write(line[8:-1]) elif line[0:8] == b"MT date ": rlog_output.write(line[8:-1]) elif line[0:10] == b"MT newline": rlog_output.write(line[10:]) elif line[0:7] == b"error ": expect_error = True continue else: - raise CVSProtocolError("Bad CVS protocol response: %s" % line) + raise CVSProtocolError("Bad CVS protocol response: %r" % line) rlog_output.seek(0) return rlog_output def fetch_rlog(self, path: bytes = b"", state=""): if path: path_arg, encoding = decode_path(path) else: path_arg, encoding = self.cvs_module_name, "utf-8" if len(state) > 0: state_arg = "Argument -s%s\n" % state else: state_arg = "" fp = tempfile.TemporaryFile() self.conn_write_str( "Global_option -q\n" f"{state_arg}" "Argument --\n" f"Argument {path_arg}\n" "rlog\n", encoding=encoding, ) while True: response = self.conn_read_line() if response is None: raise CVSProtocolError("No response from CVS server") if response[0:2] == b"E ": if len(path) > 0 and ( response.endswith(b" - ignored\n") or b"could not read RCS file" in response ): response = self.conn_read_line() if response not in (b"error \n", b"ok\n"): raise CVSProtocolError( "Invalid response from CVS server: %s" % response ) return None # requested path does not exist (ignore) raise CVSProtocolError("Error response from CVS server: %s" % response) fp.write(response) if response == b"ok\n": break fp.seek(0) return self._parse_rlog_response(fp) def checkout(self, path: bytes, rev: str, dest_dir: bytes, expand_keywords: bool): """ Download a file revision from the cvs server and store the file's contents in a temporary file. If expand_keywords is set then ask the server to expand RCS keywords in file content. From the server's point of view this function behaves much like 'cvs update -r rev path'. The server is unaware that we do not actually maintain a CVS working copy. Because of this it sends more information than we need. We simply skip responses that are of no interest to us. """ skip_line = False expect_modeline = False expect_bytecount = False have_bytecount = False bytecount = 0 path_str, encoding = decode_path(path) dirname = os.path.dirname(path_str) if dirname: self.conn_write_str( "Directory %s\n%s\n" % (dirname, os.path.join(self.cvsroot_path, dirname)) ) filename = os.path.basename(path_str) co_output = tempfile.NamedTemporaryFile( dir=os.fsdecode(dest_dir), delete=True, prefix="cvsclient-checkout-%s-r%s-" % (filename, rev), ) if expand_keywords: # use server-side per-file default expansion rules karg = "" else: # force binary file mode karg = "Argument -kb\n" # TODO: cvs <= 1.10 servers expect to be given every Directory along the path. self.conn_write_str( "Directory %s\n%s\n" "Global_option -q\n" "Argument -r%s\n" "%s" "Argument --\nArgument %s\nco \n" % ( self.cvs_module_name, os.path.join(self.cvsroot_path, self.cvs_module_name), rev, karg, path_str, ), encoding=encoding, ) while True: if have_bytecount: if bytecount < 0: raise CVSProtocolError("server sent too much file content data") response = self.conn_read_line(require_newline=False) if response is None: raise CVSProtocolError("Incomplete response from CVS server") if len(response) > bytecount: # When a file lacks a final newline we receive a line which # contains file content as well as CVS protocol response data. # Split last line of file content from CVS protocol data... co_output.write(response[:bytecount]) response = response[bytecount:] bytecount = 0 # ...and process the CVS protocol response below. else: co_output.write(response) bytecount -= len(response) continue else: response = self.conn_read_line() if response[0:2] == b"E ": raise CVSProtocolError("Error from CVS server: %s" % response) if response == b"ok\n": if have_bytecount: break else: raise CVSProtocolError("server sent 'ok' but no file contents") if skip_line: skip_line = False continue elif expect_bytecount: try: bytecount = int(response[0:-1]) # strip trailing \n except ValueError: raise CVSProtocolError("Bad CVS protocol response: %s" % response) have_bytecount = True continue elif response in (b"M \n", b"MT +updated\n", b"MT -updated\n"): continue elif response[0:9] == b"MT fname ": continue elif response.split(b" ")[0] in ( b"Created", b"Checked-in", b"Update-existing", b"Updated", b"Removed", ): skip_line = True continue elif response[0:1] == b"/": expect_modeline = True continue elif expect_modeline and response[0:2] == b"u=": expect_modeline = False expect_bytecount = True continue elif response[0:2] == b"M ": continue elif response[0:8] == b"MT text ": continue elif response[0:10] == b"MT newline": continue else: raise CVSProtocolError("Bad CVS protocol response: %s" % response) co_output.seek(0) return co_output diff --git a/swh/loader/cvs/loader.py b/swh/loader/cvs/loader.py index f5ce5e3..abaa86e 100644 --- a/swh/loader/cvs/loader.py +++ b/swh/loader/cvs/loader.py @@ -1,669 +1,680 @@ # Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """Loader in charge of injecting either new or existing cvs repositories to swh-storage. """ from datetime import datetime import os import os.path import subprocess import tempfile import time -from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Sequence, Tuple, cast +from typing import ( + Any, + BinaryIO, + Dict, + Iterator, + List, + Optional, + Sequence, + TextIO, + Tuple, + cast, +) from urllib.parse import urlparse import sentry_sdk from tenacity import retry from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_attempt from swh.loader.core.loader import BaseLoader from swh.loader.core.utils import clean_dangling_folders from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ( CHANGESET_FUZZ_SEC, ChangeSetKey, CvsConv, FileRevision, RcsKeywords, file_path, ) from swh.loader.cvs.cvsclient import CVSClient, CVSProtocolError, decode_path import swh.loader.cvs.rcsparse as rcsparse from swh.loader.cvs.rlog import RlogConv from swh.loader.exception import NotFound from swh.model import from_disk, hashutil from swh.model.model import ( Content, Directory, Person, Revision, RevisionType, Sha1Git, SkippedContent, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.storage.algos.snapshot import snapshot_get_latest from swh.storage.interface import StorageInterface DEFAULT_BRANCH = b"HEAD" TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs." def rsync_retry(): return retry( retry=retry_if_exception_type(subprocess.CalledProcessError), stop=stop_after_attempt(max_attempt_number=4), reraise=True, ) class BadPathException(Exception): pass class CvsLoader(BaseLoader): """Swh cvs loader. The repository is local. The loader deals with update on an already previously loaded repository. """ visit_type = "cvs" cvs_module_name: str cvsclient: CVSClient # remote CVS repository access (history is parsed from CVS rlog): rlog_file: BinaryIO swh_revision_gen: Iterator[ Tuple[List[Content], List[SkippedContent], List[Directory], Revision] ] def __init__( self, storage: StorageInterface, url: str, origin_url: Optional[str] = None, visit_date: Optional[datetime] = None, cvsroot_path: Optional[str] = None, temp_directory: str = "/tmp", **kwargs: Any, ): self.cvsroot_url = url.rstrip("/") # origin url as unique identifier for origin in swh archive origin_url = origin_url.rstrip("/") if origin_url else self.cvsroot_url super().__init__(storage=storage, origin_url=origin_url, **kwargs) self.temp_directory = temp_directory # internal state used to store swh objects self._contents: List[Content] = [] self._skipped_contents: List[SkippedContent] = [] self._directories: List[Directory] = [] self._revisions: List[Revision] = [] # internal state, current visit self._last_revision: Optional[Revision] = None self._visit_status = "full" self.visit_date = visit_date or self.visit_date self.cvsroot_path = cvsroot_path - self.custom_id_keyword = None + self.custom_id_keyword: Optional[str] = None self.excluded_keywords: List[str] = [] self.snapshot: Optional[Snapshot] = None self.last_snapshot: Optional[Snapshot] = snapshot_get_latest( self.storage, self.origin.url ) def compute_swh_revision( self, k: ChangeSetKey, logmsg: Optional[bytes] ) -> Tuple[Revision, from_disk.Directory]: """Compute swh hash data per CVS changeset. Returns: tuple (rev, swh_directory) - rev: current SWH revision computed from checked out work tree - swh_directory: dictionary of path, swh hash data with type """ # Compute SWH revision from the on-disk state swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path)) parents: Tuple[Sha1Git, ...] if self._last_revision: parents = (self._last_revision.id,) else: parents = () revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents) self.log.debug("SWH revision ID: %s", hashutil.hash_to_hex(revision.id)) self._last_revision = revision return (revision, swh_dir) def file_path_is_safe(self, wtpath: bytes): tempdir = os.fsencode(self.tempdir_path) if os.fsencode("%s..%s" % (os.path.sep, os.path.sep)) in wtpath: # Paths with back-references should not appear # in CVS protocol messages or CVS rlog output return False elif os.path.commonpath([tempdir, os.path.normpath(wtpath)]) != tempdir: # The path must be a child of our temporary directory. return False else: return True def checkout_file_with_rcsparse( self, k: ChangeSetKey, f: FileRevision, rcsfile: rcsparse.rcsfile ) -> None: assert self.cvsroot_path assert self.server_style_cvsroot path = file_path(os.fsencode(self.cvsroot_path), f.path) wtpath = os.path.join(os.fsencode(self.tempdir_path), path) if not self.file_path_is_safe(wtpath): raise BadPathException(f"unsafe path found in RCS file: {f.path!r}") self.log.debug("rev %s state %s file %s", f.rev, f.state, f.path) if f.state == "dead": # remove this file from work tree try: os.remove(wtpath) except FileNotFoundError: pass else: # create, or update, this file in the work tree if not rcsfile: rcsfile = rcsparse.rcsfile(f.path) rcs = RcsKeywords() # We try our best to generate the same commit hashes over both pserver # and rsync. To avoid differences in file content due to expansion of # RCS keywords which contain absolute file paths (such as "Header"), # attempt to expand such paths in the same way as a regular CVS server # would expand them. # Whether this will avoid content differences depends on pserver and # rsync servers exposing the same server-side path to the CVS repository. # However, this is the best we can do, and only matters if an origin can # be fetched over both pserver and rsync. Each will still be treated as # a distinct origin, but will hopefully point at the same SWH snapshot. # In any case, an absolute path based on the origin URL looks nicer than # an absolute path based on a temporary directory used by the CVS loader. path_str, encoding = decode_path(f.path) server_style_path = path_str.replace( self.cvsroot_path, self.server_style_cvsroot ) if server_style_path[0] != "/": server_style_path = "/" + server_style_path if self.custom_id_keyword is not None: rcs.add_id_keyword(self.custom_id_keyword) contents = rcs.expand_keyword( server_style_path, rcsfile, f.rev, self.excluded_keywords, filename_encoding=encoding, ) os.makedirs(os.path.dirname(wtpath), exist_ok=True) outfile = open(wtpath, mode="wb") outfile.write(contents) outfile.close() def checkout_file_with_cvsclient( self, k: ChangeSetKey, f: FileRevision, cvsclient: CVSClient ): assert self.cvsroot_path path = file_path(os.fsencode(self.cvsroot_path), f.path) wtpath = os.path.join(os.fsencode(self.tempdir_path), path) if not self.file_path_is_safe(wtpath): raise BadPathException(f"unsafe path found in cvs rlog output: {f.path!r}") self.log.debug("rev %s state %s file %s", f.rev, f.state, f.path) if f.state == "dead": # remove this file from work tree try: os.remove(wtpath) except FileNotFoundError: pass else: dirname = os.path.dirname(wtpath) os.makedirs(dirname, exist_ok=True) self.log.debug("checkout to %s\n", wtpath) fp = cvsclient.checkout(path, f.rev, dirname, expand_keywords=True) os.rename(fp.name, wtpath) try: fp.close() except FileNotFoundError: # Well, we have just renamed the file... pass def process_cvs_changesets( self, cvs_changesets: List[ChangeSetKey], use_rcsparse: bool, ) -> Iterator[ Tuple[List[Content], List[SkippedContent], List[Directory], Revision] ]: """Process CVS revisions. At each CVS revision, check out contents and compute swh hashes. Yields: tuple (contents, skipped-contents, directories, revision) of dict as a dictionary with keys, sha1_git, sha1, etc... """ for k in cvs_changesets: tstr = time.strftime("%c", time.gmtime(k.max_time)) self.log.debug( "changeset from %s by %s on branch %s", tstr, k.author, k.branch ) logmsg: Optional[bytes] = b"" # Check out all files of this revision and get a log message. # # The log message is obtained from the first file in the changeset. # The message will usually be the same for all affected files, and # the SWH archive will only store one version of the log message. for f in k.revs: rcsfile = None if use_rcsparse: if rcsfile is None: rcsfile = rcsparse.rcsfile(f.path) if not logmsg: logmsg = rcsfile.getlog(k.revs[0].rev) self.checkout_file_with_rcsparse(k, f, rcsfile) else: if not logmsg: logmsg = self.rlog.getlog(self.rlog_file, f.path, k.revs[0].rev) self.checkout_file_with_cvsclient(k, f, self.cvsclient) # TODO: prune empty directories? (revision, swh_dir) = self.compute_swh_revision(k, logmsg) (contents, skipped_contents, directories) = from_disk.iter_directory( swh_dir ) yield contents, skipped_contents, directories, revision def pre_cleanup(self) -> None: """Cleanup potential dangling files from prior runs (e.g. OOM killed tasks) """ clean_dangling_folders( self.temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) def cleanup(self) -> None: self.log.debug("cleanup") - def configure_custom_id_keyword(self, cvsconfig): + def configure_custom_id_keyword(self, cvsconfig: TextIO): """Parse CVSROOT/config and look for a custom keyword definition. There are two different configuration directives in use for this purpose. The first variant stems from a patch which was never accepted into upstream CVS and uses the tag directive: tag=MyName With this, the "MyName" keyword becomes an alias for the "Id" keyword. This variant is prelevant in CVS versions shipped on BSD. The second variant stems from upstream CVS 1.12 and looks like: LocalKeyword=MyName=SomeKeyword KeywordExpand=iMyName We only support "SomeKeyword" if it specifies "Id" or "CVSHeader", for now. The KeywordExpand directive can be used to suppress expansion of keywords by listing keywords after an initial "e" character ("exclude", as opposed to an "include" list which uses an initial "i" character). For example, this disables expansion of the Date and Name keywords: KeywordExpand=eDate,Name """ - for line in cvsconfig.readlines(): + for line in cvsconfig: line = line.strip() try: (config_key, value) = line.split("=", 1) except ValueError: continue config_key = config_key.strip() value = value.strip() if config_key == "tag": self.custom_id_keyword = value elif config_key == "LocalKeyword": try: (custom_kwname, kwname) = value.split("=", 1) except ValueError: continue if kwname.strip() in ("Id", "CVSHeader"): self.custom_id_keyword = custom_kwname.strip() elif config_key == "KeywordExpand" and value.startswith("e"): excluded_keywords = value[1:].split(",") for k in excluded_keywords: self.excluded_keywords.append(k.strip()) @rsync_retry() def execute_rsync( self, rsync_cmd: List[str], **run_opts ) -> subprocess.CompletedProcess: rsync = subprocess.run(rsync_cmd, **run_opts) rsync.check_returncode() return rsync def fetch_cvs_repo_with_rsync(self, host: str, path: str) -> None: # URL *must* end with a trailing slash in order to get CVSROOT listed url = "rsync://%s%s/" % (host, os.path.dirname(path)) rsync = self.execute_rsync( ["rsync", url], capture_output=True, encoding="utf-8" ) have_cvsroot = False have_module = False for line in rsync.stdout.split("\n"): self.log.debug("rsync server: %s", line) if line.endswith(" CVSROOT"): have_cvsroot = True elif line.endswith(" %s" % self.cvs_module_name): have_module = True if have_module and have_cvsroot: break if not have_module: raise NotFound(f"CVS module {self.cvs_module_name} not found at {url}") if not have_cvsroot: raise NotFound(f"No CVSROOT directory found at {url}") # Fetch the CVSROOT directory and the desired CVS module. assert self.cvsroot_path for d in ("CVSROOT", self.cvs_module_name): target_dir = os.path.join(self.cvsroot_path, d) os.makedirs(target_dir, exist_ok=True) # Append trailing path separators ("/" in the URL and os.path.sep in the # local target directory path) to ensure that rsync will place files # directly within our target directory . self.execute_rsync( ["rsync", "-az", url + d + "/", target_dir + os.path.sep] ) def prepare(self) -> None: self._last_revision = None self.tempdir_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) url = urlparse(self.origin.url) self.log.debug( "prepare; origin_url=%s scheme=%s path=%s", self.origin.url, url.scheme, url.path, ) if not url.path: raise NotFound(f"Invalid CVS origin URL '{self.origin.url}'") self.cvs_module_name = os.path.basename(url.path) self.server_style_cvsroot = os.path.dirname(url.path) self.worktree_path = os.path.join(self.tempdir_path, self.cvs_module_name) if url.scheme == "file" or url.scheme == "rsync": # local CVS repository conversion if not self.cvsroot_path: self.cvsroot_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) if url.scheme == "file": if not os.path.exists(url.path): raise NotFound elif url.scheme == "rsync": assert url.hostname is not None self.fetch_cvs_repo_with_rsync(url.hostname, url.path) have_rcsfile = False have_cvsroot = False for root, dirs, files in os.walk(os.fsencode(self.cvsroot_path)): if b"CVSROOT" in dirs: have_cvsroot = True dirs.remove(b"CVSROOT") continue for f in files: filepath = os.path.join(root, f) if f[-2:] == b",v": rcsfile = rcsparse.rcsfile(filepath) # noqa: F841 self.log.debug( "Looks like we have data to convert; " "found a valid RCS file at %s", filepath, ) have_rcsfile = True break if have_rcsfile: break if not have_rcsfile: raise NotFound( f"Directory {self.cvsroot_path} does not contain any valid " "RCS files", ) if not have_cvsroot: self.log.warn( "The CVS repository at '%s' lacks a CVSROOT directory; " "we might be ingesting an incomplete copy of the repository", self.cvsroot_path, ) # The file CVSROOT/config will usually contain ASCII data only. # We allow UTF-8 just in case. Other encodings may result in an # error and will require manual intervention, for now. cvsconfig_path = os.path.join(self.cvsroot_path, "CVSROOT", "config") cvsconfig = open(cvsconfig_path, mode="r", encoding="utf-8") self.configure_custom_id_keyword(cvsconfig) cvsconfig.close() # Unfortunately, there is no way to convert CVS history in an # iterative fashion because the data is not indexed by any kind # of changeset ID. We need to walk the history of each and every # RCS file in the repository during every visit, even if no new # changes will be added to the SWH archive afterwards. # "CVS’s repository is the software equivalent of a telephone book # sorted by telephone number." # https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/ # # An implicit assumption made here is that self.cvs_changesets will # fit into memory in its entirety. If it won't fit then the CVS walker # will need to be modified such that it spools the list of changesets # to disk instead. cvs = CvsConv(self.cvsroot_path, RcsKeywords(), False, CHANGESET_FUZZ_SEC) self.log.debug("Walking CVS module %s", self.cvs_module_name) cvs.walk(self.cvs_module_name) cvs_changesets = sorted(cvs.changesets) self.log.debug( "CVS changesets found in %s: %d", self.cvs_module_name, len(cvs_changesets), ) self.swh_revision_gen = self.process_cvs_changesets( cvs_changesets, use_rcsparse=True ) elif url.scheme == "pserver" or url.scheme == "fake" or url.scheme == "ssh": # remote CVS repository conversion if not self.cvsroot_path: self.cvsroot_path = os.path.dirname(url.path) self.cvsclient = CVSClient(url) cvsroot_path = os.path.dirname(url.path) self.log.debug( "Fetching CVS rlog from %s:%s/%s", url.hostname, cvsroot_path, self.cvs_module_name, ) try: main_rlog_file = self.cvsclient.fetch_rlog() except CVSProtocolError as cvs_err: if "cannot find module" in str(cvs_err): raise NotFound( f"CVS module named {self.cvs_module_name} cannot be found" ) else: raise self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC) self.rlog.parse_rlog(main_rlog_file) # Find file deletion events only visible in Attic directories. main_changesets = self.rlog.changesets attic_paths = [] attic_rlog_files = [] assert self.cvsroot_path cvsroot_path_bytes = os.fsencode(self.cvsroot_path) for k in main_changesets: for changed_file in k.revs: path = file_path(cvsroot_path_bytes, changed_file.path) if path.startswith(cvsroot_path_bytes): path = path[ len(os.path.commonpath([cvsroot_path_bytes, path])) + 1 : ] parent_path = os.path.dirname(path) if parent_path.split(b"/")[-1] == b"Attic": continue attic_path = parent_path + b"/Attic" if attic_path in attic_paths: continue attic_paths.append(attic_path) # avoid multiple visits # Try to fetch more rlog data from this Attic directory. attic_rlog_file = self.cvsclient.fetch_rlog( path=attic_path, state="dead", ) if attic_rlog_file: attic_rlog_files.append(attic_rlog_file) if len(attic_rlog_files) == 0: self.rlog_file = main_rlog_file else: # Combine all the rlog pieces we found and re-parse. fp = tempfile.TemporaryFile() for attic_rlog_file in attic_rlog_files: - for line in attic_rlog_file.readlines(): + for line in attic_rlog_file: fp.write(line) - attic_rlog_file.close() + attic_rlog_file.close() main_rlog_file.seek(0) - for line in main_rlog_file.readlines(): + for line in main_rlog_file: fp.write(line) main_rlog_file.close() fp.seek(0) self.rlog.parse_rlog(cast(BinaryIO, fp)) self.rlog_file = cast(BinaryIO, fp) cvs_changesets = sorted(self.rlog.changesets) self.log.debug( "CVS changesets found for %s: %d", self.cvs_module_name, len(cvs_changesets), ) self.swh_revision_gen = self.process_cvs_changesets( cvs_changesets, use_rcsparse=False ) else: raise NotFound(f"Invalid CVS origin URL '{self.origin.url}'") def fetch_data(self) -> bool: """Fetch the next CVS revision.""" try: data = next(self.swh_revision_gen) except StopIteration: assert self._last_revision is not None self.snapshot = self.generate_and_load_snapshot(self._last_revision) self.log.debug( "SWH snapshot ID: %s", hashutil.hash_to_hex(self.snapshot.id) ) self.flush() self.loaded_snapshot_id = self.snapshot.id return False except Exception: self.log.exception("Exception in fetch_data:") sentry_sdk.capture_exception() self._visit_status = "failed" return False # Stopping iteration self._contents, self._skipped_contents, self._directories, rev = data self._revisions = [rev] return True def build_swh_revision( self, k: ChangeSetKey, logmsg: Optional[bytes], dir_id: bytes, parents: Sequence[bytes], ) -> Revision: """Given a CVS revision, build a swh revision. Args: k: changeset data logmsg: the changeset's log message dir_id: the tree's hash identifier parents: the revision's parents identifier Returns: The swh revision dictionary. """ author = Person.from_fullname(k.author.encode("UTF-8")) date = TimestampWithTimezone.from_dict(k.max_time) return Revision( type=RevisionType.CVS, date=date, committer_date=date, directory=dir_id, message=logmsg, author=author, committer=author, synthetic=True, extra_headers=[], parents=tuple(parents), ) def generate_and_load_snapshot(self, revision: Revision) -> Snapshot: """Create the snapshot either from existing revision. Args: revision (dict): Last revision seen if any (None by default) Returns: Optional[Snapshot] The newly created snapshot """ snap = Snapshot( branches={ DEFAULT_BRANCH: SnapshotBranch( target=revision.id, target_type=TargetType.REVISION ) } ) self.log.debug("snapshot: %s", snap) self.storage.snapshot_add([snap]) return snap def store_data(self) -> None: "Add our current CVS changeset to the archive." self.storage.skipped_content_add(self._skipped_contents) self.storage.content_add(self._contents) self.storage.directory_add(self._directories) self.storage.revision_add(self._revisions) self.flush() self._skipped_contents = [] self._contents = [] self._directories = [] self._revisions = [] def load_status(self) -> Dict[str, Any]: if self.snapshot is None: load_status = "failed" elif self.last_snapshot == self.snapshot: load_status = "uneventful" else: load_status = "eventful" return { "status": load_status, } def visit_status(self) -> str: return self._visit_status diff --git a/swh/loader/cvs/tests/test_loader.py b/swh/loader/cvs/tests/test_loader.py index 4c41fdf..c7274f7 100644 --- a/swh/loader/cvs/tests/test_loader.py +++ b/swh/loader/cvs/tests/test_loader.py @@ -1,1360 +1,1360 @@ # Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os import subprocess import tempfile from typing import Any, Dict from urllib.parse import urlparse import pytest from swh.loader.cvs.cvsclient import CVSClient from swh.loader.cvs.loader import BadPathException, CvsLoader from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.hashutil import hash_to_bytes from swh.model.model import Snapshot, SnapshotBranch, TargetType RUNBABY_SNAPSHOT = Snapshot( id=hash_to_bytes("e64667c400049f560a3856580e0d9e511ffa66c9"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("0f6db8ce49472d7829ddd6141f71c68c0d563f0e"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_not_found_no_mock(swh_storage, tmp_path): """Given an unknown repository, the loader visit ends up in status not_found""" unknown_repo_url = "unknown-repository" loader = CvsLoader(swh_storage, unknown_repo_url, cvsroot_path=tmp_path) assert loader.load() == {"status": "uneventful"} assert_last_visit_matches( swh_storage, unknown_repo_url, status="not_found", type="cvs", ) def test_loader_cvs_ssh_module_not_found(swh_storage, tmp_path, mocker): url = "ssh://anoncvs@anoncvs.example.org/cvsroot/foo" mocker.patch("swh.loader.cvs.cvsclient.socket") mocker.patch("swh.loader.cvs.cvsclient.subprocess") from swh.loader.cvs.loader import CVSClient as Client conn_read_line = mocker.patch.object(Client, "conn_read_line") conn_read_line.side_effect = [ # server response lines when client is initialized b"Valid-requests ", b"ok\n", # server response line when CVS module is missing "E cvs rlog: cannot find module `foo' - ignored\n".encode(), ] loader = CvsLoader(swh_storage, url, cvsroot_path=tmp_path) assert loader.load() == {"status": "uneventful"} assert_last_visit_matches( swh_storage, url, status="not_found", type="cvs", ) def test_loader_cvs_visit(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 5, "directory": 1, "origin": 1, "origin_visit": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } check_snapshot(RUNBABY_SNAPSHOT, loader.storage) def test_loader_cvs_2_visits_no_change(swh_storage, datadir, tmp_path): """Eventful visit followed by uneventful visit should yield the same snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status1 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "uneventful"} visit_status2 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) assert visit_status1.date < visit_status2.date assert visit_status1.snapshot == visit_status2.snapshot stats = get_stats(loader.storage) assert stats["origin_visit"] == 1 + 1 # computed twice the same snapshot assert stats["snapshot"] == 1 GREEK_SNAPSHOT = Snapshot( id=hash_to_bytes("c76f8b58a6dfbe6fccb9a85b695f914aa5c4a95a"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("e138207ddd5e1965b5ab9a522bfc2e0ecd233b67"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_with_file_additions_and_deletions(swh_storage, datadir, tmp_path): """Eventful conversion of history with file additions and deletions""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT, loader.storage) def test_loader_cvs_pserver_with_file_additions_and_deletions( swh_storage, datadir, tmp_path ): """Eventful CVS pserver conversion with file additions and deletions""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT, loader.storage) GREEK_SNAPSHOT2 = Snapshot( id=hash_to_bytes("e3d2e8860286000f546c01aa2a3e1630170eb3b6"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("f1ff9a3c7624b1be5e5d51f9ec0abf7dcddbf0b2"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_2_visits_with_change(swh_storage, datadir, tmp_path): """Eventful visit followed by eventful visit should yield two snapshots""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status1 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } archive_name2 = "greek-repository2" archive_path2 = os.path.join(datadir, f"{archive_name2}.tgz") repo_url = prepare_repository_from_archive(archive_path2, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status2 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT2.id, ) stats = get_stats(loader.storage) assert stats == { "content": 10, "directory": 15, "origin": 1, "origin_visit": 2, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 2, } check_snapshot(GREEK_SNAPSHOT2, loader.storage) assert visit_status1.date < visit_status2.date assert visit_status1.snapshot != visit_status2.snapshot def test_loader_cvs_visit_pserver(swh_storage, datadir, tmp_path): """Eventful visit to CVS pserver should yield 1 snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/runbaby" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 5, "directory": 1, "origin": 1, "origin_visit": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } check_snapshot(RUNBABY_SNAPSHOT, loader.storage) GREEK_SNAPSHOT3 = Snapshot( id=hash_to_bytes("6e9910ed072662cb482d9017cbf5e1973e6dc09f"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("d9f4837dc55a87d83730c6e277c88b67dae80272"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_visit_pserver_no_eol(swh_storage, datadir, tmp_path): """Visit to CVS pserver with file that lacks trailing eol""" archive_name = "greek-repository3" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT3.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 15, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT3, loader.storage) GREEK_SNAPSHOT4 = Snapshot( id=hash_to_bytes("a8593e9233601b31e012d36975f817d2c993d04b"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("51bb99655225c810ee259087fcae505899725360"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_visit_expand_id_keyword(swh_storage, datadir, tmp_path): """Visit to CVS repository with file with an RCS Id keyword""" archive_name = "greek-repository4" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT4.id, ) stats = get_stats(loader.storage) assert stats == { "content": 12, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT4, loader.storage) def test_loader_cvs_visit_pserver_expand_id_keyword(swh_storage, datadir, tmp_path): """Visit to CVS pserver with file with an RCS Id keyword""" archive_name = "greek-repository4" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT4.id, ) stats = get_stats(loader.storage) assert stats == { "content": 12, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT4, loader.storage) GREEK_SNAPSHOT5 = Snapshot( id=hash_to_bytes("6484ec9bfff677731cbb6d2bd5058dabfae952ed"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("514b3bef07d56e393588ceda18cc1dfa2dc4e04a"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_with_file_deleted_and_readded(swh_storage, datadir, tmp_path): """Eventful conversion of history with file deletion and re-addition""" archive_name = "greek-repository5" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT5.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT5, loader.storage) def test_loader_cvs_pserver_with_file_deleted_and_readded( swh_storage, datadir, tmp_path ): """Eventful pserver conversion with file deletion and re-addition""" archive_name = "greek-repository5" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT5.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT5, loader.storage) DINO_SNAPSHOT = Snapshot( id=hash_to_bytes("6cf774cec1030ff3e9a301681303adb537855d09"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("b7d3ea1fa878d51323b5200ad2c6ee9d5b656f10"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_readded_file_in_attic(swh_storage, datadir, tmp_path): """Conversion of history with RCS files in the Attic""" # This repository has some file revisions marked "dead" in the Attic only. # This is different to the re-added file tests above, where the RCS file # was moved out of the Attic again as soon as the corresponding deleted # file was re-added. Failure to detect the "dead" file revisions in the # Attic would result in errors in our converted history. archive_name = "dino-readded-file" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 38, "directory": 70, "origin": 1, "origin_visit": 1, "release": 0, "revision": 35, "skipped_content": 0, "snapshot": 1, } check_snapshot(DINO_SNAPSHOT, loader.storage) def test_loader_cvs_pserver_readded_file_in_attic(swh_storage, datadir, tmp_path): """Conversion over pserver with RCS files in the Attic""" # This repository has some file revisions marked "dead" in the Attic only. # This is different to the re-added file tests above, where the RCS file # was moved out of the Attic again as soon as the corresponding deleted # file was re-added. Failure to detect the "dead" file revisions in the # Attic would result in errors in our converted history. # This has special implications for the pserver case, because the "dead" # revisions will not appear in in the output of 'cvs rlog' by default. archive_name = "dino-readded-file" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 38, "directory": 70, "origin": 1, "origin_visit": 1, "release": 0, "revision": 35, "skipped_content": 0, "snapshot": 1, } check_snapshot(DINO_SNAPSHOT, loader.storage) DINO_SNAPSHOT2 = Snapshot( id=hash_to_bytes("afdeca6b8ec8f58367b4e014e2210233f1c5bf3d"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("84e428103d42b84713c77afb9420d667062f8676"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_split_commits_by_commitid(swh_storage, datadir, tmp_path): """Conversion of RCS history which needs to be split by commit ID""" # This repository has some file revisions which use the same log message # and can only be told apart by commit IDs. Without commit IDs, these commits # would get merged into a single commit in our conversion result. archive_name = "dino-commitid" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/dino" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id, ) check_snapshot(DINO_SNAPSHOT2, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 18, "directory": 18, "origin": 1, "origin_visit": 1, "release": 0, "revision": 18, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_split_commits_by_commitid(swh_storage, datadir, tmp_path): """Conversion via pserver which needs to be split by commit ID""" # This repository has some file revisions which use the same log message # and can only be told apart by commit IDs. Without commit IDs, these commits # would get merged into a single commit in our conversion result. archive_name = "dino-commitid" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/dino" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id, ) check_snapshot(DINO_SNAPSHOT2, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 18, "directory": 18, "origin": 1, "origin_visit": 1, "release": 0, "revision": 18, "skipped_content": 0, "snapshot": 1, } GREEK_SNAPSHOT6 = Snapshot( id=hash_to_bytes("859ae7ca5b31fee594c98abecdd41eff17cae079"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("fa48fb4551898cd8d3305cace971b3b95639e83e"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_empty_lines_in_log_message(swh_storage, datadir, tmp_path): """Conversion of RCS history with empty lines in a log message""" archive_name = "greek-repository6" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT6.id, ) check_snapshot(GREEK_SNAPSHOT6, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_empty_lines_in_log_message(swh_storage, datadir, tmp_path): """Conversion via pserver with empty lines in a log message""" archive_name = "greek-repository6" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT6.id, ) check_snapshot(GREEK_SNAPSHOT6, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } def get_head_revision_paths_info(loader: CvsLoader) -> Dict[bytes, Dict[str, Any]]: assert loader.snapshot is not None root_dir = loader.snapshot.branches[b"HEAD"].target revision = loader.storage.revision_get([root_dir])[0] assert revision is not None paths = {} for entry in loader.storage.directory_ls(revision.directory, recursive=True): paths[entry["name"]] = entry return paths def test_loader_cvs_with_header_keyword(swh_storage, datadir, tmp_path): """Eventful conversion of history with Header keyword in a file""" archive_name = "greek-repository7" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} repo_url = f"fake://{repo_url[7:]}" loader2 = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader2.load() == {"status": "eventful"} # We cannot verify the snapshot ID. It is unpredicable due to use of the $Header$ # RCS keyword which contains the temporary directory where the repository is stored. expected_stats = { "content": 9, "directory": 14, "origin": 2, "origin_visit": 2, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } stats = get_stats(loader.storage) assert stats == expected_stats stats = get_stats(loader2.storage) assert stats == expected_stats # Ensure that file 'alpha', which contains a $Header$ keyword, # was imported with equal content via file:// and fake:// URLs. paths = get_head_revision_paths_info(loader) paths2 = get_head_revision_paths_info(loader2) alpha = paths[b"alpha"] alpha2 = paths2[b"alpha"] assert alpha["sha1"] == alpha2["sha1"] GREEK_SNAPSHOT8 = Snapshot( id=hash_to_bytes("5278a1f73ed0f804c68f72614a5f78ca5074ab9c"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("b389258fec8151d719e79da80b5e5355a48ec8bc"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_expand_log_keyword(swh_storage, datadir, tmp_path): """Conversion of RCS history with Log keyword in files""" archive_name = "greek-repository8" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT8.id, ) check_snapshot(GREEK_SNAPSHOT8, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 14, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_expand_log_keyword(swh_storage, datadir, tmp_path): """Conversion of RCS history with Log keyword in files""" archive_name = "greek-repository8" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT8.id, ) check_snapshot(GREEK_SNAPSHOT8, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 14, "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } GREEK_SNAPSHOT9 = Snapshot( id=hash_to_bytes("3d08834666df7a589abea07ac409771ebe7e8fe4"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("9971cbb3b540dfe75f3bcce5021cb73d63b47df3"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_visit_expand_custom_keyword(swh_storage, datadir, tmp_path): """Visit to CVS repository with file with a custom RCS keyword""" archive_name = "greek-repository9" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT9.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT9, loader.storage) RCSBASE_SNAPSHOT = Snapshot( id=hash_to_bytes("2c75041ba8868df04349c1c8f4c29f992967b8aa"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("46f076387ff170dc3d4da5e43d953c1fc744c821"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_expand_log_keyword2(swh_storage, datadir, tmp_path): """Another conversion of RCS history with Log keyword in files""" archive_name = "rcsbase-log-kw-test-repo" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RCSBASE_SNAPSHOT.id, ) check_snapshot(RCSBASE_SNAPSHOT, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 2, "directory": 3, "origin": 1, "origin_visit": 1, "release": 0, "revision": 3, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_expand_log_keyword2(swh_storage, datadir, tmp_path): """Another conversion of RCS history with Log keyword in files""" archive_name = "rcsbase-log-kw-test-repo" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RCSBASE_SNAPSHOT.id, ) check_snapshot(RCSBASE_SNAPSHOT, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 2, "directory": 3, "origin": 1, "origin_visit": 1, "release": 0, "revision": 3, "skipped_content": 0, "snapshot": 1, } @pytest.mark.parametrize( "rlog_unsafe_path", [ # paths that walk to parent directory: "unsafe_rlog_with_unsafe_relative_path.rlog", # absolute path outside the CVS server's root directory: "unsafe_rlog_wrong_arborescence.rlog", ], ) def test_loader_cvs_weird_paths_in_rlog( swh_storage, datadir, tmp_path, mocker, rlog_unsafe_path ): """Handle cvs rlog output which contains unsafe paths""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" # And let's pretend the server returned this rlog output instead of # what it would actually return. rlog_file = tempfile.NamedTemporaryFile( dir=tmp_path, mode="w+", delete=False, prefix="weird-path-rlog-" ) rlog_file_path = rlog_file.name rlog_weird_paths = open(os.path.join(datadir, rlog_unsafe_path)) - for line in rlog_weird_paths.readlines(): + for line in rlog_weird_paths: rlog_file.write(line.replace("{cvsroot_path}", os.path.dirname(repo_url[7:]))) rlog_file.close() rlog_file_override = open(rlog_file_path, "rb") # re-open as bytes instead of str mock_read = mocker.patch("swh.loader.cvs.cvsclient.CVSClient.fetch_rlog") mock_read.return_value = rlog_file_override def side_effect(self, path="", state=""): return None mock_read.side_effect = side_effect(side_effect) try: loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name), ) except BadPathException: pass assert loader.load() == {"status": "failed"} assert_last_visit_matches( swh_storage, repo_url, status="failed", type="cvs", ) assert mock_read.called rlog_file_override.close() os.unlink(rlog_file_path) def test_loader_rsync_retry(swh_storage, mocker, tmp_path): module_name = "module" host = "example.org" path = f"/cvsroot/{module_name}" repo_url = f"rsync://{host}{path}/" rsync_first_call = ["rsync", repo_url] rsync_second_call = [ "rsync", "-az", f"{repo_url}CVSROOT/", os.path.join(tmp_path, "CVSROOT/"), ] rsync_third_call = [ "rsync", "-az", f"{repo_url}{module_name}/", os.path.join(tmp_path, f"{module_name}/"), ] mock_subprocess = mocker.patch("swh.loader.cvs.loader.subprocess") mock_subprocess.run.side_effect = [ subprocess.CompletedProcess(args=rsync_first_call, returncode=23), subprocess.CompletedProcess( args=rsync_first_call, returncode=0, stdout=f""" drwxr-xr-x 21 2012/11/04 06:58:58 . drwxr-xr-x 39 2021/01/22 10:21:05 CVSROOT drwxr-xr-x 15 2020/12/28 00:50:21 {module_name}""", ), subprocess.CompletedProcess( args=rsync_second_call, returncode=23, ), subprocess.CompletedProcess( args=rsync_second_call, returncode=23, ), subprocess.CompletedProcess(args=rsync_second_call, returncode=0), subprocess.CompletedProcess( args=rsync_third_call, returncode=23, ), subprocess.CompletedProcess( args=rsync_third_call, returncode=23, ), subprocess.CompletedProcess(args=rsync_third_call, returncode=0), ] loader = CvsLoader(swh_storage, repo_url) loader.cvs_module_name = module_name loader.cvsroot_path = tmp_path loader.fetch_cvs_repo_with_rsync(host, path) @pytest.mark.parametrize( "pserver_url", [ "pserver://anonymous:anonymous@cvs.example.org/cvsroot/project/module", "pserver://anonymous@cvs.example.org/cvsroot/project/module", ], ) def test_cvs_client_connect_pserver(mocker, pserver_url): from swh.loader.cvs.cvsclient import socket conn = mocker.MagicMock() conn.recv.side_effect = [b"I LOVE YOU\n", b"Valid-requests \n", b"ok\n"] mocker.patch.object(socket, "create_connection").return_value = conn parsed_url = urlparse(pserver_url) # check cvs client can be instantiated without errors CVSClient(parsed_url) @pytest.mark.parametrize("protocol", ["rsync", "pserver"]) def test_loader_cvs_with_non_utf8_directory_paths( swh_storage, datadir, tmp_path, protocol ): archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name protocol_prefix = "file://" if protocol == "pserver": protocol_prefix = "fake://" repo_url = repo_url.replace("file://", protocol_prefix) for root, _, files in os.walk(repo_url.replace(protocol_prefix, "")): for file in files: # clone existing file in repository but makes it path non UTF-8 encoded filepath = os.path.join(root, file) with open(filepath, "rb") as f: filecontent = f.read() filepath = root.encode() + ("é" + file).encode("iso-8859-1") with open(filepath, "wb") as f: f.write(filecontent) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} CPMIXIN_SNAPSHOT = Snapshot( id=hash_to_bytes("105b49290a48cc780f5519588ae822e2dd942930"), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes("658f18d145376f0b71716649602752b509cfdbd4"), target_type=TargetType.REVISION, ) }, ) @pytest.mark.parametrize("protocol", ["rsync", "pserver"]) def test_loader_cvs_with_rev_numbers_greater_than_one( swh_storage, datadir, tmp_path, protocol ): archive_name = "cpmixin" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/cpmixin" # CVS module name protocol_prefix = "file://" if protocol == "pserver": protocol_prefix = "fake://" repo_url = repo_url.replace("file://", protocol_prefix) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=CPMIXIN_SNAPSHOT.id, ) check_snapshot(CPMIXIN_SNAPSHOT, loader.storage)