Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/cvs/loader.py
Show All 16 Lines | |||||
from urllib3.util import parse_url | from urllib3.util import parse_url | ||||
from swh.loader.core.loader import BaseLoader | from swh.loader.core.loader import BaseLoader | ||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||
from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ( | from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ( | ||||
CHANGESET_FUZZ_SEC, | CHANGESET_FUZZ_SEC, | ||||
ChangeSetKey, | ChangeSetKey, | ||||
CvsConv, | CvsConv, | ||||
FileRevision, | |||||
RcsKeywords, | RcsKeywords, | ||||
file_path, | file_path, | ||||
) | ) | ||||
import swh.loader.cvs.cvsclient as cvsclient | from swh.loader.cvs.cvsclient import CVSClient | ||||
import swh.loader.cvs.rcsparse as rcsparse | import swh.loader.cvs.rcsparse as rcsparse | ||||
from swh.loader.cvs.rlog import RlogConv | from swh.loader.cvs.rlog import RlogConv | ||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||
from swh.model import from_disk, hashutil | from swh.model import from_disk, hashutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
Origin, | Origin, | ||||
Show All 9 Lines | |||||
) | ) | ||||
from swh.storage.algos.snapshot import snapshot_get_latest | from swh.storage.algos.snapshot import snapshot_get_latest | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
DEFAULT_BRANCH = b"HEAD" | DEFAULT_BRANCH = b"HEAD" | ||||
TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs." | TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs." | ||||
stsp: Does this class serve some purpose or could it be removed? | |||||
Done Inline ActionsOops, I used it to debug and forgot to remove it vlorentz: Oops, I used it to debug and forgot to remove it | |||||
class Foo: | |||||
pass | |||||
class CvsLoader(BaseLoader): | class CvsLoader(BaseLoader): | ||||
"""Swh cvs loader. | """Swh cvs loader. | ||||
The repository is local. The loader deals with | The repository is local. The loader deals with | ||||
update on an already previously loaded repository. | update on an already previously loaded repository. | ||||
""" | """ | ||||
visit_type = "cvs" | visit_type = "cvs" | ||||
cvs_module_name: str | cvs_module_name: str | ||||
cvsclient: cvsclient.CVSClient | cvsclient: CVSClient | ||||
# remote CVS repository access (history is parsed from CVS rlog): | # remote CVS repository access (history is parsed from CVS rlog): | ||||
rlog_file: BinaryIO | rlog_file: BinaryIO | ||||
swh_revision_gen: Iterator[ | swh_revision_gen: Iterator[ | ||||
Tuple[List[Content], List[SkippedContent], List[Directory], Revision] | Tuple[List[Content], List[SkippedContent], List[Directory], Revision] | ||||
] | ] | ||||
Show All 35 Lines | ): | ||||
) | ) | ||||
self.cvsroot_path = cvsroot_path | self.cvsroot_path = cvsroot_path | ||||
self.snapshot: Optional[Snapshot] = None | self.snapshot: Optional[Snapshot] = None | ||||
self.last_snapshot: Optional[Snapshot] = snapshot_get_latest( | self.last_snapshot: Optional[Snapshot] = snapshot_get_latest( | ||||
self.storage, self.origin_url | self.storage, self.origin_url | ||||
) | ) | ||||
def compute_swh_revision(self, k, logmsg) -> Tuple[Revision, from_disk.Directory]: | def compute_swh_revision( | ||||
self, k: ChangeSetKey, logmsg: Optional[bytes] | |||||
) -> Tuple[Revision, from_disk.Directory]: | |||||
"""Compute swh hash data per CVS changeset. | """Compute swh hash data per CVS changeset. | ||||
Returns: | Returns: | ||||
tuple (rev, swh_directory) | tuple (rev, swh_directory) | ||||
- rev: current SWH revision computed from checked out work tree | - rev: current SWH revision computed from checked out work tree | ||||
- swh_directory: dictionary of path, swh hash data with type | - swh_directory: dictionary of path, swh hash data with type | ||||
""" | """ | ||||
# Compute SWH revision from the on-disk state | # Compute SWH revision from the on-disk state | ||||
swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path)) | swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path)) | ||||
parents: Tuple[Sha1Git, ...] | parents: Tuple[Sha1Git, ...] | ||||
if self._last_revision: | if self._last_revision: | ||||
parents = (self._last_revision.id,) | parents = (self._last_revision.id,) | ||||
else: | else: | ||||
parents = () | parents = () | ||||
revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents) | revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents) | ||||
self.log.debug("SWH revision ID: %s", hashutil.hash_to_hex(revision.id)) | self.log.debug("SWH revision ID: %s", hashutil.hash_to_hex(revision.id)) | ||||
self._last_revision = revision | self._last_revision = revision | ||||
return (revision, swh_dir) | return (revision, swh_dir) | ||||
def checkout_file_with_rcsparse(self, k, f, rcsfile): | def checkout_file_with_rcsparse( | ||||
self, k: ChangeSetKey, f: FileRevision, rcsfile: rcsparse.rcsfile | |||||
) -> None: | |||||
path = file_path(self.cvsroot_path, f.path) | path = file_path(self.cvsroot_path, f.path) | ||||
wtpath = os.path.join(self.worktree_path, path) | wtpath = os.path.join(self.worktree_path, path) | ||||
self.log.info("rev %s of file %s" % (f.rev, f.path)) | self.log.info("rev %s of file %s" % (f.rev, f.path)) | ||||
if f.state == "dead": | if f.state == "dead": | ||||
# remove this file from work tree | # remove this file from work tree | ||||
try: | try: | ||||
os.remove(wtpath) | os.remove(wtpath) | ||||
except FileNotFoundError: | except FileNotFoundError: | ||||
pass | pass | ||||
else: | else: | ||||
# create, or update, this file in the work tree | # create, or update, this file in the work tree | ||||
if not rcsfile: | if not rcsfile: | ||||
rcsfile = rcsparse.rcsfile(f.path) | rcsfile = rcsparse.rcsfile(f.path) | ||||
rcs = RcsKeywords() | rcs = RcsKeywords() | ||||
contents = rcs.expand_keyword(f.path, rcsfile, f.rev) | contents = rcs.expand_keyword(f.path, rcsfile, f.rev) | ||||
os.makedirs(os.path.dirname(wtpath), exist_ok=True) | os.makedirs(os.path.dirname(wtpath), exist_ok=True) | ||||
outfile = open(wtpath, mode="wb") | outfile = open(wtpath, mode="wb") | ||||
outfile.write(contents) | outfile.write(contents) | ||||
outfile.close() | outfile.close() | ||||
def checkout_file_with_cvsclient(self, k, f, cvsclient): | def checkout_file_with_cvsclient( | ||||
self, k: ChangeSetKey, f: FileRevision, cvsclient: CVSClient | |||||
): | |||||
path = file_path(self.cvsroot_path, f.path) | path = file_path(self.cvsroot_path, f.path) | ||||
wtpath = os.path.join(self.worktree_path, path) | wtpath = os.path.join(self.worktree_path, path) | ||||
self.log.info("rev %s of file %s" % (f.rev, f.path)) | self.log.info("rev %s of file %s" % (f.rev, f.path)) | ||||
if f.state == "dead": | if f.state == "dead": | ||||
# remove this file from work tree | # remove this file from work tree | ||||
try: | try: | ||||
os.remove(wtpath) | os.remove(wtpath) | ||||
except FileNotFoundError: | except FileNotFoundError: | ||||
pass | pass | ||||
else: | else: | ||||
dirname = os.path.dirname(wtpath) | dirname = os.path.dirname(wtpath) | ||||
os.makedirs(dirname, exist_ok=True) | os.makedirs(dirname, exist_ok=True) | ||||
self.log.debug("checkout to %s\n" % wtpath) | self.log.debug("checkout to %s\n" % wtpath) | ||||
fp = cvsclient.checkout(f.path, f.rev, dirname) | fp = cvsclient.checkout(f.path, f.rev, dirname) | ||||
os.rename(fp.name, wtpath) | os.rename(fp.name, wtpath) | ||||
try: | try: | ||||
fp.close() | fp.close() | ||||
except FileNotFoundError: | except FileNotFoundError: | ||||
# Well, we have just renamed the file... | # Well, we have just renamed the file... | ||||
pass | pass | ||||
def process_cvs_changesets( | def process_cvs_changesets( | ||||
self, cvs_changesets, use_rcsparse, | self, | ||||
cvs_changesets: List[ChangeSetKey], | |||||
use_rcsparse: bool, | |||||
) -> Iterator[ | ) -> Iterator[ | ||||
Tuple[List[Content], List[SkippedContent], List[Directory], Revision] | Tuple[List[Content], List[SkippedContent], List[Directory], Revision] | ||||
]: | ]: | ||||
"""Process CVS revisions. | """Process CVS revisions. | ||||
At each CVS revision, check out contents and compute swh hashes. | At each CVS revision, check out contents and compute swh hashes. | ||||
Yields: | Yields: | ||||
tuple (contents, skipped-contents, directories, revision) of dict as a | tuple (contents, skipped-contents, directories, revision) of dict as a | ||||
dictionary with keys, sha1_git, sha1, etc... | dictionary with keys, sha1_git, sha1, etc... | ||||
""" | """ | ||||
for k in cvs_changesets: | for k in cvs_changesets: | ||||
tstr = time.strftime("%c", time.gmtime(k.max_time)) | tstr = time.strftime("%c", time.gmtime(k.max_time)) | ||||
self.log.info( | self.log.info( | ||||
"changeset from %s by %s on branch %s", tstr, k.author, k.branch | "changeset from %s by %s on branch %s", tstr, k.author, k.branch | ||||
) | ) | ||||
logmsg = "" | logmsg: Optional[bytes] = b"" | ||||
# Check out all files of this revision and get a log message. | # Check out all files of this revision and get a log message. | ||||
# | # | ||||
# The log message is obtained from the first file in the changeset. | # The log message is obtained from the first file in the changeset. | ||||
# The message will usually be the same for all affected files, and | # The message will usually be the same for all affected files, and | ||||
# the SWH archive will only store one version of the log message. | # the SWH archive will only store one version of the log message. | ||||
for f in k.revs: | for f in k.revs: | ||||
rcsfile = None | rcsfile = None | ||||
if use_rcsparse: | if use_rcsparse: | ||||
Show All 16 Lines | class CvsLoader(BaseLoader): | ||||
def prepare_origin_visit(self) -> None: | def prepare_origin_visit(self) -> None: | ||||
self.origin = Origin( | self.origin = Origin( | ||||
url=self.origin_url if self.origin_url else self.cvsroot_url | url=self.origin_url if self.origin_url else self.cvsroot_url | ||||
) | ) | ||||
def pre_cleanup(self) -> None: | def pre_cleanup(self) -> None: | ||||
"""Cleanup potential dangling files from prior runs (e.g. OOM killed | """Cleanup potential dangling files from prior runs (e.g. OOM killed | ||||
tasks) | tasks) | ||||
""" | """ | ||||
clean_dangling_folders( | clean_dangling_folders( | ||||
self.temp_directory, | self.temp_directory, | ||||
pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, | pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, | ||||
log=self.log, | log=self.log, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 105 Lines • ▼ Show 20 Lines | def prepare(self) -> None: | ||||
self.cvs_module_name, | self.cvs_module_name, | ||||
len(cvs_changesets), | len(cvs_changesets), | ||||
) | ) | ||||
self.swh_revision_gen = self.process_cvs_changesets( | self.swh_revision_gen = self.process_cvs_changesets( | ||||
cvs_changesets, use_rcsparse=True | cvs_changesets, use_rcsparse=True | ||||
) | ) | ||||
elif url.scheme == "pserver" or url.scheme == "fake" or url.scheme == "ssh": | elif url.scheme == "pserver" or url.scheme == "fake" or url.scheme == "ssh": | ||||
# remote CVS repository conversion | # remote CVS repository conversion | ||||
self.cvsclient = cvsclient.CVSClient(url) | self.cvsclient = CVSClient(url) | ||||
cvsroot_path = os.path.dirname(url.path) | cvsroot_path = os.path.dirname(url.path) | ||||
self.log.info( | self.log.info( | ||||
"Fetching CVS rlog from %s:%s/%s", | "Fetching CVS rlog from %s:%s/%s", | ||||
url.host, | url.host, | ||||
cvsroot_path, | cvsroot_path, | ||||
self.cvs_module_name, | self.cvs_module_name, | ||||
) | ) | ||||
self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC) | self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC) | ||||
Show All 20 Lines | def fetch_data(self) -> bool: | ||||
except Exception: | except Exception: | ||||
self.log.exception("Exception in fetch_data:") | self.log.exception("Exception in fetch_data:") | ||||
return False # Stopping iteration | return False # Stopping iteration | ||||
self._contents, self._skipped_contents, self._directories, rev = data | self._contents, self._skipped_contents, self._directories, rev = data | ||||
self._revisions = [rev] | self._revisions = [rev] | ||||
return True | return True | ||||
def build_swh_revision( | def build_swh_revision( | ||||
self, k: ChangeSetKey, logmsg: bytes, dir_id: bytes, parents: Sequence[bytes] | self, | ||||
k: ChangeSetKey, | |||||
logmsg: Optional[bytes], | |||||
dir_id: bytes, | |||||
parents: Sequence[bytes], | |||||
) -> Revision: | ) -> Revision: | ||||
"""Given a CVS revision, build a swh revision. | """Given a CVS revision, build a swh revision. | ||||
Args: | Args: | ||||
k: changeset data | k: changeset data | ||||
logmsg: the changeset's log message | logmsg: the changeset's log message | ||||
dir_id: the tree's hash identifier | dir_id: the tree's hash identifier | ||||
parents: the revision's parents identifier | parents: the revision's parents identifier | ||||
Returns: | Returns: | ||||
The swh revision dictionary. | The swh revision dictionary. | ||||
""" | """ | ||||
author = Person.from_fullname(k.author.encode("UTF-8")) | author = Person.from_fullname(k.author.encode("UTF-8")) | ||||
date = TimestampWithTimezone.from_datetime(k.max_time) | date = TimestampWithTimezone.from_dict(k.max_time) | ||||
return Revision( | return Revision( | ||||
type=RevisionType.CVS, | type=RevisionType.CVS, | ||||
date=date, | date=date, | ||||
committer_date=date, | committer_date=date, | ||||
directory=dir_id, | directory=dir_id, | ||||
message=logmsg, | message=logmsg, | ||||
author=author, | author=author, | ||||
committer=author, | committer=author, | ||||
synthetic=True, | synthetic=True, | ||||
extra_headers=[], | extra_headers=[], | ||||
parents=tuple(parents), | parents=tuple(parents), | ||||
) | ) | ||||
def generate_and_load_snapshot(self, revision) -> Snapshot: | def generate_and_load_snapshot(self, revision: Revision) -> Snapshot: | ||||
"""Create the snapshot either from existing revision. | """Create the snapshot either from existing revision. | ||||
Args: | Args: | ||||
revision (dict): Last revision seen if any (None by default) | revision (dict): Last revision seen if any (None by default) | ||||
Returns: | Returns: | ||||
Optional[Snapshot] The newly created snapshot | Optional[Snapshot] The newly created snapshot | ||||
Show All 10 Lines | def generate_and_load_snapshot(self, revision: Revision) -> Snapshot: | ||||
return snap | return snap | ||||
def store_data(self) -> None: | def store_data(self) -> None: | ||||
"Add our current CVS changeset to the archive." | "Add our current CVS changeset to the archive." | ||||
self.storage.skipped_content_add(self._skipped_contents) | self.storage.skipped_content_add(self._skipped_contents) | ||||
self.storage.content_add(self._contents) | self.storage.content_add(self._contents) | ||||
self.storage.directory_add(self._directories) | self.storage.directory_add(self._directories) | ||||
self.storage.revision_add(self._revisions) | self.storage.revision_add(self._revisions) | ||||
assert self._last_revision is not None | |||||
self.snapshot = self.generate_and_load_snapshot(self._last_revision) | self.snapshot = self.generate_and_load_snapshot(self._last_revision) | ||||
self.log.debug("SWH snapshot ID: %s", hashutil.hash_to_hex(self.snapshot.id)) | self.log.debug("SWH snapshot ID: %s", hashutil.hash_to_hex(self.snapshot.id)) | ||||
self.flush() | self.flush() | ||||
self.loaded_snapshot_id = self.snapshot.id | self.loaded_snapshot_id = self.snapshot.id | ||||
self._skipped_contents = [] | self._skipped_contents = [] | ||||
self._contents = [] | self._contents = [] | ||||
self._directories = [] | self._directories = [] | ||||
self._revisions = [] | self._revisions = [] | ||||
def load_status(self) -> Dict[str, Any]: | def load_status(self) -> Dict[str, Any]: | ||||
assert self.snapshot is not None | assert self.snapshot is not None | ||||
if self.last_snapshot == self.snapshot: | if self.last_snapshot == self.snapshot: | ||||
load_status = "uneventful" | load_status = "uneventful" | ||||
else: | else: | ||||
load_status = "eventful" | load_status = "eventful" | ||||
return { | return { | ||||
"status": load_status, | "status": load_status, | ||||
} | } | ||||
def visit_status(self): | def visit_status(self) -> str: | ||||
return self._visit_status | return self._visit_status |
Does this class serve some purpose or could it be removed?