diff --git a/swh/loader/cvs/loader.py b/swh/loader/cvs/loader.py
index bcb9173..ab25a71 100644
--- a/swh/loader/cvs/loader.py
+++ b/swh/loader/cvs/loader.py
@@ -1,347 +1,388 @@
 # Copyright (C) 2015-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Loader in charge of injecting either new or existing cvs repositories to
 swh-storage.

 """
 from datetime import datetime
 from mmap import ACCESS_WRITE, mmap
 import os
 import pty
 import re
 import shutil
 import subprocess
 import tempfile
 import time
 from typing import Dict, Iterator, List, Optional, Sequence, Tuple

 from urllib3.util import parse_url

 from swh.loader.core.loader import BaseLoader
 from swh.loader.core.utils import clean_dangling_folders
 from swh.loader.exception import NotFound
 import swh.loader.cvs.rcsparse as rcsparse
 from swh.loader.cvs.cvs2gitdump.cvs2gitdump import (
     CHANGESET_FUZZ_SEC,
     ChangeSetKey,
     CvsConv,
     RcsKeywords,
     file_path,
 )
 from swh.model import from_disk, hashutil
 from swh.model.model import (
     Content,
     Directory,
     Origin,
     Person,
     Revision,
     RevisionType,
     SkippedContent,
     Snapshot,
     SnapshotBranch,
     TargetType,
     TimestampWithTimezone,
 )
 from swh.storage.algos.snapshot import snapshot_get_latest
 from swh.storage.interface import StorageInterface

 DEFAULT_BRANCH = b"HEAD"

 TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs."


 class CvsLoader(BaseLoader):
     """Swh cvs loader.

     The repository is local. The loader deals with updates to an already
     loaded repository.

     """

     visit_type = "cvs"

     def __init__(
         self,
         storage: StorageInterface,
         url: str,
         origin_url: Optional[str] = None,
         visit_date: Optional[datetime] = None,
         cvsroot_path: Optional[str] = None,
         swh_revision: Optional[str] = None,
         start_from_scratch: bool = False,
         temp_directory: str = "/tmp",
         debug: bool = False,
         check_revision: int = 0,
         max_content_size: Optional[int] = None,
     ):
         super().__init__(
             storage=storage,
             logging_class="swh.loader.cvs.CvsLoader",
             max_content_size=max_content_size,
         )
         self.cvsroot_url = url
         # origin url as unique identifier for origin in swh archive
         self.origin_url = origin_url if origin_url else self.cvsroot_url
         self.debug = debug
         self.temp_directory = temp_directory
         self.done = False
         self.cvs_module_name = None
         self.cvs_module_path = None
         self.cvs_changesets = None
         self.rcs = RcsKeywords()
         # Revision check is configurable
         self.check_revision = check_revision
         # internal state used to store swh objects
         self._contents: List[Content] = []
         self._skipped_contents: List[SkippedContent] = []
         self._directories: List[Directory] = []
         self._revisions: List[Revision] = []
         self._snapshot: Optional[Snapshot] = None
+        self.swh_revision_gen = None
         # internal state, current visit
         self._last_revision = None
         self._visit_status = "full"
         self._load_status = "uneventful"
         self.visit_date = visit_date
         self.cvsroot_path = cvsroot_path
         self.start_from_scratch = start_from_scratch
         self.snapshot = None
         # state from previous visit
         self.latest_snapshot = None
         self.latest_revision = None

+    def swh_hash_data_per_cvs_changeset(self):
+        """Compute swh hash data per CVS changeset.
+
+        Yields:
+            tuple (rev, swh_directory)
+            - rev: current SWH revision computed from the checked-out work tree
+            - swh_directory: dictionary of path, swh hash data with type
+
+        """
+        # XXX At present changeset IDs are recomputed on the fly during every
+        # visit. If we were able to maintain a cache somewhere which could be
+        # indexed by a cvs2gitdump.ChangeSetKey and yield an SWH revision hash,
+        # we could avoid a lot of redundant work during every visit.
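+        # The work tree persists across loop iterations: changesets are
+        # visited in sorted order, and each iteration only creates, rewrites,
+        # or deletes the files named by that changeset, so the tree always
+        # mirrors the state of the module at the changeset being processed.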
+        for k in self.cvs_changesets:
+            tstr = time.strftime('%c', time.gmtime(k.max_time))
+            self.log.debug("changeset from %s by %s on branch %s",
+                tstr, k.author, k.branch)
+            # Check out the on-disk state of this revision
+            for f in k.revs:
+                path = file_path(self.cvsroot_path, f.path)
+                wtpath = os.path.join(self.worktree_path, path)
+                self.log.debug("rev %s of file %s" % (f.rev, f.path))
+                if f.state == 'dead':
+                    # remove this file from the work tree
+                    try:
+                        os.remove(wtpath)
+                    except FileNotFoundError:
+                        pass
+                else:
+                    # create, or update, this file in the work tree
+                    contents = self.rcs.expand_keyword(f.path, f.rev)
+                    try:
+                        outfile = open(wtpath, mode='wb')
+                    except FileNotFoundError:
+                        os.makedirs(os.path.dirname(wtpath))
+                        outfile = open(wtpath, mode='wb')
+                    outfile.write(contents)
+                    outfile.close()
+
+            # Compute the SWH revision from the on-disk state
+            swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path))
+            revision = self.build_swh_revision(k, swh_dir.hash, [])
+            self.log.debug("SWH revision ID: %s" % hashutil.hash_to_hex(revision.id))
+            self._last_revision = revision
+            if self._load_status == "uneventful":
+                # We have an eventful load if this revision is not already
+                # present in the archive
+                if not self.storage.revision_get([revision.id])[0]:
+                    self._load_status = "eventful"
+
+            yield revision, swh_dir
+
+    def process_cvs_changesets(self) -> Iterator[
+        Tuple[List[Content], List[SkippedContent], List[Directory], Revision]
+    ]:
+        """Process CVS revisions.
+
+        At each CVS revision, check out contents and compute swh hashes.
+
+        Yields:
+            tuple (contents, skipped_contents, directories, revision) of model
+            objects computed from the work tree checked out at that revision
+
+        """
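+        # This is a generator: nothing is materialized ahead of time, so only
+        # the objects for a single checked-out tree state are held in memory
+        # at once, rather than the objects for the entire history.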
+ + """ + for swh_revision, swh_dir in self.swh_hash_data_per_cvs_changeset(): + # Send the associated contents/directories + (_contents, _skipped_contents, _directories) = from_disk.iter_directory(swh_dir) + yield _contents, _skipped_contents, _directories, swh_revision + + def prepare_origin_visit(self): self.origin = Origin(url=self.origin_url if self.origin_url else self.cvsroot_url) def cleanup(self): self.log.info("cleanup") def fetch_cvs_repo_with_rsync(self, host, path): # URL *must* end with a trailing slash in order to get CVSROOT listed url = 'rsync://%s%s/' % (host, os.path.dirname(path)) rsync = subprocess.run(['rsync', url], capture_output=True, encoding='ascii') rsync.check_returncode() have_cvsroot = False have_module = False for line in rsync.stdout.split('\n'): self.log.debug("rsync server: %s" % line) if line.endswith(' CVSROOT'): have_cvsroot = True elif line.endswith(' %s' % self.cvs_module_name): have_module = True if have_module and have_cvsroot: break if not have_module: raise NotFound("CVS module %s not found at %s" \ % (self.cvs_module_name, host, url)) if not have_cvsroot: raise NotFound("No CVSROOT directory found at %s" % url) rsync = subprocess.run(['rsync', '-a', url, self.cvsroot_path]) rsync.check_returncode() def prepare(self): self._last_revision = None self._load_status = "uneventful" + self.swh_revision_gen = None if not self.cvsroot_path: self.cvsroot_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) self.worktree_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) url = parse_url(self.origin_url) self.log.debug("prepare; origin_url=%s scheme=%s path=%s" % (self.origin_url, url.scheme, url.path)) if not url.path: raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url) self.cvs_module_name = os.path.basename(url.path) os.mkdir(os.path.join(self.worktree_path, self.cvs_module_name)); self.cvs_module_path = os.path.join(self.cvsroot_path, self.cvs_module_name) if url.scheme == 'file': if not os.path.exists(url.path): raise NotFound elif url.scheme == 'rsync': self.fetch_cvs_repo_with_rsync(url.host, url.path) else: raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url) have_rcsfile = False have_cvsroot = False for root, dirs, files in os.walk(self.cvsroot_path): if 'CVSROOT' in dirs: have_cvsroot = True dirs.remove('CVSROOT') continue; for f in files: filepath = os.path.join(root, f) if f[-2:] == ',v': try: rcsfile = rcsparse.rcsfile(filepath) except(Exception): raise else: self.log.debug("Looks like we have data to convert; " "found a valid RCS file at %s" % filepath) have_rcsfile = True break if have_rcsfile: break; if not have_rcsfile: raise NotFound("Directory %s does not contain any valid RCS files %s" % self.cvsroot_path) if not have_cvsroot: self.log.warn("The CVS repository at '%s' lacks a CVSROOT directory; " "we might be ingesting an incomplete copy of the repository" % self.cvsroot_path) - def fetch_data(self): - """Fetch CVS revision information. - - Unfortunately, there is no way to convert CVS history in an iterative fashion - because the data is not indexed by any kind of changeset ID. We need to walk - the history of each and every RCS file in the repository during every visit, - even if no new changes will be added to the SWH archive afterwards. - "CVS’s repository is the software equivalent of a telephone book sorted by telephone number." 
+
+    def fetch_data(self):
+        """Fetch the next CVS revision."""
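+        # The core loader is expected to call fetch_data() and store_data()
+        # alternately until fetch_data() returns False, so each changeset is
+        # stored in the archive before the next one is checked out.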
+        try:
+            data = next(self.swh_revision_gen)
+        except StopIteration:
+            return False
+        except Exception as e:
+            self.log.exception(e)
+            return False  # stop iterating; the error has been logged
+        self._contents, self._skipped_contents, self._directories, rev = data
+        self._revisions = [rev]
+        return True

     def build_swh_revision(
         self, k: ChangeSetKey, dir_id: bytes, parents: Sequence[bytes]
     ) -> Revision:
         """Given a CVS revision, build a swh revision.

         Args:
             k: the changeset data: revision id, date, author, and message
             dir_id: the tree's hash identifier
             parents: the revision's parents identifier

         Returns:
             The swh revision object.

         """
         author = Person.from_fullname(k.author.encode('UTF-8'))
         date = TimestampWithTimezone.from_datetime(k.max_time)
         # XXX parsing the rcsfile twice, once in expand_keyword(), and again here
         rcs = rcsparse.rcsfile(k.revs[0].path)
         msg = rcs.getlog(k.revs[0].rev)
         return Revision(
             type=RevisionType.CVS,
             date=date,
             committer_date=date,
             directory=dir_id,
             message=msg,
             author=author,
             committer=author,
             synthetic=True,
             extra_headers=[],
             parents=tuple(parents))

     def generate_and_load_snapshot(
         self, revision: Optional[Revision] = None, snapshot: Optional[Snapshot] = None
     ) -> Snapshot:
         """Create the snapshot either from existing revision or snapshot.

         Revision (supposedly new) has priority over the snapshot
         (supposedly existing one).

         Args:
             revision (Revision): Last revision seen if any (None by default)
             snapshot (Snapshot): Snapshot to use if any (None by default)

         Returns:
             The newly created snapshot

         """
         if revision:  # Priority to the revision
             snap = Snapshot(
                 branches={
                     DEFAULT_BRANCH: SnapshotBranch(
                         target=revision.id, target_type=TargetType.REVISION
                     )
                 }
             )
         elif snapshot:  # Fallback to prior snapshot
             snap = snapshot
         else:
             raise ValueError(
                 "generate_and_load_snapshot called with null revision and snapshot!"
             )
         self.log.debug("snapshot: %s" % snap)
         self.storage.snapshot_add([snap])
         return snap

     def store_data(self):
-        """Add CVS revisions to the archive.
-
-        Compute SWH changeset IDs from CVS revision information and add new
-        revisions to the archive.
-        """
-        # XXX At present changeset IDs are recomputed on the fly during every visit.
-        # If we were able to maintain a cached somewhere which can be indexed by a
-        # cvs2gitdump.ChangeSetKey and yields an SWH revision hash we could avoid
-        # doing a lot of redundant work during every visit.
-        for k in self.cvs_changesets:
-            tstr = time.strftime('%c', time.gmtime(k.max_time))
-            self.log.debug("changeset from %s by %s on branch %s", tstr, k.author, k.branch);
-            # Check out the on-disk state of this revision
-            for f in k.revs:
-                path = file_path(self.cvsroot_path, f.path)
-                wtpath = os.path.join(self.worktree_path, path)
-                self.log.debug("rev %s of file %s" % (f.rev, f.path));
-                if f.state == 'dead':
-                    # remove this file from work tree
-                    try:
-                        os.remove(wtpath)
-                    except FileNotFoundError:
-                        pass
-                else:
-                    # create, or update, this file in the work tree
-                    contents = self.rcs.expand_keyword(f.path, f.rev)
-                    try:
-                        outfile = open(wtpath, mode='wb')
-                    except FileNotFoundError:
-                        os.makedirs(os.path.dirname(wtpath))
-                        outfile = open(wtpath, mode='wb')
-                    outfile.write(contents)
-                    outfile.close()
-
-            # Compute SWH revision from the on-disk state
-            swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path))
-            (content, skipped_content, directories) = from_disk.iter_directory(swh_dir)
-            revision = self.build_swh_revision(k, swh_dir.hash, [])
-            self.log.debug("SWH revision ID: %s" % hashutil.hash_to_hex(revision.id))
-            self._last_revision = revision
-            if self._load_status == "uneventful":
-                # We have an eventful load if this revision is not already present in the archive
-                if not self.storage.revision_get([revision.id])[0]:
-                    self._load_status = "eventful"
-            if self._load_status == "eventful":
-                self._contents.extend(content)
-                self._skipped_contents.extend(skipped_content)
-                self._directories.extend(directories)
-                self._revisions.append(revision)
-
+        """Add our current CVS changeset to the archive."""
         self.storage.skipped_content_add(self._skipped_contents)
         self.storage.content_add(self._contents)
         self.storage.directory_add(self._directories)
         self.storage.revision_add(self._revisions)
         self.snapshot = self.generate_and_load_snapshot(
             revision=self._last_revision, snapshot=self._snapshot
         )
         self.log.debug("SWH snapshot ID: %s" % hashutil.hash_to_hex(self.snapshot.id))
         self.flush()
         self.loaded_snapshot_id = self.snapshot.id
+        # Drop references to the objects we have just stored; fetch_data()
+        # will refill these lists from the next changeset, if any.
+        self._skipped_contents = []
+        self._contents = []
+        self._directories = []
+        self._revisions = []

     def load_status(self):
         return {
             "status": self._load_status,
         }

     def visit_status(self):
         return self._visit_status