Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/cvs/loader.py
# Copyright (C) 2015-2021 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
"""Loader in charge of injecting either new or existing cvs repositories to | """Loader in charge of injecting either new or existing cvs repositories to | ||||
swh-storage. | swh-storage. | ||||
""" | """ | ||||
from datetime import datetime | from datetime import datetime | ||||
import os | import os | ||||
import os.path | import os.path | ||||
import sentry_sdk | |||||
import subprocess | import subprocess | ||||
import tempfile | import tempfile | ||||
import time | import time | ||||
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Sequence, Tuple, cast | from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Sequence, Tuple, cast | ||||
import sentry_sdk | |||||
from tenacity import retry | from tenacity import retry | ||||
from tenacity.retry import retry_if_exception_type | from tenacity.retry import retry_if_exception_type | ||||
from tenacity.stop import stop_after_attempt | from tenacity.stop import stop_after_attempt | ||||
from urllib3.util import parse_url | from urllib3.util import parse_url | ||||
from swh.loader.core.loader import BaseLoader | from swh.loader.core.loader import BaseLoader | ||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||
from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ( | from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ( | ||||
CHANGESET_FUZZ_SEC, | CHANGESET_FUZZ_SEC, | ||||
ChangeSetKey, | ChangeSetKey, | ||||
CvsConv, | CvsConv, | ||||
▲ Show 20 Lines • Show All 107 Lines • ▼ Show 20 Lines | ) -> Tuple[Revision, from_disk.Directory]: | ||||
# Compute SWH revision from the on-disk state | # Compute SWH revision from the on-disk state | ||||
swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path)) | swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path)) | ||||
parents: Tuple[Sha1Git, ...] | parents: Tuple[Sha1Git, ...] | ||||
if self._last_revision: | if self._last_revision: | ||||
parents = (self._last_revision.id,) | parents = (self._last_revision.id,) | ||||
else: | else: | ||||
parents = () | parents = () | ||||
revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents) | revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents) | ||||
self.log.info("SWH revision ID: %s", hashutil.hash_to_hex(revision.id)) | self.log.debug("SWH revision ID: %s", hashutil.hash_to_hex(revision.id)) | ||||
self._last_revision = revision | self._last_revision = revision | ||||
return (revision, swh_dir) | return (revision, swh_dir) | ||||
def file_path_is_safe(self, wtpath): | def file_path_is_safe(self, wtpath): | ||||
if "%s..%s" % (os.path.sep, os.path.sep) in wtpath: | if "%s..%s" % (os.path.sep, os.path.sep) in wtpath: | ||||
# Paths with back-references should not appear | # Paths with back-references should not appear | ||||
# in CVS protocol messages or CVS rlog output | # in CVS protocol messages or CVS rlog output | ||||
return False | return False | ||||
Show All 10 Lines | def checkout_file_with_rcsparse( | ||||
self, k: ChangeSetKey, f: FileRevision, rcsfile: rcsparse.rcsfile | self, k: ChangeSetKey, f: FileRevision, rcsfile: rcsparse.rcsfile | ||||
) -> None: | ) -> None: | ||||
assert self.cvsroot_path | assert self.cvsroot_path | ||||
assert self.server_style_cvsroot | assert self.server_style_cvsroot | ||||
path = file_path(self.cvsroot_path, f.path) | path = file_path(self.cvsroot_path, f.path) | ||||
wtpath = os.path.join(self.tempdir_path, path) | wtpath = os.path.join(self.tempdir_path, path) | ||||
if not self.file_path_is_safe(wtpath): | if not self.file_path_is_safe(wtpath): | ||||
raise BadPathException(f"unsafe path found in RCS file: {f.path}") | raise BadPathException(f"unsafe path found in RCS file: {f.path}") | ||||
self.log.info("rev %s state %s file %s", f.rev, f.state, f.path) | self.log.debug("rev %s state %s file %s", f.rev, f.state, f.path) | ||||
if f.state == "dead": | if f.state == "dead": | ||||
# remove this file from work tree | # remove this file from work tree | ||||
try: | try: | ||||
os.remove(wtpath) | os.remove(wtpath) | ||||
except FileNotFoundError: | except FileNotFoundError: | ||||
pass | pass | ||||
else: | else: | ||||
# create, or update, this file in the work tree | # create, or update, this file in the work tree | ||||
Show All 32 Lines | class CvsLoader(BaseLoader): | ||||
def checkout_file_with_cvsclient( | def checkout_file_with_cvsclient( | ||||
self, k: ChangeSetKey, f: FileRevision, cvsclient: CVSClient | self, k: ChangeSetKey, f: FileRevision, cvsclient: CVSClient | ||||
): | ): | ||||
assert self.cvsroot_path | assert self.cvsroot_path | ||||
path = file_path(self.cvsroot_path, f.path) | path = file_path(self.cvsroot_path, f.path) | ||||
wtpath = os.path.join(self.tempdir_path, path) | wtpath = os.path.join(self.tempdir_path, path) | ||||
if not self.file_path_is_safe(wtpath): | if not self.file_path_is_safe(wtpath): | ||||
raise BadPathException(f"unsafe path found in cvs rlog output: {f.path}") | raise BadPathException(f"unsafe path found in cvs rlog output: {f.path}") | ||||
self.log.info("rev %s state %s file %s", f.rev, f.state, f.path) | self.log.debug("rev %s state %s file %s", f.rev, f.state, f.path) | ||||
if f.state == "dead": | if f.state == "dead": | ||||
# remove this file from work tree | # remove this file from work tree | ||||
try: | try: | ||||
os.remove(wtpath) | os.remove(wtpath) | ||||
except FileNotFoundError: | except FileNotFoundError: | ||||
pass | pass | ||||
else: | else: | ||||
dirname = os.path.dirname(wtpath) | dirname = os.path.dirname(wtpath) | ||||
Show All 20 Lines | ]: | ||||
Yields: | Yields: | ||||
tuple (contents, skipped-contents, directories, revision) of dict as a | tuple (contents, skipped-contents, directories, revision) of dict as a | ||||
dictionary with keys, sha1_git, sha1, etc... | dictionary with keys, sha1_git, sha1, etc... | ||||
""" | """ | ||||
for k in cvs_changesets: | for k in cvs_changesets: | ||||
tstr = time.strftime("%c", time.gmtime(k.max_time)) | tstr = time.strftime("%c", time.gmtime(k.max_time)) | ||||
self.log.info( | self.log.debug( | ||||
"changeset from %s by %s on branch %s", tstr, k.author, k.branch | "changeset from %s by %s on branch %s", tstr, k.author, k.branch | ||||
) | ) | ||||
logmsg: Optional[bytes] = b"" | logmsg: Optional[bytes] = b"" | ||||
# Check out all files of this revision and get a log message. | # Check out all files of this revision and get a log message. | ||||
# | # | ||||
# The log message is obtained from the first file in the changeset. | # The log message is obtained from the first file in the changeset. | ||||
# The message will usually be the same for all affected files, and | # The message will usually be the same for all affected files, and | ||||
# the SWH archive will only store one version of the log message. | # the SWH archive will only store one version of the log message. | ||||
Show All 24 Lines | def pre_cleanup(self) -> None: | ||||
""" | """ | ||||
clean_dangling_folders( | clean_dangling_folders( | ||||
self.temp_directory, | self.temp_directory, | ||||
pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, | pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, | ||||
log=self.log, | log=self.log, | ||||
) | ) | ||||
def cleanup(self) -> None:
    """Final loader hook run after the visit completes.

    Actual temporary-directory removal is handled elsewhere (see
    ``pre_cleanup`` and ``clean_dangling_folders``); here we only emit a
    debug-level trace that the cleanup stage was reached.
    """
    self.log.debug("cleanup")
