diff --git a/swh/loader/cvs/loader.py b/swh/loader/cvs/loader.py
index 8f56eee..2e1ae68 100644
--- a/swh/loader/cvs/loader.py
+++ b/swh/loader/cvs/loader.py
@@ -1,556 +1,555 @@
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Loader in charge of injecting either new or existing cvs repositories to
swh-storage.
"""
from datetime import datetime
import os
import os.path
import subprocess
import tempfile
import time
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Sequence, Tuple, cast
from urllib3.util import parse_url
from swh.loader.core.loader import BaseLoader
from swh.loader.core.utils import clean_dangling_folders
from swh.loader.cvs.cvs2gitdump.cvs2gitdump import (
CHANGESET_FUZZ_SEC,
ChangeSetKey,
CvsConv,
FileRevision,
RcsKeywords,
file_path,
)
from swh.loader.cvs.cvsclient import CVSClient
import swh.loader.cvs.rcsparse as rcsparse
from swh.loader.cvs.rlog import RlogConv
from swh.loader.exception import NotFound
from swh.model import from_disk, hashutil
from swh.model.model import (
Content,
Directory,
Origin,
Person,
Revision,
RevisionType,
Sha1Git,
SkippedContent,
Snapshot,
SnapshotBranch,
TargetType,
TimestampWithTimezone,
)
from swh.storage.algos.snapshot import snapshot_get_latest
from swh.storage.interface import StorageInterface
DEFAULT_BRANCH = b"HEAD"
TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs."
class CvsLoader(BaseLoader):
"""Swh cvs loader.
The repository is local. The loader deals with
update on an already previously loaded repository.
"""
visit_type = "cvs"
cvs_module_name: str
cvsclient: CVSClient
# remote CVS repository access (history is parsed from CVS rlog):
rlog_file: BinaryIO
swh_revision_gen: Iterator[
Tuple[List[Content], List[SkippedContent], List[Directory], Revision]
]
def __init__(
self,
storage: StorageInterface,
url: str,
origin_url: Optional[str] = None,
visit_date: Optional[datetime] = None,
cvsroot_path: Optional[str] = None,
temp_directory: str = "/tmp",
max_content_size: Optional[int] = None,
):
super().__init__(
storage=storage,
logging_class="swh.loader.cvs.CvsLoader",
max_content_size=max_content_size,
)
self.cvsroot_url = url
# origin url as unique identifier for origin in swh archive
self.origin_url = origin_url if origin_url else self.cvsroot_url
self.temp_directory = temp_directory
# internal state used to store swh objects
self._contents: List[Content] = []
self._skipped_contents: List[SkippedContent] = []
self._directories: List[Directory] = []
self._revisions: List[Revision] = []
# internal state, current visit
self._last_revision: Optional[Revision] = None
self._visit_status = "full"
self.visit_date = visit_date
self.cvsroot_path = cvsroot_path
self.snapshot: Optional[Snapshot] = None
self.last_snapshot: Optional[Snapshot] = snapshot_get_latest(
self.storage, self.origin_url
)
def compute_swh_revision(
self, k: ChangeSetKey, logmsg: Optional[bytes]
) -> Tuple[Revision, from_disk.Directory]:
"""Compute swh hash data per CVS changeset.
Returns:
tuple (rev, swh_directory)
- rev: current SWH revision computed from the checked-out work tree
- swh_directory: the from_disk.Directory computed from the work tree
"""
# Compute SWH revision from the on-disk state
swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path))
parents: Tuple[Sha1Git, ...]
if self._last_revision:
parents = (self._last_revision.id,)
else:
parents = ()
revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents)
self.log.info("SWH revision ID: %s", hashutil.hash_to_hex(revision.id))
self._last_revision = revision
return (revision, swh_dir)
def checkout_file_with_rcsparse(
self, k: ChangeSetKey, f: FileRevision, rcsfile: rcsparse.rcsfile
) -> None:
assert self.cvsroot_path
assert self.server_style_cvsroot
path = file_path(self.cvsroot_path, f.path)
- wtpath = os.path.join(self.worktree_path, path)
+ wtpath = os.path.join(self.tempdir_path, path)
self.log.info("rev %s state %s file %s" % (f.rev, f.state, f.path))
if f.state == "dead":
# remove this file from work tree
try:
os.remove(wtpath)
except FileNotFoundError:
pass
else:
# create, or update, this file in the work tree
if not rcsfile:
rcsfile = rcsparse.rcsfile(f.path)
rcs = RcsKeywords()
# We try our best to generate the same commit hashes over both pserver
# and rsync. To avoid differences in file content due to expansion of
# RCS keywords which contain absolute file paths (such as "Header"),
# attempt to expand such paths in the same way as a regular CVS server
# would expand them.
# Whether this will avoid content differences depends on pserver and
# rsync servers exposing the same server-side path to the CVS repository.
# However, this is the best we can do, and only matters if an origin can
# be fetched over both pserver and rsync. Each will still be treated as
# a distinct origin, but will hopefully point at the same SWH snapshot.
# In any case, an absolute path based on the origin URL looks nicer than
# an absolute path based on a temporary directory used by the CVS loader.
server_style_path = f.path.replace(
self.cvsroot_path, self.server_style_cvsroot
)
if server_style_path[0] != "/":
server_style_path = "/" + server_style_path
contents = rcs.expand_keyword(server_style_path, rcsfile, f.rev)
os.makedirs(os.path.dirname(wtpath), exist_ok=True)
outfile = open(wtpath, mode="wb")
outfile.write(contents)
outfile.close()
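# A minimal sketch of the server-style path rewrite performed above,
# using hypothetical values; the real paths come from the loader's
# temporary directory and the origin URL:
cvsroot_path = "/tmp/swh.loader.cvs.1234"  # hypothetical loader-side path
server_style_cvsroot = "/cvsroot"  # hypothetical server-side path
f_path = cvsroot_path + "/module/foo.c,v"
server_style_path = f_path.replace(cvsroot_path, server_style_cvsroot)
if server_style_path[0] != "/":
    server_style_path = "/" + server_style_path
assert server_style_path == "/cvsroot/module/foo.c,v"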
def checkout_file_with_cvsclient(
self, k: ChangeSetKey, f: FileRevision, cvsclient: CVSClient
):
assert self.cvsroot_path
path = file_path(self.cvsroot_path, f.path)
- wtpath = os.path.join(self.worktree_path, path)
+ wtpath = os.path.join(self.tempdir_path, path)
self.log.info("rev %s state %s file %s" % (f.rev, f.state, f.path))
if f.state == "dead":
# remove this file from work tree
try:
os.remove(wtpath)
except FileNotFoundError:
pass
else:
dirname = os.path.dirname(wtpath)
os.makedirs(dirname, exist_ok=True)
self.log.debug("checkout to %s\n" % wtpath)
fp = cvsclient.checkout(path, f.rev, dirname, expand_keywords=True)
os.rename(fp.name, wtpath)
try:
fp.close()
except FileNotFoundError:
# Well, we have just renamed the file...
pass
def process_cvs_changesets(
self, cvs_changesets: List[ChangeSetKey], use_rcsparse: bool,
) -> Iterator[
Tuple[List[Content], List[SkippedContent], List[Directory], Revision]
]:
"""Process CVS revisions.
At each CVS revision, check out contents and compute swh hashes.
Yields:
tuple (contents, skipped_contents, directories, revision) of model
objects computed for each CVS changeset
"""
for k in cvs_changesets:
tstr = time.strftime("%c", time.gmtime(k.max_time))
self.log.info(
"changeset from %s by %s on branch %s", tstr, k.author, k.branch
)
logmsg: Optional[bytes] = b""
# Check out all files of this revision and get a log message.
#
# The log message is obtained from the first file in the changeset.
# The message will usually be the same for all affected files, and
# the SWH archive will only store one version of the log message.
for f in k.revs:
rcsfile = None
if use_rcsparse:
if rcsfile is None:
rcsfile = rcsparse.rcsfile(f.path)
if not logmsg:
logmsg = rcsfile.getlog(k.revs[0].rev)
self.checkout_file_with_rcsparse(k, f, rcsfile)
else:
if not logmsg:
logmsg = self.rlog.getlog(self.rlog_file, f.path, k.revs[0].rev)
self.checkout_file_with_cvsclient(k, f, self.cvsclient)
# TODO: prune empty directories?
(revision, swh_dir) = self.compute_swh_revision(k, logmsg)
(contents, skipped_contents, directories) = from_disk.iter_directory(
swh_dir
)
yield contents, skipped_contents, directories, revision
def prepare_origin_visit(self) -> None:
self.origin = Origin(
url=self.origin_url if self.origin_url else self.cvsroot_url
)
def pre_cleanup(self) -> None:
"""Cleanup potential dangling files from prior runs (e.g. OOM killed
tasks)
"""
clean_dangling_folders(
self.temp_directory,
pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
log=self.log,
)
def cleanup(self) -> None:
self.log.info("cleanup")
def fetch_cvs_repo_with_rsync(self, host: str, path: str) -> None:
# URL *must* end with a trailing slash in order to get CVSROOT listed
url = "rsync://%s%s/" % (host, os.path.dirname(path))
rsync = subprocess.run(["rsync", url], capture_output=True, encoding="ascii")
rsync.check_returncode()
have_cvsroot = False
have_module = False
for line in rsync.stdout.split("\n"):
self.log.debug("rsync server: %s", line)
if line.endswith(" CVSROOT"):
have_cvsroot = True
elif line.endswith(" %s" % self.cvs_module_name):
have_module = True
if have_module and have_cvsroot:
break
if not have_module:
raise NotFound(
"CVS module %s not found at %s" % (self.cvs_module_name, url)
)
if not have_cvsroot:
raise NotFound("No CVSROOT directory found at %s" % url)
- # mypy complains: List item 3 has incompatible type "Optional[str]";
- # because self.cvsroot_path is an optional argument. We do however
- # ensure that it is initialized if the loader is not passed a
- # corresponding argument. Better ideas than ignoring types on this line?
+ assert self.cvsroot_path
subprocess.run(
- ["rsync", "-a", url, self.cvsroot_path] # type: ignore
+ # Ensure that rsync will place files directly within our cvsroot
+ # directory by appending a "/" to our cvsroot path.
+ ["rsync", "-a", url, self.cvsroot_path + "/"]
).check_returncode()
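# The rsync invocations above rely on standard rsync path semantics: a
# trailing slash makes rsync operate on the directory's contents rather
# than on the directory itself. A rough self-contained sketch, assuming
# a local rsync binary and hypothetical temporary paths:
import os
import subprocess
import tempfile

src = tempfile.mkdtemp()
dst = tempfile.mkdtemp()
open(os.path.join(src, "foo,v"), "wb").close()
# "src/" copies the contents of src directly into dst; a bare "src"
# would create dst/<basename-of-src>/foo,v instead.
subprocess.run(["rsync", "-a", src + "/", dst + "/"], check=True)
assert os.path.exists(os.path.join(dst, "foo,v"))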
def prepare(self) -> None:
self._last_revision = None
- self.worktree_path = tempfile.mkdtemp(
+ self.tempdir_path = tempfile.mkdtemp(
suffix="-%s" % os.getpid(),
prefix=TEMPORARY_DIR_PREFIX_PATTERN,
dir=self.temp_directory,
)
url = parse_url(self.origin_url)
self.log.debug(
"prepare; origin_url=%s scheme=%s path=%s",
self.origin_url,
url.scheme,
url.path,
)
if not url.path:
raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url)
self.cvs_module_name = os.path.basename(url.path)
self.server_style_cvsroot = os.path.dirname(url.path)
- os.mkdir(os.path.join(self.worktree_path, self.cvs_module_name))
+ self.worktree_path = os.path.join(self.tempdir_path, self.cvs_module_name)
if url.scheme == "file" or url.scheme == "rsync":
# local CVS repository conversion
if not self.cvsroot_path:
self.cvsroot_path = tempfile.mkdtemp(
suffix="-%s" % os.getpid(),
prefix=TEMPORARY_DIR_PREFIX_PATTERN,
dir=self.temp_directory,
)
if url.scheme == "file":
if not os.path.exists(url.path):
raise NotFound
elif url.scheme == "rsync":
self.fetch_cvs_repo_with_rsync(url.host, url.path)
have_rcsfile = False
have_cvsroot = False
for root, dirs, files in os.walk(self.cvsroot_path):
if "CVSROOT" in dirs:
have_cvsroot = True
dirs.remove("CVSROOT")
continue
for f in files:
filepath = os.path.join(root, f)
if f[-2:] == ",v":
rcsfile = rcsparse.rcsfile(filepath) # noqa: F841
self.log.debug(
"Looks like we have data to convert; "
"found a valid RCS file at %s",
filepath,
)
have_rcsfile = True
break
if have_rcsfile:
break
if not have_rcsfile:
raise NotFound(
"Directory %s does not contain any valid RCS files %s",
self.cvsroot_path,
)
if not have_cvsroot:
self.log.warn(
"The CVS repository at '%s' lacks a CVSROOT directory; "
"we might be ingesting an incomplete copy of the repository",
self.cvsroot_path,
)
# Unfortunately, there is no way to convert CVS history in an
# iterative fashion because the data is not indexed by any kind
# of changeset ID. We need to walk the history of each and every
# RCS file in the repository during every visit, even if no new
# changes will be added to the SWH archive afterwards.
# "CVS’s repository is the software equivalent of a telephone book
# sorted by telephone number."
# https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/
#
# An implicit assumption made here is that self.cvs_changesets will
# fit into memory in its entirety. If it won't fit then the CVS walker
# will need to be modified such that it spools the list of changesets
# to disk instead.
cvs = CvsConv(self.cvsroot_path, RcsKeywords(), False, CHANGESET_FUZZ_SEC)
self.log.info("Walking CVS module %s", self.cvs_module_name)
cvs.walk(self.cvs_module_name)
cvs_changesets = sorted(cvs.changesets)
self.log.info(
"CVS changesets found in %s: %d",
self.cvs_module_name,
len(cvs_changesets),
)
self.swh_revision_gen = self.process_cvs_changesets(
cvs_changesets, use_rcsparse=True
)
elif url.scheme == "pserver" or url.scheme == "fake" or url.scheme == "ssh":
# remote CVS repository conversion
if not self.cvsroot_path:
self.cvsroot_path = os.path.dirname(url.path)
self.cvsclient = CVSClient(url)
cvsroot_path = os.path.dirname(url.path)
self.log.info(
"Fetching CVS rlog from %s:%s/%s",
url.host,
cvsroot_path,
self.cvs_module_name,
)
self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC)
main_rlog_file = self.cvsclient.fetch_rlog()
self.rlog.parse_rlog(main_rlog_file)
# Find file deletion events only visible in Attic directories.
main_changesets = self.rlog.changesets
attic_paths = []
attic_rlog_files = []
assert self.cvsroot_path
for k in main_changesets:
for changed_file in k.revs:
path = file_path(self.cvsroot_path, changed_file.path)
if path.startswith(self.cvsroot_path):
path = path[
len(os.path.commonpath([self.cvsroot_path, path])) + 1 :
]
parent_path = os.path.dirname(path)
if parent_path.split("/")[-1] == "Attic":
continue
attic_path = parent_path + "/Attic"
if attic_path in attic_paths:
continue
attic_paths.append(attic_path) # avoid multiple visits
# Try to fetch more rlog data from this Attic directory.
attic_rlog_file = self.cvsclient.fetch_rlog(
path=attic_path, state="dead",
)
if attic_rlog_file:
attic_rlog_files.append(attic_rlog_file)
if len(attic_rlog_files) == 0:
self.rlog_file = main_rlog_file
else:
# Combine all the rlog pieces we found and re-parse.
fp = tempfile.TemporaryFile()
for attic_rlog_file in attic_rlog_files:
for line in attic_rlog_file.readlines():
fp.write(line)
attic_rlog_file.close()
main_rlog_file.seek(0)
for line in main_rlog_file.readlines():
fp.write(line)
main_rlog_file.close()
fp.seek(0)
self.rlog.parse_rlog(cast(BinaryIO, fp))
self.rlog_file = cast(BinaryIO, fp)
cvs_changesets = sorted(self.rlog.changesets)
self.log.info(
"CVS changesets found for %s: %d",
self.cvs_module_name,
len(cvs_changesets),
)
self.swh_revision_gen = self.process_cvs_changesets(
cvs_changesets, use_rcsparse=False
)
else:
raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url)
def fetch_data(self) -> bool:
"""Fetch the next CVS revision."""
try:
data = next(self.swh_revision_gen)
except StopIteration:
assert self._last_revision is not None
self.snapshot = self.generate_and_load_snapshot(self._last_revision)
self.log.info("SWH snapshot ID: %s", hashutil.hash_to_hex(self.snapshot.id))
self.flush()
self.loaded_snapshot_id = self.snapshot.id
return False
except Exception:
self.log.exception("Exception in fetch_data:")
return False # Stopping iteration
self._contents, self._skipped_contents, self._directories, rev = data
self._revisions = [rev]
return True
def build_swh_revision(
self,
k: ChangeSetKey,
logmsg: Optional[bytes],
dir_id: bytes,
parents: Sequence[bytes],
) -> Revision:
"""Given a CVS revision, build a swh revision.
Args:
k: changeset data
logmsg: the changeset's log message
dir_id: the tree's hash identifier
parents: the revision's parent identifiers
Returns:
The swh revision.
"""
author = Person.from_fullname(k.author.encode("UTF-8"))
date = TimestampWithTimezone.from_dict(k.max_time)
return Revision(
type=RevisionType.CVS,
date=date,
committer_date=date,
directory=dir_id,
message=logmsg,
author=author,
committer=author,
synthetic=True,
extra_headers=[],
parents=tuple(parents),
)
def generate_and_load_snapshot(self, revision: Revision) -> Snapshot:
"""Create the snapshot either from existing revision.
Args:
revision (dict): Last revision seen if any (None by default)
Returns:
Optional[Snapshot] The newly created snapshot
"""
snap = Snapshot(
branches={
DEFAULT_BRANCH: SnapshotBranch(
target=revision.id, target_type=TargetType.REVISION
)
}
)
self.log.debug("snapshot: %s", snap)
self.storage.snapshot_add([snap])
return snap
def store_data(self) -> None:
"Add our current CVS changeset to the archive."
self.storage.skipped_content_add(self._skipped_contents)
self.storage.content_add(self._contents)
self.storage.directory_add(self._directories)
self.storage.revision_add(self._revisions)
self.flush()
self._skipped_contents = []
self._contents = []
self._directories = []
self._revisions = []
def load_status(self) -> Dict[str, Any]:
assert self.snapshot is not None
if self.last_snapshot == self.snapshot:
load_status = "uneventful"
else:
load_status = "eventful"
return {
"status": load_status,
}
def visit_status(self) -> str:
return self._visit_status
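# How the methods above fit together: BaseLoader.load() drives the
# visit, calling prepare_origin_visit() and prepare() once and then
# alternating fetch_data() and store_data() until fetch_data() returns
# False. A simplified sketch of that driver loop; the actual
# implementation lives in swh.loader.core.loader.BaseLoader and also
# handles visit statuses and error reporting:
def load_sketch(loader: CvsLoader) -> Dict[str, Any]:
    loader.pre_cleanup()
    loader.prepare_origin_visit()
    loader.prepare()
    while loader.fetch_data():  # one CVS changeset per iteration
        loader.store_data()  # flush contents, directories, revision
    loader.cleanup()
    return loader.load_status()  # "eventful" or "uneventful"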
diff --git a/swh/loader/cvs/tests/test_loader.py b/swh/loader/cvs/tests/test_loader.py
index 8866000..c0b3f70 100644
--- a/swh/loader/cvs/tests/test_loader.py
+++ b/swh/loader/cvs/tests/test_loader.py
@@ -1,949 +1,949 @@
# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from typing import Any, Dict
from swh.loader.cvs.loader import CvsLoader
from swh.loader.tests import (
assert_last_visit_matches,
check_snapshot,
get_stats,
prepare_repository_from_archive,
)
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Snapshot, SnapshotBranch, TargetType
RUNBABY_SNAPSHOT = Snapshot(
- id=hash_to_bytes("1cff69ab9bd70822d5e3006092f943ccaafdcf57"),
+ id=hash_to_bytes("e64667c400049f560a3856580e0d9e511ffa66c9"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("ef511d258fa55035c2bc2a5b05cad233cee1d328"),
+ target=hash_to_bytes("0f6db8ce49472d7829ddd6141f71c68c0d563f0e"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_not_found_no_mock(swh_storage, tmp_path):
"""Given an unknown repository, the loader visit ends up in status not_found"""
unknown_repo_url = "unknown-repository"
loader = CvsLoader(swh_storage, unknown_repo_url, cvsroot_path=tmp_path)
assert loader.load() == {"status": "uneventful"}
assert_last_visit_matches(
swh_storage, unknown_repo_url, status="not_found", type="cvs",
)
def test_loader_cvs_visit(swh_storage, datadir, tmp_path):
"""Eventful visit should yield 1 snapshot"""
archive_name = "runbaby"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=RUNBABY_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 5,
- "directory": 2,
+ "directory": 1,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 1,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(RUNBABY_SNAPSHOT, loader.storage)
def test_loader_cvs_2_visits_no_change(swh_storage, datadir, tmp_path):
"""Eventful visit followed by uneventful visit should yield the same snapshot
"""
archive_name = "runbaby"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
visit_status1 = assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=RUNBABY_SNAPSHOT.id,
)
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "uneventful"}
visit_status2 = assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=RUNBABY_SNAPSHOT.id,
)
assert visit_status1.date < visit_status2.date
assert visit_status1.snapshot == visit_status2.snapshot
stats = get_stats(loader.storage)
assert stats["origin_visit"] == 1 + 1 # computed twice the same snapshot
assert stats["snapshot"] == 1
GREEK_SNAPSHOT = Snapshot(
- id=hash_to_bytes("5e74af67d69dfd7aea0eb118154d062f71f50120"),
+ id=hash_to_bytes("c76f8b58a6dfbe6fccb9a85b695f914aa5c4a95a"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("e18b92f14cd5b3efb3fcb4ea46cfaf97f25f301b"),
+ target=hash_to_bytes("e138207ddd5e1965b5ab9a522bfc2e0ecd233b67"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_with_file_additions_and_deletions(swh_storage, datadir, tmp_path):
"""Eventful conversion of history with file additions and deletions"""
archive_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 8,
- "directory": 20,
+ "directory": 13,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT, loader.storage)
def test_loader_cvs_pserver_with_file_additions_and_deletions(
swh_storage, datadir, tmp_path
):
"""Eventful CVS pserver conversion with file additions and deletions"""
archive_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 8,
- "directory": 20,
+ "directory": 13,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT, loader.storage)
GREEK_SNAPSHOT2 = Snapshot(
- id=hash_to_bytes("048885ae2145ffe81588aea95dcf75c536ecdf26"),
+ id=hash_to_bytes("e3d2e8860286000f546c01aa2a3e1630170eb3b6"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("55eb1438c03588607ce4b8db8f45e8e23075951b"),
+ target=hash_to_bytes("f1ff9a3c7624b1be5e5d51f9ec0abf7dcddbf0b2"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_2_visits_with_change(swh_storage, datadir, tmp_path):
"""Eventful visit followed by eventful visit should yield two snapshots"""
archive_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
visit_status1 = assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 8,
- "directory": 20,
+ "directory": 13,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
archive_name2 = "greek-repository2"
archive_path2 = os.path.join(datadir, f"{archive_name2}.tgz")
repo_url = prepare_repository_from_archive(archive_path2, archive_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
visit_status2 = assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT2.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 10,
- "directory": 23,
+ "directory": 15,
"origin": 1,
"origin_visit": 2,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 2,
}
check_snapshot(GREEK_SNAPSHOT2, loader.storage)
assert visit_status1.date < visit_status2.date
assert visit_status1.snapshot != visit_status2.snapshot
def test_loader_cvs_visit_pserver(swh_storage, datadir, tmp_path):
"""Eventful visit to CVS pserver should yield 1 snapshot"""
archive_name = "runbaby"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/runbaby" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=RUNBABY_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 5,
- "directory": 2,
+ "directory": 1,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 1,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(RUNBABY_SNAPSHOT, loader.storage)
GREEK_SNAPSHOT3 = Snapshot(
- id=hash_to_bytes("cd801546b0137c82f01b9b67848ba8261d64ebbb"),
+ id=hash_to_bytes("6e9910ed072662cb482d9017cbf5e1973e6dc09f"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("14980990790ce1921db953c4c9ae03dd8861e8d6"),
+ target=hash_to_bytes("d9f4837dc55a87d83730c6e277c88b67dae80272"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_visit_pserver_no_eol(swh_storage, datadir, tmp_path):
"""Visit to CVS pserver with file that lacks trailing eol"""
archive_name = "greek-repository3"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT3.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 9,
- "directory": 23,
+ "directory": 15,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT3, loader.storage)
GREEK_SNAPSHOT4 = Snapshot(
- id=hash_to_bytes("26e943053ea9c5f961336a72328cac22026ed3b5"),
+ id=hash_to_bytes("a8593e9233601b31e012d36975f817d2c993d04b"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("ed784aff0e0743244bb1f30ba21c8abcd0d460ab"),
+ target=hash_to_bytes("51bb99655225c810ee259087fcae505899725360"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_visit_expand_id_keyword(swh_storage, datadir, tmp_path):
"""Visit to CVS repository with file with an RCS Id keyword"""
archive_name = "greek-repository4"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT4.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 12,
- "directory": 31,
+ "directory": 20,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 11,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT4, loader.storage)
def test_loader_cvs_visit_pserver_expand_id_keyword(swh_storage, datadir, tmp_path):
"""Visit to CVS pserver with file with an RCS Id keyword"""
archive_name = "greek-repository4"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT4.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 12,
- "directory": 31,
+ "directory": 20,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 11,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT4, loader.storage)
GREEK_SNAPSHOT5 = Snapshot(
- id=hash_to_bytes("ee6faeaf50aa513c53c8ba29194116a5ef88add6"),
+ id=hash_to_bytes("6484ec9bfff677731cbb6d2bd5058dabfae952ed"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("4320f152cc61ed660d25fdeebc787b3099e55a96"),
+ target=hash_to_bytes("514b3bef07d56e393588ceda18cc1dfa2dc4e04a"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_with_file_deleted_and_readded(swh_storage, datadir, tmp_path):
"""Eventful conversion of history with file deletion and re-addition"""
archive_name = "greek-repository5"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT5.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 9,
- "directory": 22,
+ "directory": 14,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT5, loader.storage)
def test_loader_cvs_pserver_with_file_deleted_and_readded(
swh_storage, datadir, tmp_path
):
"""Eventful pserver conversion with file deletion and re-addition"""
archive_name = "greek-repository5"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT5.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 9,
- "directory": 22,
+ "directory": 14,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT5, loader.storage)
DINO_SNAPSHOT = Snapshot(
- id=hash_to_bytes("417021c16e17c5e0038cf0e73dbf48a6142c8304"),
+ id=hash_to_bytes("6cf774cec1030ff3e9a301681303adb537855d09"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("df61a776c401a178cc796545849fc87bdadb2001"),
+ target=hash_to_bytes("b7d3ea1fa878d51323b5200ad2c6ee9d5b656f10"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_readded_file_in_attic(swh_storage, datadir, tmp_path):
"""Conversion of history with RCS files in the Attic"""
# This repository has some file revisions marked "dead" in the Attic only.
# This is different to the re-added file tests above, where the RCS file
# was moved out of the Attic again as soon as the corresponding deleted
# file was re-added. Failure to detect the "dead" file revisions in the
# Attic would result in errors in our converted history.
archive_name = "dino-readded-file"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/src" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 38,
- "directory": 105,
+ "directory": 70,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 35,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(DINO_SNAPSHOT, loader.storage)
def test_loader_cvs_pserver_readded_file_in_attic(swh_storage, datadir, tmp_path):
"""Conversion over pserver with RCS files in the Attic"""
# This repository has some file revisions marked "dead" in the Attic only.
# This is different to the re-added file tests above, where the RCS file
# was moved out of the Attic again as soon as the corresponding deleted
# file was re-added. Failure to detect the "dead" file revisions in the
# Attic would result in errors in our converted history.
# This has special implications for the pserver case, because the "dead"
# revisions will not appear in the output of 'cvs rlog' by default.
archive_name = "dino-readded-file"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/src" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 38,
- "directory": 105,
+ "directory": 70,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 35,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(DINO_SNAPSHOT, loader.storage)
DINO_SNAPSHOT2 = Snapshot(
- id=hash_to_bytes("a9d6ce0b4f22dc4fd752ad4c25ec9ea71ed568d7"),
+ id=hash_to_bytes("afdeca6b8ec8f58367b4e014e2210233f1c5bf3d"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("150616a2a3206f00a73f2d6a017dde22c52e4a83"),
+ target=hash_to_bytes("84e428103d42b84713c77afb9420d667062f8676"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_split_commits_by_commitid(swh_storage, datadir, tmp_path):
"""Conversion of RCS history which needs to be split by commit ID"""
# This repository has some file revisions which use the same log message
# and can only be told apart by commit IDs. Without commit IDs, these commits
# would get merged into a single commit in our conversion result.
archive_name = "dino-commitid"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/dino" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id,
)
check_snapshot(DINO_SNAPSHOT2, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 18,
- "directory": 36,
+ "directory": 18,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 18,
"skipped_content": 0,
"snapshot": 1,
}
def test_loader_cvs_pserver_split_commits_by_commitid(swh_storage, datadir, tmp_path):
"""Conversion via pserver which needs to be split by commit ID"""
# This repository has some file revisions which use the same log message
# and can only be told apart by commit IDs. Without commit IDs, these commits
# would get merged into a single commit in our conversion result.
archive_name = "dino-commitid"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/dino" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id,
)
check_snapshot(DINO_SNAPSHOT2, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 18,
- "directory": 36,
+ "directory": 18,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 18,
"skipped_content": 0,
"snapshot": 1,
}
GREEK_SNAPSHOT6 = Snapshot(
- id=hash_to_bytes("b4c9423b2711c181251deb458d4ab4a3172948ac"),
+ id=hash_to_bytes("859ae7ca5b31fee594c98abecdd41eff17cae079"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("f317c720e1929fec0afce10e6a8cfd24ef76dfc7"),
+ target=hash_to_bytes("fa48fb4551898cd8d3305cace971b3b95639e83e"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_empty_lines_in_log_message(swh_storage, datadir, tmp_path):
"""Conversion of RCS history with empty lines in a log message"""
archive_name = "greek-repository6"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT6.id,
)
check_snapshot(GREEK_SNAPSHOT6, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 9,
- "directory": 22,
+ "directory": 14,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
def test_loader_cvs_pserver_empty_lines_in_log_message(swh_storage, datadir, tmp_path):
"""Conversion via pserver with empty lines in a log message"""
archive_name = "greek-repository6"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT6.id,
)
check_snapshot(GREEK_SNAPSHOT6, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 9,
- "directory": 22,
+ "directory": 14,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
def get_head_revision_paths_info(loader: CvsLoader) -> Dict[bytes, Dict[str, Any]]:
assert loader.snapshot is not None
head_rev_id = loader.snapshot.branches[b"HEAD"].target
revision = loader.storage.revision_get([head_rev_id])[0]
assert revision is not None
paths = {}
for entry in loader.storage.directory_ls(revision.directory, recursive=True):
paths[entry["name"]] = entry
return paths
def test_loader_cvs_with_header_keyword(swh_storage, datadir, tmp_path):
"""Eventful conversion of history with Header keyword in a file"""
archive_name = "greek-repository7"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
repo_url = f"fake://{repo_url[7:]}"
loader2 = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader2.load() == {"status": "eventful"}
# We cannot verify the snapshot ID. It is unpredictable due to use of the $Header$
# RCS keyword, which contains the temporary directory where the repository is stored.
expected_stats = {
"content": 9,
- "directory": 22,
+ "directory": 14,
"origin": 2,
"origin_visit": 2,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
stats = get_stats(loader.storage)
assert stats == expected_stats
stats = get_stats(loader2.storage)
assert stats == expected_stats
# Ensure that file 'alpha', which contains a $Header$ keyword,
# was imported with equal content via file:// and fake:// URLs.
paths = get_head_revision_paths_info(loader)
paths2 = get_head_revision_paths_info(loader2)
- alpha = paths[b"greek-tree/alpha"]
- alpha2 = paths2[b"greek-tree/alpha"]
+ alpha = paths[b"alpha"]
+ alpha2 = paths2[b"alpha"]
assert alpha["sha1"] == alpha2["sha1"]
GREEK_SNAPSHOT8 = Snapshot(
- id=hash_to_bytes("b98a2744199723be827d48bad2f65ee1c2df7513"),
+ id=hash_to_bytes("5278a1f73ed0f804c68f72614a5f78ca5074ab9c"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("ee8be88b458b7fbca3037ab05e56552578e66faa"),
+ target=hash_to_bytes("b389258fec8151d719e79da80b5e5355a48ec8bc"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_expand_log_keyword(swh_storage, datadir, tmp_path):
"""Conversion of RCS history with Log keyword in files"""
archive_name = "greek-repository8"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT8.id,
)
check_snapshot(GREEK_SNAPSHOT8, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 14,
- "directory": 31,
+ "directory": 20,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 11,
"skipped_content": 0,
"snapshot": 1,
}
def test_loader_cvs_pserver_expand_log_keyword(swh_storage, datadir, tmp_path):
"""Conversion of RCS history with Log keyword in files"""
archive_name = "greek-repository8"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT8.id,
)
check_snapshot(GREEK_SNAPSHOT8, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 14,
- "directory": 31,
+ "directory": 20,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 11,
"skipped_content": 0,
"snapshot": 1,
}
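For reference, a minimal end-to-end invocation mirroring the tests above,
assuming an initialized swh_storage instance and a repository already
extracted under a hypothetical /tmp path:

from swh.loader.cvs.loader import CvsLoader

# Hypothetical paths; the tests above construct them with
# prepare_repository_from_archive().
loader = CvsLoader(
    swh_storage,
    "file:///tmp/greek-repository/greek-tree",
    cvsroot_path="/tmp/greek-repository",
)
assert loader.load() == {"status": "eventful"}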