Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/cvs/loader.py
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Loader in charge of injecting either new or existing cvs repositories to
swh-storage.

"""
from datetime import datetime | from datetime import datetime | ||||
import os | import os | ||||
import subprocess | import subprocess | ||||
import tempfile | import tempfile | ||||
import time | import time | ||||
from typing import Iterator, List, Optional, Sequence, Tuple | from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Sequence, Tuple | ||||
from urllib3.util import parse_url | from urllib3.util import parse_url | ||||
from swh.loader.core.loader import BaseLoader | from swh.loader.core.loader import BaseLoader | ||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||
from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ( | from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ( | ||||
CHANGESET_FUZZ_SEC, | CHANGESET_FUZZ_SEC, | ||||
ChangeSetKey, | ChangeSetKey, | ||||
CvsConv, | CvsConv, | ||||
RcsKeywords, | RcsKeywords, | ||||
file_path, | file_path, | ||||
) | ) | ||||
import swh.loader.cvs.cvsclient as cvsclient | import swh.loader.cvs.cvsclient as cvsclient | ||||
import swh.loader.cvs.rcsparse as rcsparse | import swh.loader.cvs.rcsparse as rcsparse | ||||
from swh.loader.cvs.rlog import RlogConv | from swh.loader.cvs.rlog import RlogConv | ||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||
from swh.model import from_disk, hashutil | from swh.model import from_disk, hashutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
Origin, | Origin, | ||||
Person, | Person, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
Sha1Git, | |||||
SkippedContent, | SkippedContent, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.storage.algos.snapshot import snapshot_get_latest | from swh.storage.algos.snapshot import snapshot_get_latest | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
# Branch name constant; its use is not visible in this excerpt
# (presumably the branch recorded in generated snapshots -- TODO confirm).
DEFAULT_BRANCH = b"HEAD"

# Prefix given to temporary directories created with tempfile.mkdtemp(); the
# same value is passed as ``pattern_check`` to clean_dangling_folders().
TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs."
class CvsLoader(BaseLoader):
    """Swh cvs loader.

    The repository is local. The loader deals with
    update on an already previously loaded repository.

    """

    visit_type = "cvs"

    # The attributes below are only declared (annotated) here; they receive
    # their values elsewhere in the class.
    cvs_module_name: str

    cvsclient: cvsclient.CVSClient

    # remote CVS repository access (history is parsed from CVS rlog):
    rlog_file: BinaryIO

    # Iterator of (contents, skipped contents, directories, revision) tuples.
    swh_revision_gen: Iterator[
        Tuple[List[Content], List[SkippedContent], List[Directory], Revision]
    ]
def __init__(
    self,
    storage: StorageInterface,
    url: str,
    origin_url: Optional[str] = None,
    visit_date: Optional[datetime] = None,
    cvsroot_path: Optional[str] = None,
    temp_directory: str = "/tmp",
    max_content_size: Optional[int] = None,
):
    """Set up the loader state for one CVS origin.

    Args:
        storage: storage instance, forwarded to the base loader
        url: URL of the CVS root; stored as ``cvsroot_url``
        origin_url: origin identifier in the archive; falls back to ``url``
            when empty or not given
        visit_date: stored as ``visit_date``
        cvsroot_path: local path stored as ``cvsroot_path``; when empty or
            not given, a fresh temporary directory is created under
            ``temp_directory`` and used instead
        temp_directory: parent directory for temporary directories
        max_content_size: forwarded to the base loader
    """
    super().__init__(
        storage=storage,
        logging_class="swh.loader.cvs.CvsLoader",
        max_content_size=max_content_size,
    )
    self.cvsroot_url = url
    # origin url as unique identifier for origin in swh archive
    self.origin_url = origin_url if origin_url else self.cvsroot_url
    self.temp_directory = temp_directory

    # internal state used to store swh objects
    self._contents: List[Content] = []
    self._skipped_contents: List[SkippedContent] = []
    self._directories: List[Directory] = []
    self._revisions: List[Revision] = []

    # internal state, current visit
    self._last_revision: Optional[Revision] = None
    self._visit_status = "full"
    self.visit_date = visit_date

    # Always end up with a usable path: create a temporary directory when
    # the caller did not provide one.
    if not cvsroot_path:
        cvsroot_path = tempfile.mkdtemp(
            suffix="-%s" % os.getpid(),
            prefix=TEMPORARY_DIR_PREFIX_PATTERN,
            dir=self.temp_directory,
        )
    self.cvsroot_path = cvsroot_path

    self.snapshot: Optional[Snapshot] = None
    # Latest snapshot already known for this origin; compared against the
    # new snapshot in load_status().
    self.last_snapshot: Optional[Snapshot] = snapshot_get_latest(
        self.storage, self.origin_url
    )
def compute_swh_revision(self, k, logmsg) -> Tuple[Revision, from_disk.Directory]:
    """Compute swh hash data per CVS changeset.

    The revision is built from the current content of ``self.worktree_path``
    and is chained to the previously computed revision, if any. The new
    revision is remembered in ``self._last_revision``.

    Args:
        k: passed through to ``build_swh_revision``
        logmsg: passed through to ``build_swh_revision``

    Returns:
        tuple (rev, swh_directory)
        - rev: current SWH revision computed from checked out work tree
        - swh_directory: ``from_disk.Directory`` built from the work tree

    """
    # Compute SWH revision from the on-disk state
    swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path))
    parents: Tuple[Sha1Git, ...]
    if self._last_revision:
        parents = (self._last_revision.id,)
    else:
        # First revision of this visit: no parent.
        parents = ()
    revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents)
    self.log.debug("SWH revision ID: %s", hashutil.hash_to_hex(revision.id))
    self._last_revision = revision
    return (revision, swh_dir)
▲ Show 20 Lines • Show All 103 Lines • ▼ Show 20 Lines | ]: | ||||
# TODO: prune empty directories? | # TODO: prune empty directories? | ||||
(revision, swh_dir) = self.compute_swh_revision(k, logmsg) | (revision, swh_dir) = self.compute_swh_revision(k, logmsg) | ||||
(contents, skipped_contents, directories) = from_disk.iter_directory( | (contents, skipped_contents, directories) = from_disk.iter_directory( | ||||
swh_dir | swh_dir | ||||
) | ) | ||||
yield contents, skipped_contents, directories, revision | yield contents, skipped_contents, directories, revision | ||||
def prepare_origin_visit(self) -> None:
    """Build the ``Origin`` for this visit and store it in ``self.origin``.

    The URL is ``self.origin_url``, or ``self.cvsroot_url`` when the former
    is empty.
    """
    self.origin = Origin(url=self.origin_url or self.cvsroot_url)
def pre_cleanup(self) -> None:
    """Cleanup potential dangling files from prior runs (e.g. OOM killed
    tasks)

    """
    # Looks under the loader's temporary directory for entries matching the
    # prefix used by this loader's mkdtemp() calls.
    clean_dangling_folders(
        self.temp_directory,
        pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
        log=self.log,
    )
def cleanup(self) -> None:
    """Log that cleanup was requested; nothing else is done here."""
    self.log.info("cleanup")
def fetch_cvs_repo_with_rsync(self, host: str, path: str) -> None:
    """Copy a remote CVS repository into ``self.cvsroot_path`` using rsync.

    The parent directory of ``path`` on ``host`` is listed first, to check
    that it contains both a ``CVSROOT`` entry and an entry named after
    ``self.cvs_module_name``. Only then is the directory copied.

    Args:
        host: rsync server host
        path: remote path; its parent directory is what gets listed and
            copied

    Raises:
        NotFound: the module or the CVSROOT directory is missing from the
            listing
        subprocess.CalledProcessError: an rsync invocation exited with a
            non-zero status
    """
    # URL *must* end with a trailing slash in order to get CVSROOT listed
    url = "rsync://%s%s/" % (host, os.path.dirname(path))
    rsync = subprocess.run(["rsync", url], capture_output=True, encoding="ascii")
    rsync.check_returncode()
    have_cvsroot = False
    have_module = False
    for line in rsync.stdout.split("\n"):
        self.log.debug("rsync server: %s", line)
        if line.endswith(" CVSROOT"):
            have_cvsroot = True
        elif line.endswith(" %s" % self.cvs_module_name):
            have_module = True
        if have_module and have_cvsroot:
            # Both entries seen; no need to scan the rest of the listing.
            break
    if not have_module:
        raise NotFound(
            "CVS module %s not found at %s" % (self.cvs_module_name, url)
        )
    if not have_cvsroot:
        raise NotFound("No CVSROOT directory found at %s" % url)

    # Check the status of the copy itself, not of the earlier listing.
    subprocess.run(["rsync", "-a", url, self.cvsroot_path]).check_returncode()
def prepare(self): | def prepare(self) -> None: | ||||
self._last_revision = None | self._last_revision = None | ||||
self.swh_revision_gen = None | |||||
if not self.cvsroot_path: | |||||
self.cvsroot_path = tempfile.mkdtemp( | |||||
suffix="-%s" % os.getpid(), | |||||
prefix=TEMPORARY_DIR_PREFIX_PATTERN, | |||||
dir=self.temp_directory, | |||||
) | |||||
self.worktree_path = tempfile.mkdtemp( | self.worktree_path = tempfile.mkdtemp( | ||||
suffix="-%s" % os.getpid(), | suffix="-%s" % os.getpid(), | ||||
prefix=TEMPORARY_DIR_PREFIX_PATTERN, | prefix=TEMPORARY_DIR_PREFIX_PATTERN, | ||||
dir=self.temp_directory, | dir=self.temp_directory, | ||||
) | ) | ||||
url = parse_url(self.origin_url) | url = parse_url(self.origin_url) | ||||
self.log.debug( | self.log.debug( | ||||
"prepare; origin_url=%s scheme=%s path=%s", | "prepare; origin_url=%s scheme=%s path=%s", | ||||
▲ Show 20 Lines • Show All 87 Lines • ▼ Show 20 Lines | def prepare(self) -> None: | ||||
"CVS changesets found for %s: %d", | "CVS changesets found for %s: %d", | ||||
self.cvs_module_name, | self.cvs_module_name, | ||||
len(cvs_changesets), | len(cvs_changesets), | ||||
) | ) | ||||
self.swh_revision_gen = self.process_cvs_rlog_changesets(cvs_changesets) | self.swh_revision_gen = self.process_cvs_rlog_changesets(cvs_changesets) | ||||
else: | else: | ||||
raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url) | raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url) | ||||
def fetch_data(self): | def fetch_data(self) -> bool: | ||||
"""Fetch the next CVS revision.""" | """Fetch the next CVS revision.""" | ||||
try: | try: | ||||
data = next(self.swh_revision_gen) | data = next(self.swh_revision_gen) | ||||
except StopIteration: | except StopIteration: | ||||
return False | return False | ||||
except Exception: | except Exception: | ||||
self.log.exception("Exception in fetch_data:") | self.log.exception("Exception in fetch_data:") | ||||
return False # Stopping iteration | return False # Stopping iteration | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | def generate_and_load_snapshot(self, revision) -> Snapshot: | ||||
target=revision.id, target_type=TargetType.REVISION | target=revision.id, target_type=TargetType.REVISION | ||||
) | ) | ||||
} | } | ||||
) | ) | ||||
self.log.debug("snapshot: %s", snap) | self.log.debug("snapshot: %s", snap) | ||||
self.storage.snapshot_add([snap]) | self.storage.snapshot_add([snap]) | ||||
return snap | return snap | ||||
def store_data(self) -> None:
    """Add our current CVS changeset to the archive.

    Sends the buffered skipped contents, contents, directories and
    revisions to storage, then creates a snapshot for
    ``self._last_revision`` and records its id in
    ``self.loaded_snapshot_id``. The buffers are emptied afterwards.
    """
    self.storage.skipped_content_add(self._skipped_contents)
    self.storage.content_add(self._contents)
    self.storage.directory_add(self._directories)
    self.storage.revision_add(self._revisions)
    self.snapshot = self.generate_and_load_snapshot(self._last_revision)
    self.log.debug("SWH snapshot ID: %s", hashutil.hash_to_hex(self.snapshot.id))
    self.flush()
    self.loaded_snapshot_id = self.snapshot.id
    # Reset the buffers so the next changeset starts from a clean state.
    self._skipped_contents = []
    self._contents = []
    self._directories = []
    self._revisions = []
def load_status(self) -> Dict[str, Any]:
    """Report whether this load changed anything.

    Returns:
        ``{"status": "uneventful"}`` when the snapshot of this visit equals
        ``self.last_snapshot``, ``{"status": "eventful"}`` otherwise.
    """
    assert self.snapshot is not None
    if self.last_snapshot == self.snapshot:
        load_status = "uneventful"
    else:
        load_status = "eventful"
    return {
        "status": load_status,
    }
def visit_status(self) -> str:
    """Return the status recorded for the current visit."""
    return self._visit_status