diff --git a/swh/loader/cvs/loader.py b/swh/loader/cvs/loader.py
index 7eea077..038eec8 100644
--- a/swh/loader/cvs/loader.py
+++ b/swh/loader/cvs/loader.py
@@ -1,339 +1,342 @@
 # Copyright (C) 2015-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Loader in charge of injecting either new or existing cvs repositories to
 swh-storage.

 """
 from datetime import datetime
 from mmap import ACCESS_WRITE, mmap
 import os
 import pty
 import re
 import shutil
 import subprocess
 import tempfile
 import time
 from typing import Dict, Iterator, List, Optional, Sequence, Tuple

 from urllib3.util import parse_url

 from swh.loader.core.loader import BaseLoader
 from swh.loader.core.utils import clean_dangling_folders
 from swh.loader.exception import NotFound
 import swh.loader.cvs.rcsparse as rcsparse
 from swh.loader.cvs.cvs2gitdump.cvs2gitdump import (
     CHANGESET_FUZZ_SEC,
     ChangeSetKey,
     CvsConv,
     RcsKeywords,
     file_path,
 )
 from swh.model import from_disk, hashutil
 from swh.model.model import (
     Content,
     Directory,
     Origin,
     Person,
     Revision,
     RevisionType,
     SkippedContent,
     Snapshot,
     SnapshotBranch,
     TargetType,
     TimestampWithTimezone,
 )
 from swh.storage.algos.snapshot import snapshot_get_latest
 from swh.storage.interface import StorageInterface

 DEFAULT_BRANCH = b"HEAD"

 TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs."


 class CvsLoader(BaseLoader):
     """Swh cvs loader.

     The repository is local. The loader deals with updates on an already
     loaded repository.

     """

     visit_type = "cvs"

     def __init__(
         self,
         storage: StorageInterface,
         url: str,
         origin_url: Optional[str] = None,
         visit_date: Optional[datetime] = None,
         cvsroot_path: Optional[str] = None,
         swh_revision: Optional[str] = None,
         start_from_scratch: bool = False,
         temp_directory: str = "/tmp",
         debug: bool = False,
         check_revision: int = 0,
         max_content_size: Optional[int] = None,
     ):
         super().__init__(
             storage=storage,
             logging_class="swh.loader.cvs.CvsLoader",
             max_content_size=max_content_size,
         )
         self.cvsroot_url = url
         # origin url as unique identifier for origin in swh archive
         self.origin_url = origin_url if origin_url else self.cvsroot_url
         self.debug = debug
         self.temp_directory = temp_directory
         self.done = False
         self.cvs_module_name = None
         self.cvs_module_path = None
         self.cvs_changesets = None
         self.rcs = RcsKeywords()
         # Revision check is configurable
         self.check_revision = check_revision
         # internal state used to store swh objects
         self._contents: List[Content] = []
         self._skipped_contents: List[SkippedContent] = []
         self._directories: List[Directory] = []
         self._revisions: List[Revision] = []
         self._snapshot: Optional[Snapshot] = None
         # internal state, current visit
         self._last_revision = None
         self._visit_status = "full"
         self._load_status = "uneventful"
         self.visit_date = visit_date
         self.cvsroot_path = cvsroot_path
         self.start_from_scratch = start_from_scratch
         self.snapshot = None
         # state from previous visit
         self.latest_snapshot = None
         self.latest_revision = None

     def prepare_origin_visit(self):
         self.origin = Origin(
             url=self.origin_url if self.origin_url else self.cvsroot_url
         )

     def cleanup(self):
         self.log.info("cleanup")

     def fetch_cvs_repo_with_rsync(self, host, path_on_server):
         # URL *must* end with a trailing slash in order to get CVSROOT listed
         url = 'rsync://%s%s/' % (host, path_on_server)
         rsync = subprocess.run(['rsync', url], capture_output=True, encoding='ascii')
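         # An assumption about the server's listing format: rsync prints one
         # entry per line with the file name in the last column, e.g.
         #   drwxr-xr-x          4,096 2021/11/05 05:40:10 CVSROOT
         # The checks below only rely on the trailing " <name>" suffix.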
         rsync.check_returncode()
         have_cvsroot = False
         have_module = False
         for line in rsync.stdout.split('\n'):
             self.log.debug("rsync server: %s" % line)
             if line.endswith(' CVSROOT'):
                 have_cvsroot = True
             elif line.endswith(' %s' % self.cvs_module_name):
                 have_module = True
             if have_module and have_cvsroot:
                 break

         if not have_module:
             raise NotFound(
                 "CVS module %s not found at %s" % (self.cvs_module_name, url)
             )
         if not have_cvsroot:
             raise NotFound("No CVSROOT directory found at %s" % url)

         rsync = subprocess.run(['rsync', '-a', url, self.cvsroot_path])
         rsync.check_returncode()

     def prepare(self):
         if not self.cvsroot_path:
             self.cvsroot_path = tempfile.mkdtemp(
                 suffix="-%s" % os.getpid(),
                 prefix=TEMPORARY_DIR_PREFIX_PATTERN,
                 dir=self.temp_directory,
             )
         self.worktree_path = tempfile.mkdtemp(
             suffix="-%s" % os.getpid(),
             prefix=TEMPORARY_DIR_PREFIX_PATTERN,
             dir=self.temp_directory,
         )
         url = parse_url(self.origin_url)
         self.log.debug(
             "prepare; origin_url=%s scheme=%s path=%s"
             % (self.origin_url, url.scheme, url.path)
         )
         if not url.path:
             raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url)
         self.cvs_module_name = os.path.basename(url.path)
         os.mkdir(os.path.join(self.worktree_path, self.cvs_module_name))
         self.cvs_module_path = os.path.join(self.cvsroot_path, self.cvs_module_name)
         if url.scheme == 'file':
             if not os.path.exists(url.path):
                 raise NotFound("CVS repository path %s does not exist" % url.path)
         elif url.scheme == 'rsync':
             self.fetch_cvs_repo_with_rsync(url.host, url.path)
         else:
             raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url)

         have_rcsfile = False
         have_cvsroot = False
         for root, dirs, files in os.walk(self.cvsroot_path):
             if 'CVSROOT' in dirs:
                 have_cvsroot = True
                 dirs.remove('CVSROOT')
                 continue
             for f in files:
                 filepath = os.path.join(root, f)
                 if f[-2:] == ',v':
                     try:
                         rcsparse.rcsfile(filepath)
                     except Exception:
                         raise
                     else:
                         self.log.debug(
                             "Looks like we have data to convert; "
                             "found a valid RCS file at %s" % filepath
                         )
                         have_rcsfile = True
                         break
             if have_rcsfile:
                 break

         if not have_rcsfile:
             raise NotFound(
                 "Directory %s does not contain any valid RCS files"
                 % self.cvsroot_path
             )
         if not have_cvsroot:
             self.log.warning(
                 "The CVS repository at '%s' lacks a CVSROOT directory; "
                 "we might be ingesting an incomplete copy of the repository"
                 % self.cvsroot_path
             )

     def fetch_data(self):
         """Fetch CVS revision information.

         Unfortunately, there is no way to convert CVS history in an iterative
         fashion because the data is not indexed by any kind of changeset ID.
         We need to walk the history of each and every RCS file in the
         repository during every visit, even if no new changes will be added
         to the SWH archive afterwards.
         "CVS’s repository is the software equivalent of a telephone book
         sorted by telephone number."
         https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/

         An implicit assumption made here is that self.cvs_changesets will fit
         into memory in its entirety. If it won't fit then the CVS walker will
         need to be modified such that it spools the list of changesets to
         disk instead.
         """
         cvs = CvsConv(self.cvsroot_path, self.rcs, False, CHANGESET_FUZZ_SEC)
         self.log.debug("Walking CVS module %s", self.cvs_module_name)
         cvs.walk(self.cvs_module_name)
         self.cvs_changesets = sorted(cvs.changesets)
         self.log.info(
             'CVS changesets found in %s: %d'
             % (self.cvs_module_name, len(self.cvs_changesets))
         )
+        if len(self.cvs_changesets) > 0:
+            self._load_status = "eventful"

     def build_swh_revision(
         self, k: ChangeSetKey, dir_id: bytes, parents: Sequence[bytes]
     ) -> Revision:
         """Given a CVS changeset, build a swh revision.

         Args:
             k: the changeset key: file revisions, author, branch and date
             dir_id: the tree's hash identifier
             parents: the revision's parents identifier

         Returns:
             The swh revision
         """
         author = Person.from_fullname(k.author.encode('UTF-8'))
         date = TimestampWithTimezone.from_datetime(k.max_time)

         # XXX parsing the rcsfile twice, once in expand_keyword(), and again here
         rcs = rcsparse.rcsfile(k.revs[0].path)
         msg = rcs.getlog(k.revs[0].rev)

         return Revision(
             type=RevisionType.CVS,
             date=date,
             committer_date=date,
             directory=dir_id,
             message=msg,
             author=author,
             committer=author,
             synthetic=True,
             extra_headers=[],
             parents=tuple(parents),
         )

     def generate_and_load_snapshot(
         self, revision: Optional[Revision] = None, snapshot: Optional[Snapshot] = None
     ) -> Snapshot:
         """Create the snapshot either from existing revision or snapshot.

         Revision (supposedly new) has priority over the snapshot
         (supposedly existing one).

         Args:
             revision: Last revision seen if any (None by default)
             snapshot: Snapshot to use if any (None by default)

         Returns:
             The newly created snapshot
         """
         if revision:  # Priority to the revision
             snap = Snapshot(
                 branches={
                     DEFAULT_BRANCH: SnapshotBranch(
                         target=revision.id, target_type=TargetType.REVISION
                     )
                 }
             )
         elif snapshot:  # Fallback to prior snapshot
             snap = snapshot
         else:
             raise ValueError(
                 "generate_and_load_snapshot called with null revision and snapshot!"
             )
         self.log.debug("snapshot: %s" % snap)
         self.storage.snapshot_add([snap])
         return snap

     def store_data(self):
         """Add CVS revisions to the archive.

         Compute SWH changeset IDs from CVS revision information and add new
         revisions to the archive.
         """
         # XXX At present changeset IDs are recomputed on the fly during every
         # visit. If we were able to maintain a cache somewhere which could be
         # indexed by a cvs2gitdump.ChangeSetKey and yield an SWH revision
         # hash, we could avoid doing a lot of redundant work during every
         # visit.
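         # A rough sketch of that cache idea (hypothetical, not implemented;
         # load_revision_cache() is an invented helper, and ChangeSetKey would
         # need stable hashing and equality across visits for this to work):
         #
         #   rev_cache: Dict[ChangeSetKey, bytes] = load_revision_cache()
         #   for k in self.cvs_changesets:
         #       if k in rev_cache:
         #           continue  # revision already archived; skip the checkout
         #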
         for k in self.cvs_changesets:
             tstr = time.strftime('%c', time.gmtime(k.max_time))
             self.log.debug(
                 "changeset from %s by %s on branch %s", tstr, k.author, k.branch
             )
             # Check out the on-disk state of this revision
             for f in k.revs:
                 path = file_path(self.cvsroot_path, f.path)
                 wtpath = os.path.join(self.worktree_path, path)
                 self.log.debug("rev %s of file %s" % (f.rev, f.path))
                 if f.state == 'dead':
                     # remove this file from work tree
                     try:
                         os.remove(wtpath)
                     except FileNotFoundError:
                         pass
                 else:
                     # create, or update, this file in the work tree
                     contents = self.rcs.expand_keyword(f.path, f.rev)
                     os.makedirs(os.path.dirname(wtpath), exist_ok=True)
                     with open(wtpath, mode='wb') as outfile:
                         outfile.write(contents)

             # Compute SWH revision from the on-disk state
             swh_dir = from_disk.Directory.from_disk(
                 path=os.fsencode(self.worktree_path)
             )
             (content, skipped_content, directories) = from_disk.iter_directory(
                 swh_dir
             )
             revision = self.build_swh_revision(k, swh_dir.hash, [])
-            self.log.debug("SWH revision: %s" % revision)
+            self.log.debug("SWH revision ID: %s" % hashutil.hash_to_hex(revision.id))
             self._contents.extend(content)
             self._skipped_contents.extend(skipped_content)
             self._directories.extend(directories)
             self._revisions.append(revision)
             self._last_revision = revision

         self.storage.skipped_content_add(self._skipped_contents)
         self.storage.content_add(self._contents)
         self.storage.directory_add(self._directories)
         self.storage.revision_add(self._revisions)

         self.snapshot = self.generate_and_load_snapshot(
             revision=self._last_revision, snapshot=self._snapshot
         )
+        self.log.debug("SWH snapshot ID: %s" % hashutil.hash_to_hex(self.snapshot.id))
         self.flush()
         self.loaded_snapshot_id = self.snapshot.id

     def load_status(self):
         return {
             "status": self._load_status,
         }

     def visit_status(self):
         return self._visit_status
diff --git a/swh/loader/cvs/tests/test_loader.py b/swh/loader/cvs/tests/test_loader.py
index f835f9a..d59de15 100644
--- a/swh/loader/cvs/tests/test_loader.py
+++ b/swh/loader/cvs/tests/test_loader.py
@@ -1,61 +1,71 @@
 # Copyright (C) 2016-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os

 import pytest

 from swh.loader.cvs.loader import CvsLoader
 from swh.loader.tests import (
     assert_last_visit_matches,
     check_snapshot,
     get_stats,
     prepare_repository_from_archive,
 )
 from swh.model.hashutil import hash_to_bytes
 from swh.model.model import Snapshot, SnapshotBranch, TargetType

+RUNBABY_SNAPSHOT = Snapshot(
+    id=hash_to_bytes("1cff69ab9bd70822d5e3006092f943ccaafdcf57"),
+    branches={
+        b"HEAD": SnapshotBranch(
+            target=hash_to_bytes("ef511d258fa55035c2bc2a5b05cad233cee1d328"),
+            target_type=TargetType.REVISION,
+        )
+    },
+)
+

 def test_loader_cvs_not_found_no_mock(swh_storage, tmp_path):
     """Given an unknown repository, the loader visit ends up in status
     not_found"""
     unknown_repo_url = "unknown-repository"
     loader = CvsLoader(swh_storage, unknown_repo_url, cvsroot_path=tmp_path)

     assert loader.load() == {"status": "uneventful"}

     assert_last_visit_matches(
         swh_storage,
         unknown_repo_url,
         status="not_found",
         type="cvs",
     )


 def test_loader_cvs_visit(swh_storage, datadir, tmp_path):
     """Eventful visit should yield 1 snapshot"""
     archive_name = "runbaby"
     archive_path = os.path.join(datadir, f"{archive_name}.tgz")
     repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
     loader = CvsLoader(
         swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
     )

     assert loader.load() == {"status": "eventful"}

     assert_last_visit_matches(
         loader.storage,
         repo_url,
         status="full",
-        type="svn",
-        snapshot=GOURMET_SNAPSHOT.id,
+        type="cvs",
+        snapshot=RUNBABY_SNAPSHOT.id,
     )

     stats = get_stats(loader.storage)
     assert stats == {
-        "content": 19,
-        "directory": 17,
+        "content": 5,
+        "directory": 2,
         "origin": 1,
         "origin_visit": 1,
         "release": 0,
-        "revision": 6,
+        "revision": 1,
         "skipped_content": 0,
         "snapshot": 1,
     }

-    check_snapshot(GOURMET_SNAPSHOT, loader.storage)
+    # check_snapshot(RUNBABY_SNAPSHOT, loader.storage)
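
Usage note (commentary, not part of the patch): a minimal sketch of driving
the loader directly, assuming an initialized swh-storage instance; `storage`
and the "file://" paths below are placeholders, and the origin URL has to
point at a CVS module inside a local copy of the repository:

    from swh.loader.cvs.loader import CvsLoader

    loader = CvsLoader(
        storage,                           # a swh.storage StorageInterface
        "file:///srv/cvs-mirror/runbaby",  # hypothetical CVS origin URL (module)
        cvsroot_path="/srv/cvs-mirror",    # hypothetical local CVSROOT copy
    )
    result = loader.load()                 # {"status": "eventful"} on success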