diff --git a/swh/loader/cvs/loader.py b/swh/loader/cvs/loader.py index 8f56eee..2e1ae68 100644 --- a/swh/loader/cvs/loader.py +++ b/swh/loader/cvs/loader.py @@ -1,556 +1,555 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """Loader in charge of injecting either new or existing cvs repositories to swh-storage. """ from datetime import datetime import os import os.path import subprocess import tempfile import time from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Sequence, Tuple, cast from urllib3.util import parse_url from swh.loader.core.loader import BaseLoader from swh.loader.core.utils import clean_dangling_folders from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ( CHANGESET_FUZZ_SEC, ChangeSetKey, CvsConv, FileRevision, RcsKeywords, file_path, ) from swh.loader.cvs.cvsclient import CVSClient import swh.loader.cvs.rcsparse as rcsparse from swh.loader.cvs.rlog import RlogConv from swh.loader.exception import NotFound from swh.model import from_disk, hashutil from swh.model.model import ( Content, Directory, Origin, Person, Revision, RevisionType, Sha1Git, SkippedContent, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.storage.algos.snapshot import snapshot_get_latest from swh.storage.interface import StorageInterface DEFAULT_BRANCH = b"HEAD" TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs." class CvsLoader(BaseLoader): """Swh cvs loader. The repository is local. The loader deals with update on an already previously loaded repository. """ visit_type = "cvs" cvs_module_name: str cvsclient: CVSClient # remote CVS repository access (history is parsed from CVS rlog): rlog_file: BinaryIO swh_revision_gen: Iterator[ Tuple[List[Content], List[SkippedContent], List[Directory], Revision] ] def __init__( self, storage: StorageInterface, url: str, origin_url: Optional[str] = None, visit_date: Optional[datetime] = None, cvsroot_path: Optional[str] = None, temp_directory: str = "/tmp", max_content_size: Optional[int] = None, ): super().__init__( storage=storage, logging_class="swh.loader.cvs.CvsLoader", max_content_size=max_content_size, ) self.cvsroot_url = url # origin url as unique identifier for origin in swh archive self.origin_url = origin_url if origin_url else self.cvsroot_url self.temp_directory = temp_directory # internal state used to store swh objects self._contents: List[Content] = [] self._skipped_contents: List[SkippedContent] = [] self._directories: List[Directory] = [] self._revisions: List[Revision] = [] # internal state, current visit self._last_revision: Optional[Revision] = None self._visit_status = "full" self.visit_date = visit_date self.cvsroot_path = cvsroot_path self.snapshot: Optional[Snapshot] = None self.last_snapshot: Optional[Snapshot] = snapshot_get_latest( self.storage, self.origin_url ) def compute_swh_revision( self, k: ChangeSetKey, logmsg: Optional[bytes] ) -> Tuple[Revision, from_disk.Directory]: """Compute swh hash data per CVS changeset. Returns: tuple (rev, swh_directory) - rev: current SWH revision computed from checked out work tree - swh_directory: dictionary of path, swh hash data with type """ # Compute SWH revision from the on-disk state swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path)) parents: Tuple[Sha1Git, ...] 
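A minimal sketch of the directory hashing step this method relies on, assuming a populated work tree at an illustrative path; swh.model.from_disk scans the filesystem and computes the sha1_git of the tree:

from swh.model import from_disk

# Hash a checked-out work tree the same way compute_swh_revision() does.
# The path is a stand-in for the loader's per-changeset work tree.
swh_dir = from_disk.Directory.from_disk(path=b"/tmp/cvs-worktree")
root_sha1_git = swh_dir.hash  # 20-byte sha1_git of the root directory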
if self._last_revision: parents = (self._last_revision.id,) else: parents = () revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents) self.log.info("SWH revision ID: %s", hashutil.hash_to_hex(revision.id)) self._last_revision = revision return (revision, swh_dir) def checkout_file_with_rcsparse( self, k: ChangeSetKey, f: FileRevision, rcsfile: rcsparse.rcsfile ) -> None: assert self.cvsroot_path assert self.server_style_cvsroot path = file_path(self.cvsroot_path, f.path) - wtpath = os.path.join(self.worktree_path, path) + wtpath = os.path.join(self.tempdir_path, path) self.log.info("rev %s state %s file %s" % (f.rev, f.state, f.path)) if f.state == "dead": # remove this file from work tree try: os.remove(wtpath) except FileNotFoundError: pass else: # create, or update, this file in the work tree if not rcsfile: rcsfile = rcsparse.rcsfile(f.path) rcs = RcsKeywords() # We try our best to generate the same commit hashes over both pserver # and rsync. To avoid differences in file content due to expansion of # RCS keywords which contain absolute file paths (such as "Header"), # attempt to expand such paths in the same way as a regular CVS server # would expand them. # Whether this will avoid content differences depends on pserver and # rsync servers exposing the same server-side path to the CVS repository. # However, this is the best we can do, and only matters if an origin can # be fetched over both pserver and rsync. Each will still be treated as # a distinct origin, but will hopefully point at the same SWH snapshot. # In any case, an absolute path based on the origin URL looks nicer than # an absolute path based on a temporary directory used by the CVS loader. server_style_path = f.path.replace( self.cvsroot_path, self.server_style_cvsroot ) if server_style_path[0] != "/": server_style_path = "/" + server_style_path contents = rcs.expand_keyword(server_style_path, rcsfile, f.rev) os.makedirs(os.path.dirname(wtpath), exist_ok=True) outfile = open(wtpath, mode="wb") outfile.write(contents) outfile.close() def checkout_file_with_cvsclient( self, k: ChangeSetKey, f: FileRevision, cvsclient: CVSClient ): assert self.cvsroot_path path = file_path(self.cvsroot_path, f.path) - wtpath = os.path.join(self.worktree_path, path) + wtpath = os.path.join(self.tempdir_path, path) self.log.info("rev %s state %s file %s" % (f.rev, f.state, f.path)) if f.state == "dead": # remove this file from work tree try: os.remove(wtpath) except FileNotFoundError: pass else: dirname = os.path.dirname(wtpath) os.makedirs(dirname, exist_ok=True) self.log.debug("checkout to %s\n" % wtpath) fp = cvsclient.checkout(path, f.rev, dirname, expand_keywords=True) os.rename(fp.name, wtpath) try: fp.close() except FileNotFoundError: # Well, we have just renamed the file... pass def process_cvs_changesets( self, cvs_changesets: List[ChangeSetKey], use_rcsparse: bool, ) -> Iterator[ Tuple[List[Content], List[SkippedContent], List[Directory], Revision] ]: """Process CVS revisions. At each CVS revision, check out contents and compute swh hashes. Yields: tuple (contents, skipped-contents, directories, revision) of dict as a dictionary with keys, sha1_git, sha1, etc... """ for k in cvs_changesets: tstr = time.strftime("%c", time.gmtime(k.max_time)) self.log.info( "changeset from %s by %s on branch %s", tstr, k.author, k.branch ) logmsg: Optional[bytes] = b"" # Check out all files of this revision and get a log message. # # The log message is obtained from the first file in the changeset. 
# The message will usually be the same for all affected files, and # the SWH archive will only store one version of the log message. for f in k.revs: rcsfile = None if use_rcsparse: if rcsfile is None: rcsfile = rcsparse.rcsfile(f.path) if not logmsg: logmsg = rcsfile.getlog(k.revs[0].rev) self.checkout_file_with_rcsparse(k, f, rcsfile) else: if not logmsg: logmsg = self.rlog.getlog(self.rlog_file, f.path, k.revs[0].rev) self.checkout_file_with_cvsclient(k, f, self.cvsclient) # TODO: prune empty directories? (revision, swh_dir) = self.compute_swh_revision(k, logmsg) (contents, skipped_contents, directories) = from_disk.iter_directory( swh_dir ) yield contents, skipped_contents, directories, revision def prepare_origin_visit(self) -> None: self.origin = Origin( url=self.origin_url if self.origin_url else self.cvsroot_url ) def pre_cleanup(self) -> None: """Cleanup potential dangling files from prior runs (e.g. OOM killed tasks) """ clean_dangling_folders( self.temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) def cleanup(self) -> None: self.log.info("cleanup") def fetch_cvs_repo_with_rsync(self, host: str, path: str) -> None: # URL *must* end with a trailing slash in order to get CVSROOT listed url = "rsync://%s%s/" % (host, os.path.dirname(path)) rsync = subprocess.run(["rsync", url], capture_output=True, encoding="ascii") rsync.check_returncode() have_cvsroot = False have_module = False for line in rsync.stdout.split("\n"): self.log.debug("rsync server: %s", line) if line.endswith(" CVSROOT"): have_cvsroot = True elif line.endswith(" %s" % self.cvs_module_name): have_module = True if have_module and have_cvsroot: break if not have_module: raise NotFound( "CVS module %s not found at %s" % (self.cvs_module_name, url) ) if not have_cvsroot: raise NotFound("No CVSROOT directory found at %s" % url) - # mypy complains: List item 3 has incompatible type "Optional[str]"; - # because self.cvsroot_path is an optional argument. We do however - # ensure that it is initialized if the loader is not passed a - # corresponding argument. Better ideas than ignoring types on this line? + assert self.cvsroot_path subprocess.run( - ["rsync", "-a", url, self.cvsroot_path] # type: ignore + # Ensure that rsync will place files directly within our cvsroot + # directory by appending a "/" to our cvsroot path. 
+ ["rsync", "-a", url, self.cvsroot_path + "/"] ).check_returncode() def prepare(self) -> None: self._last_revision = None - self.worktree_path = tempfile.mkdtemp( + self.tempdir_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) url = parse_url(self.origin_url) self.log.debug( "prepare; origin_url=%s scheme=%s path=%s", self.origin_url, url.scheme, url.path, ) if not url.path: raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url) self.cvs_module_name = os.path.basename(url.path) self.server_style_cvsroot = os.path.dirname(url.path) - os.mkdir(os.path.join(self.worktree_path, self.cvs_module_name)) + self.worktree_path = os.path.join(self.tempdir_path, self.cvs_module_name) if url.scheme == "file" or url.scheme == "rsync": # local CVS repository conversion if not self.cvsroot_path: self.cvsroot_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) if url.scheme == "file": if not os.path.exists(url.path): raise NotFound elif url.scheme == "rsync": self.fetch_cvs_repo_with_rsync(url.host, url.path) have_rcsfile = False have_cvsroot = False for root, dirs, files in os.walk(self.cvsroot_path): if "CVSROOT" in dirs: have_cvsroot = True dirs.remove("CVSROOT") continue for f in files: filepath = os.path.join(root, f) if f[-2:] == ",v": rcsfile = rcsparse.rcsfile(filepath) # noqa: F841 self.log.debug( "Looks like we have data to convert; " "found a valid RCS file at %s", filepath, ) have_rcsfile = True break if have_rcsfile: break if not have_rcsfile: raise NotFound( "Directory %s does not contain any valid RCS files %s", self.cvsroot_path, ) if not have_cvsroot: self.log.warn( "The CVS repository at '%s' lacks a CVSROOT directory; " "we might be ingesting an incomplete copy of the repository", self.cvsroot_path, ) # Unfortunately, there is no way to convert CVS history in an # iterative fashion because the data is not indexed by any kind # of changeset ID. We need to walk the history of each and every # RCS file in the repository during every visit, even if no new # changes will be added to the SWH archive afterwards. # "CVS’s repository is the software equivalent of a telephone book # sorted by telephone number." # https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/ # # An implicit assumption made here is that self.cvs_changesets will # fit into memory in its entirety. If it won't fit then the CVS walker # will need to be modified such that it spools the list of changesets # to disk instead. cvs = CvsConv(self.cvsroot_path, RcsKeywords(), False, CHANGESET_FUZZ_SEC) self.log.info("Walking CVS module %s", self.cvs_module_name) cvs.walk(self.cvs_module_name) cvs_changesets = sorted(cvs.changesets) self.log.info( "CVS changesets found in %s: %d", self.cvs_module_name, len(cvs_changesets), ) self.swh_revision_gen = self.process_cvs_changesets( cvs_changesets, use_rcsparse=True ) elif url.scheme == "pserver" or url.scheme == "fake" or url.scheme == "ssh": # remote CVS repository conversion if not self.cvsroot_path: self.cvsroot_path = os.path.dirname(url.path) self.cvsclient = CVSClient(url) cvsroot_path = os.path.dirname(url.path) self.log.info( "Fetching CVS rlog from %s:%s/%s", url.host, cvsroot_path, self.cvs_module_name, ) self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC) main_rlog_file = self.cvsclient.fetch_rlog() self.rlog.parse_rlog(main_rlog_file) # Find file deletion events only visible in Attic directories. 
main_changesets = self.rlog.changesets attic_paths = [] attic_rlog_files = [] assert self.cvsroot_path for k in main_changesets: for changed_file in k.revs: path = file_path(self.cvsroot_path, changed_file.path) if path.startswith(self.cvsroot_path): path = path[ len(os.path.commonpath([self.cvsroot_path, path])) + 1 : ] parent_path = os.path.dirname(path) if parent_path.split("/")[-1] == "Attic": continue attic_path = parent_path + "/Attic" if attic_path in attic_paths: continue attic_paths.append(attic_path) # avoid multiple visits # Try to fetch more rlog data from this Attic directory. attic_rlog_file = self.cvsclient.fetch_rlog( path=attic_path, state="dead", ) if attic_rlog_file: attic_rlog_files.append(attic_rlog_file) if len(attic_rlog_files) == 0: self.rlog_file = main_rlog_file else: # Combine all the rlog pieces we found and re-parse. fp = tempfile.TemporaryFile() for attic_rlog_file in attic_rlog_files: for line in attic_rlog_file.readlines(): fp.write(line) attic_rlog_file.close() main_rlog_file.seek(0) for line in main_rlog_file.readlines(): fp.write(line) main_rlog_file.close() fp.seek(0) self.rlog.parse_rlog(cast(BinaryIO, fp)) self.rlog_file = cast(BinaryIO, fp) cvs_changesets = sorted(self.rlog.changesets) self.log.info( "CVS changesets found for %s: %d", self.cvs_module_name, len(cvs_changesets), ) self.swh_revision_gen = self.process_cvs_changesets( cvs_changesets, use_rcsparse=False ) else: raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url) def fetch_data(self) -> bool: """Fetch the next CVS revision.""" try: data = next(self.swh_revision_gen) except StopIteration: assert self._last_revision is not None self.snapshot = self.generate_and_load_snapshot(self._last_revision) self.log.info("SWH snapshot ID: %s", hashutil.hash_to_hex(self.snapshot.id)) self.flush() self.loaded_snapshot_id = self.snapshot.id return False except Exception: self.log.exception("Exception in fetch_data:") return False # Stopping iteration self._contents, self._skipped_contents, self._directories, rev = data self._revisions = [rev] return True def build_swh_revision( self, k: ChangeSetKey, logmsg: Optional[bytes], dir_id: bytes, parents: Sequence[bytes], ) -> Revision: """Given a CVS revision, build a swh revision. Args: k: changeset data logmsg: the changeset's log message dir_id: the tree's hash identifier parents: the revision's parents identifier Returns: The swh revision dictionary. """ author = Person.from_fullname(k.author.encode("UTF-8")) date = TimestampWithTimezone.from_dict(k.max_time) return Revision( type=RevisionType.CVS, date=date, committer_date=date, directory=dir_id, message=logmsg, author=author, committer=author, synthetic=True, extra_headers=[], parents=tuple(parents), ) def generate_and_load_snapshot(self, revision: Revision) -> Snapshot: """Create the snapshot either from existing revision. Args: revision (dict): Last revision seen if any (None by default) Returns: Optional[Snapshot] The newly created snapshot """ snap = Snapshot( branches={ DEFAULT_BRANCH: SnapshotBranch( target=revision.id, target_type=TargetType.REVISION ) } ) self.log.debug("snapshot: %s", snap) self.storage.snapshot_add([snap]) return snap def store_data(self) -> None: "Add our current CVS changeset to the archive." 
self.storage.skipped_content_add(self._skipped_contents) self.storage.content_add(self._contents) self.storage.directory_add(self._directories) self.storage.revision_add(self._revisions) self.flush() self._skipped_contents = [] self._contents = [] self._directories = [] self._revisions = [] def load_status(self) -> Dict[str, Any]: assert self.snapshot is not None if self.last_snapshot == self.snapshot: load_status = "uneventful" else: load_status = "eventful" return { "status": load_status, } def visit_status(self) -> str: return self._visit_status diff --git a/swh/loader/cvs/tests/test_loader.py b/swh/loader/cvs/tests/test_loader.py index 8866000..c0b3f70 100644 --- a/swh/loader/cvs/tests/test_loader.py +++ b/swh/loader/cvs/tests/test_loader.py @@ -1,949 +1,949 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Any, Dict from swh.loader.cvs.loader import CvsLoader from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.hashutil import hash_to_bytes from swh.model.model import Snapshot, SnapshotBranch, TargetType RUNBABY_SNAPSHOT = Snapshot( - id=hash_to_bytes("1cff69ab9bd70822d5e3006092f943ccaafdcf57"), + id=hash_to_bytes("e64667c400049f560a3856580e0d9e511ffa66c9"), branches={ b"HEAD": SnapshotBranch( - target=hash_to_bytes("ef511d258fa55035c2bc2a5b05cad233cee1d328"), + target=hash_to_bytes("0f6db8ce49472d7829ddd6141f71c68c0d563f0e"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_not_found_no_mock(swh_storage, tmp_path): """Given an unknown repository, the loader visit ends up in status not_found""" unknown_repo_url = "unknown-repository" loader = CvsLoader(swh_storage, unknown_repo_url, cvsroot_path=tmp_path) assert loader.load() == {"status": "uneventful"} assert_last_visit_matches( swh_storage, unknown_repo_url, status="not_found", type="cvs", ) def test_loader_cvs_visit(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 5, - "directory": 2, + "directory": 1, "origin": 1, "origin_visit": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } check_snapshot(RUNBABY_SNAPSHOT, loader.storage) def test_loader_cvs_2_visits_no_change(swh_storage, datadir, tmp_path): """Eventful visit followed by uneventful visit should yield the same snapshot """ archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status1 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, 
archive_name) ) assert loader.load() == {"status": "uneventful"} visit_status2 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) assert visit_status1.date < visit_status2.date assert visit_status1.snapshot == visit_status2.snapshot stats = get_stats(loader.storage) assert stats["origin_visit"] == 1 + 1 # computed twice the same snapshot assert stats["snapshot"] == 1 GREEK_SNAPSHOT = Snapshot( - id=hash_to_bytes("5e74af67d69dfd7aea0eb118154d062f71f50120"), + id=hash_to_bytes("c76f8b58a6dfbe6fccb9a85b695f914aa5c4a95a"), branches={ b"HEAD": SnapshotBranch( - target=hash_to_bytes("e18b92f14cd5b3efb3fcb4ea46cfaf97f25f301b"), + target=hash_to_bytes("e138207ddd5e1965b5ab9a522bfc2e0ecd233b67"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_with_file_additions_and_deletions(swh_storage, datadir, tmp_path): """Eventful conversion of history with file additions and deletions""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, - "directory": 20, + "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT, loader.storage) def test_loader_cvs_pserver_with_file_additions_and_deletions( swh_storage, datadir, tmp_path ): """Eventful CVS pserver conversion with file additions and deletions""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, - "directory": 20, + "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT, loader.storage) GREEK_SNAPSHOT2 = Snapshot( - id=hash_to_bytes("048885ae2145ffe81588aea95dcf75c536ecdf26"), + id=hash_to_bytes("e3d2e8860286000f546c01aa2a3e1630170eb3b6"), branches={ b"HEAD": SnapshotBranch( - target=hash_to_bytes("55eb1438c03588607ce4b8db8f45e8e23075951b"), + target=hash_to_bytes("f1ff9a3c7624b1be5e5d51f9ec0abf7dcddbf0b2"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_2_visits_with_change(swh_storage, datadir, tmp_path): """Eventful visit followed by eventful visit should yield two snapshots""" archive_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} 
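The two-visit tests above and below rely on the snapshot comparison performed in the loader's load_status(); a condensed restatement of that rule:

# A visit is "uneventful" when the snapshot it produced equals the
# origin's latest known snapshot, and "eventful" otherwise.
def classify_visit(last_snapshot, snapshot) -> str:
    return "uneventful" if last_snapshot == snapshot else "eventful"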
visit_status1 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 8, - "directory": 20, + "directory": 13, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } archive_name2 = "greek-repository2" archive_path2 = os.path.join(datadir, f"{archive_name2}.tgz") repo_url = prepare_repository_from_archive(archive_path2, archive_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} visit_status2 = assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT2.id, ) stats = get_stats(loader.storage) assert stats == { "content": 10, - "directory": 23, + "directory": 15, "origin": 1, "origin_visit": 2, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 2, } check_snapshot(GREEK_SNAPSHOT2, loader.storage) assert visit_status1.date < visit_status2.date assert visit_status1.snapshot != visit_status2.snapshot def test_loader_cvs_visit_pserver(swh_storage, datadir, tmp_path): """Eventful visit to CVS pserver should yield 1 snapshot""" archive_name = "runbaby" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/runbaby" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=RUNBABY_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 5, - "directory": 2, + "directory": 1, "origin": 1, "origin_visit": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } check_snapshot(RUNBABY_SNAPSHOT, loader.storage) GREEK_SNAPSHOT3 = Snapshot( - id=hash_to_bytes("cd801546b0137c82f01b9b67848ba8261d64ebbb"), + id=hash_to_bytes("6e9910ed072662cb482d9017cbf5e1973e6dc09f"), branches={ b"HEAD": SnapshotBranch( - target=hash_to_bytes("14980990790ce1921db953c4c9ae03dd8861e8d6"), + target=hash_to_bytes("d9f4837dc55a87d83730c6e277c88b67dae80272"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_visit_pserver_no_eol(swh_storage, datadir, tmp_path): """Visit to CVS pserver with file that lacks trailing eol""" archive_name = "greek-repository3" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT3.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, - "directory": 23, + "directory": 15, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT3, loader.storage) GREEK_SNAPSHOT4 = Snapshot( - 
id=hash_to_bytes("26e943053ea9c5f961336a72328cac22026ed3b5"), + id=hash_to_bytes("a8593e9233601b31e012d36975f817d2c993d04b"), branches={ b"HEAD": SnapshotBranch( - target=hash_to_bytes("ed784aff0e0743244bb1f30ba21c8abcd0d460ab"), + target=hash_to_bytes("51bb99655225c810ee259087fcae505899725360"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_visit_expand_id_keyword(swh_storage, datadir, tmp_path): """Visit to CVS repository with file with an RCS Id keyword""" archive_name = "greek-repository4" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT4.id, ) stats = get_stats(loader.storage) assert stats == { "content": 12, - "directory": 31, + "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT4, loader.storage) def test_loader_cvs_visit_pserver_expand_id_keyword(swh_storage, datadir, tmp_path): """Visit to CVS pserver with file with an RCS Id keyword""" archive_name = "greek-repository4" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT4.id, ) stats = get_stats(loader.storage) assert stats == { "content": 12, - "directory": 31, + "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT4, loader.storage) GREEK_SNAPSHOT5 = Snapshot( - id=hash_to_bytes("ee6faeaf50aa513c53c8ba29194116a5ef88add6"), + id=hash_to_bytes("6484ec9bfff677731cbb6d2bd5058dabfae952ed"), branches={ b"HEAD": SnapshotBranch( - target=hash_to_bytes("4320f152cc61ed660d25fdeebc787b3099e55a96"), + target=hash_to_bytes("514b3bef07d56e393588ceda18cc1dfa2dc4e04a"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_with_file_deleted_and_readded(swh_storage, datadir, tmp_path): """Eventful conversion of history with file deletion and re-addition""" archive_name = "greek-repository5" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT5.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, - "directory": 22, + "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT5, loader.storage) def 
test_loader_cvs_pserver_with_file_deleted_and_readded( swh_storage, datadir, tmp_path ): """Eventful pserver conversion with file deletion and re-addition""" archive_name = "greek-repository5" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT5.id, ) stats = get_stats(loader.storage) assert stats == { "content": 9, - "directory": 22, + "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } check_snapshot(GREEK_SNAPSHOT5, loader.storage) DINO_SNAPSHOT = Snapshot( - id=hash_to_bytes("417021c16e17c5e0038cf0e73dbf48a6142c8304"), + id=hash_to_bytes("6cf774cec1030ff3e9a301681303adb537855d09"), branches={ b"HEAD": SnapshotBranch( - target=hash_to_bytes("df61a776c401a178cc796545849fc87bdadb2001"), + target=hash_to_bytes("b7d3ea1fa878d51323b5200ad2c6ee9d5b656f10"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_readded_file_in_attic(swh_storage, datadir, tmp_path): """Conversion of history with RCS files in the Attic""" # This repository has some file revisions marked "dead" in the Attic only. # This is different to the re-added file tests above, where the RCS file # was moved out of the Attic again as soon as the corresponding deleted # file was re-added. Failure to detect the "dead" file revisions in the # Attic would result in errors in our converted history. archive_name = "dino-readded-file" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 38, - "directory": 105, + "directory": 70, "origin": 1, "origin_visit": 1, "release": 0, "revision": 35, "skipped_content": 0, "snapshot": 1, } check_snapshot(DINO_SNAPSHOT, loader.storage) def test_loader_cvs_pserver_readded_file_in_attic(swh_storage, datadir, tmp_path): """Conversion over pserver with RCS files in the Attic""" # This repository has some file revisions marked "dead" in the Attic only. # This is different to the re-added file tests above, where the RCS file # was moved out of the Attic again as soon as the corresponding deleted # file was re-added. Failure to detect the "dead" file revisions in the # Attic would result in errors in our converted history. # This has special implications for the pserver case, because the "dead" # revisions will not appear in the output of 'cvs rlog' by default.
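A simplified sketch of the Attic probing performed in prepare() above, assuming paths relative to the CVS root; names are illustrative:

import os

# For each changed file, probe the Attic sibling of its parent directory
# exactly once for additional "dead" revisions, skipping files that
# already live in an Attic directory.
def candidate_attic_paths(changed_paths):
    seen = set()
    for path in changed_paths:
        parent = os.path.dirname(path)
        if os.path.basename(parent) == "Attic":
            continue
        attic = parent + "/Attic"
        if attic not in seen:
            seen.add(attic)
            yield attic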
archive_name = "dino-readded-file" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/src" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id, ) stats = get_stats(loader.storage) assert stats == { "content": 38, - "directory": 105, + "directory": 70, "origin": 1, "origin_visit": 1, "release": 0, "revision": 35, "skipped_content": 0, "snapshot": 1, } check_snapshot(DINO_SNAPSHOT, loader.storage) DINO_SNAPSHOT2 = Snapshot( - id=hash_to_bytes("a9d6ce0b4f22dc4fd752ad4c25ec9ea71ed568d7"), + id=hash_to_bytes("afdeca6b8ec8f58367b4e014e2210233f1c5bf3d"), branches={ b"HEAD": SnapshotBranch( - target=hash_to_bytes("150616a2a3206f00a73f2d6a017dde22c52e4a83"), + target=hash_to_bytes("84e428103d42b84713c77afb9420d667062f8676"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_split_commits_by_commitid(swh_storage, datadir, tmp_path): """Conversion of RCS history which needs to be split by commit ID""" # This repository has some file revisions which use the same log message # and can only be told apart by commit IDs. Without commit IDs, these commits # would get merged into a single commit in our conversion result. archive_name = "dino-commitid" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/dino" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id, ) check_snapshot(DINO_SNAPSHOT2, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 18, - "directory": 36, + "directory": 18, "origin": 1, "origin_visit": 1, "release": 0, "revision": 18, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_split_commits_by_commitid(swh_storage, datadir, tmp_path): """Conversion via pserver which needs to be split by commit ID""" # This repository has some file revisions which use the same log message # and can only be told apart by commit IDs. Without commit IDs, these commits # would get merged into a single commit in our conversion result. 
archive_name = "dino-commitid" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) repo_url += "/dino" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id, ) check_snapshot(DINO_SNAPSHOT2, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 18, - "directory": 36, + "directory": 18, "origin": 1, "origin_visit": 1, "release": 0, "revision": 18, "skipped_content": 0, "snapshot": 1, } GREEK_SNAPSHOT6 = Snapshot( - id=hash_to_bytes("b4c9423b2711c181251deb458d4ab4a3172948ac"), + id=hash_to_bytes("859ae7ca5b31fee594c98abecdd41eff17cae079"), branches={ b"HEAD": SnapshotBranch( - target=hash_to_bytes("f317c720e1929fec0afce10e6a8cfd24ef76dfc7"), + target=hash_to_bytes("fa48fb4551898cd8d3305cace971b3b95639e83e"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_empty_lines_in_log_message(swh_storage, datadir, tmp_path): """Conversion of RCS history with empty lines in a log message""" archive_name = "greek-repository6" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT6.id, ) check_snapshot(GREEK_SNAPSHOT6, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 9, - "directory": 22, + "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_empty_lines_in_log_message(swh_storage, datadir, tmp_path): """Conversion via pserver with empty lines in a log message""" archive_name = "greek-repository6" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT6.id, ) check_snapshot(GREEK_SNAPSHOT6, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 9, - "directory": 22, + "directory": 14, "origin": 1, "origin_visit": 1, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } def get_head_revision_paths_info(loader: CvsLoader) -> Dict[bytes, Dict[str, Any]]: assert loader.snapshot is not None root_dir = loader.snapshot.branches[b"HEAD"].target revision = loader.storage.revision_get([root_dir])[0] assert revision is not None paths = {} for entry in loader.storage.directory_ls(revision.directory, recursive=True): paths[entry["name"]] = entry return paths def test_loader_cvs_with_header_keyword(swh_storage, datadir, 
tmp_path): """Eventful conversion of history with Header keyword in a file""" archive_name = "greek-repository7" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} repo_url = f"fake://{repo_url[7:]}" loader2 = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader2.load() == {"status": "eventful"} # We cannot verify the snapshot ID. It is unpredictable due to use of the $Header$ # RCS keyword which contains the temporary directory where the repository is stored. expected_stats = { "content": 9, - "directory": 22, + "directory": 14, "origin": 2, "origin_visit": 2, "release": 0, "revision": 8, "skipped_content": 0, "snapshot": 1, } stats = get_stats(loader.storage) assert stats == expected_stats stats = get_stats(loader2.storage) assert stats == expected_stats # Ensure that file 'alpha', which contains a $Header$ keyword, # was imported with equal content via file:// and fake:// URLs. paths = get_head_revision_paths_info(loader) paths2 = get_head_revision_paths_info(loader2) - alpha = paths[b"greek-tree/alpha"] - alpha2 = paths2[b"greek-tree/alpha"] + alpha = paths[b"alpha"] + alpha2 = paths2[b"alpha"] assert alpha["sha1"] == alpha2["sha1"] GREEK_SNAPSHOT8 = Snapshot( - id=hash_to_bytes("b98a2744199723be827d48bad2f65ee1c2df7513"), + id=hash_to_bytes("5278a1f73ed0f804c68f72614a5f78ca5074ab9c"), branches={ b"HEAD": SnapshotBranch( - target=hash_to_bytes("ee8be88b458b7fbca3037ab05e56552578e66faa"), + target=hash_to_bytes("b389258fec8151d719e79da80b5e5355a48ec8bc"), target_type=TargetType.REVISION, ) }, ) def test_loader_cvs_expand_log_keyword(swh_storage, datadir, tmp_path): """Conversion of RCS history with Log keyword in files""" archive_name = "greek-repository8" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT8.id, ) check_snapshot(GREEK_SNAPSHOT8, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 14, - "directory": 31, + "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, } def test_loader_cvs_pserver_expand_log_keyword(swh_storage, datadir, tmp_path): """Conversion via pserver with Log keyword in files""" archive_name = "greek-repository8" extracted_name = "greek-repository" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) repo_url += "/greek-tree" # CVS module name # Ask our cvsclient to connect via the 'cvs server' command repo_url = f"fake://{repo_url[7:]}" loader = CvsLoader( swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT8.id, )
check_snapshot(GREEK_SNAPSHOT8, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 14, - "directory": 31, + "directory": 20, "origin": 1, "origin_visit": 1, "release": 0, "revision": 11, "skipped_content": 0, "snapshot": 1, }
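Because $Header$ embeds an absolute repository path, expanded file contents, and therefore every hash up to the snapshot ID, depend on where the CVSROOT lives; this is why the header-keyword test above cannot pin a snapshot ID. A loose sketch of the simpler $Id$ expansion, in the spirit of RcsKeywords but not its actual implementation; `path` and `rev` are illustrative inputs:

import os

# Replace an unexpanded $Id$ keyword with "$Id: <file>,v <rev> $".
# The real code handles more keywords ($Header$, $Log$, ...) and
# honors each RCS file's keyword expansion mode.
def expand_id_keyword(data: bytes, path: str, rev: str) -> bytes:
    expanded = b"$Id: %s,v %s $" % (os.path.basename(path).encode(), rev.encode())
    return data.replace(b"$Id$", expanded)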