def configure_custom_id_keyword(self, cvsconfig): | def configure_custom_id_keyword(self, cvsconfig): | ||||
"""Parse CVSROOT/config and look for a custom keyword definition. | """Parse CVSROOT/config and look for a custom keyword definition. | ||||
There are two different configuration directives in use for this purpose. | There are two different configuration directives in use for this purpose. | ||||
The first variant stems from a patch which was never accepted into | The first variant stems from a patch which was never accepted into | ||||
upstream CVS and uses the tag directive: tag=MyName | upstream CVS and uses the tag directive: tag=MyName | ||||
With this, the "MyName" keyword becomes an alias for the "Id" keyword. | With this, the "MyName" keyword becomes an alias for the "Id" keyword. | ||||
▲ Show 20 Lines • Show All 155 Lines • ▼ Show 20 Lines | def prepare(self) -> None: | ||||
# sorted by telephone number." | # sorted by telephone number." | ||||
# https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/ | # https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/ | ||||
# | # | ||||
# An implicit assumption made here is that self.cvs_changesets will | # An implicit assumption made here is that self.cvs_changesets will | ||||
# fit into memory in its entirety. If it won't fit then the CVS walker | # fit into memory in its entirety. If it won't fit then the CVS walker | ||||
# will need to be modified such that it spools the list of changesets | # will need to be modified such that it spools the list of changesets | ||||
# to disk instead. | # to disk instead. | ||||
cvs = CvsConv(self.cvsroot_path, RcsKeywords(), False, CHANGESET_FUZZ_SEC) | cvs = CvsConv(self.cvsroot_path, RcsKeywords(), False, CHANGESET_FUZZ_SEC) | ||||
self.log.info("Walking CVS module %s", self.cvs_module_name) | self.log.debug("Walking CVS module %s", self.cvs_module_name) | ||||
cvs.walk(self.cvs_module_name) | cvs.walk(self.cvs_module_name) | ||||
cvs_changesets = sorted(cvs.changesets) | cvs_changesets = sorted(cvs.changesets) | ||||
self.log.info( | self.log.debug( | ||||
"CVS changesets found in %s: %d", | "CVS changesets found in %s: %d", | ||||
self.cvs_module_name, | self.cvs_module_name, | ||||
len(cvs_changesets), | len(cvs_changesets), | ||||
) | ) | ||||
self.swh_revision_gen = self.process_cvs_changesets( | self.swh_revision_gen = self.process_cvs_changesets( | ||||
cvs_changesets, use_rcsparse=True | cvs_changesets, use_rcsparse=True | ||||
) | ) | ||||
elif url.scheme == "pserver" or url.scheme == "fake" or url.scheme == "ssh": | elif url.scheme == "pserver" or url.scheme == "fake" or url.scheme == "ssh": | ||||
# remote CVS repository conversion | # remote CVS repository conversion | ||||
if not self.cvsroot_path: | if not self.cvsroot_path: | ||||
self.cvsroot_path = os.path.dirname(url.path) | self.cvsroot_path = os.path.dirname(url.path) | ||||
self.cvsclient = CVSClient(url) | self.cvsclient = CVSClient(url) | ||||
cvsroot_path = os.path.dirname(url.path) | cvsroot_path = os.path.dirname(url.path) | ||||
self.log.info( | self.log.debug( | ||||
"Fetching CVS rlog from %s:%s/%s", | "Fetching CVS rlog from %s:%s/%s", | ||||
url.host, | url.host, | ||||
cvsroot_path, | cvsroot_path, | ||||
self.cvs_module_name, | self.cvs_module_name, | ||||
) | ) | ||||
self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC) | self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC) | ||||
main_rlog_file = self.cvsclient.fetch_rlog() | main_rlog_file = self.cvsclient.fetch_rlog() | ||||
self.rlog.parse_rlog(main_rlog_file) | self.rlog.parse_rlog(main_rlog_file) | ||||
Show All 36 Lines | def prepare(self) -> None: | ||||
main_rlog_file.seek(0) | main_rlog_file.seek(0) | ||||
for line in main_rlog_file.readlines(): | for line in main_rlog_file.readlines(): | ||||
fp.write(line) | fp.write(line) | ||||
main_rlog_file.close() | main_rlog_file.close() | ||||
fp.seek(0) | fp.seek(0) | ||||
self.rlog.parse_rlog(cast(BinaryIO, fp)) | self.rlog.parse_rlog(cast(BinaryIO, fp)) | ||||
self.rlog_file = cast(BinaryIO, fp) | self.rlog_file = cast(BinaryIO, fp) | ||||
cvs_changesets = sorted(self.rlog.changesets) | cvs_changesets = sorted(self.rlog.changesets) | ||||
self.log.info( | self.log.debug( | ||||
"CVS changesets found for %s: %d", | "CVS changesets found for %s: %d", | ||||
self.cvs_module_name, | self.cvs_module_name, | ||||
len(cvs_changesets), | len(cvs_changesets), | ||||
) | ) | ||||
self.swh_revision_gen = self.process_cvs_changesets( | self.swh_revision_gen = self.process_cvs_changesets( | ||||
cvs_changesets, use_rcsparse=False | cvs_changesets, use_rcsparse=False | ||||
) | ) | ||||
else: | else: | ||||
raise NotFound(f"Invalid CVS origin URL '{self.origin.url}'") | raise NotFound(f"Invalid CVS origin URL '{self.origin.url}'") | ||||
def fetch_data(self) -> bool: | def fetch_data(self) -> bool: | ||||
"""Fetch the next CVS revision.""" | """Fetch the next CVS revision.""" | ||||
try: | try: | ||||
data = next(self.swh_revision_gen) | data = next(self.swh_revision_gen) | ||||
except StopIteration: | except StopIteration: | ||||
assert self._last_revision is not None | assert self._last_revision is not None | ||||
self.snapshot = self.generate_and_load_snapshot(self._last_revision) | self.snapshot = self.generate_and_load_snapshot(self._last_revision) | ||||
self.log.info("SWH snapshot ID: %s", hashutil.hash_to_hex(self.snapshot.id)) | self.log.debug( | ||||
"SWH snapshot ID: %s", hashutil.hash_to_hex(self.snapshot.id) | |||||
) | |||||
self.flush() | self.flush() | ||||
self.loaded_snapshot_id = self.snapshot.id | self.loaded_snapshot_id = self.snapshot.id | ||||
return False | return False | ||||
except Exception: | except Exception: | ||||
self.log.exception("Exception in fetch_data:") | self.log.exception("Exception in fetch_data:") | ||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | ||||
self._visit_status = "failed" | self._visit_status = "failed" | ||||
return False # Stopping iteration | return False # Stopping iteration | ||||
▲ Show 20 Lines • Show All 85 Lines • Show Last 20 Lines |