
diff --git a/swh/loader/cvs/loader.py b/swh/loader/cvs/loader.py
index 8f56eee..2e1ae68 100644
--- a/swh/loader/cvs/loader.py
+++ b/swh/loader/cvs/loader.py
@@ -1,556 +1,555 @@
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Loader in charge of injecting either new or existing cvs repositories to
swh-storage.
"""
from datetime import datetime
import os
import os.path
import subprocess
import tempfile
import time
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Sequence, Tuple, cast
from urllib3.util import parse_url
from swh.loader.core.loader import BaseLoader
from swh.loader.core.utils import clean_dangling_folders
from swh.loader.cvs.cvs2gitdump.cvs2gitdump import (
CHANGESET_FUZZ_SEC,
ChangeSetKey,
CvsConv,
FileRevision,
RcsKeywords,
file_path,
)
from swh.loader.cvs.cvsclient import CVSClient
import swh.loader.cvs.rcsparse as rcsparse
from swh.loader.cvs.rlog import RlogConv
from swh.loader.exception import NotFound
from swh.model import from_disk, hashutil
from swh.model.model import (
Content,
Directory,
Origin,
Person,
Revision,
RevisionType,
Sha1Git,
SkippedContent,
Snapshot,
SnapshotBranch,
TargetType,
TimestampWithTimezone,
)
from swh.storage.algos.snapshot import snapshot_get_latest
from swh.storage.interface import StorageInterface
DEFAULT_BRANCH = b"HEAD"
TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs."
class CvsLoader(BaseLoader):
"""Swh cvs loader.
The repository is local. The loader deals with
update on an already previously loaded repository.
"""
visit_type = "cvs"
cvs_module_name: str
cvsclient: CVSClient
# remote CVS repository access (history is parsed from CVS rlog):
rlog_file: BinaryIO
swh_revision_gen: Iterator[
Tuple[List[Content], List[SkippedContent], List[Directory], Revision]
]
def __init__(
self,
storage: StorageInterface,
url: str,
origin_url: Optional[str] = None,
visit_date: Optional[datetime] = None,
cvsroot_path: Optional[str] = None,
temp_directory: str = "/tmp",
max_content_size: Optional[int] = None,
):
super().__init__(
storage=storage,
logging_class="swh.loader.cvs.CvsLoader",
max_content_size=max_content_size,
)
self.cvsroot_url = url
# origin url as unique identifier for origin in swh archive
self.origin_url = origin_url if origin_url else self.cvsroot_url
self.temp_directory = temp_directory
# internal state used to store swh objects
self._contents: List[Content] = []
self._skipped_contents: List[SkippedContent] = []
self._directories: List[Directory] = []
self._revisions: List[Revision] = []
# internal state, current visit
self._last_revision: Optional[Revision] = None
self._visit_status = "full"
self.visit_date = visit_date
self.cvsroot_path = cvsroot_path
self.snapshot: Optional[Snapshot] = None
self.last_snapshot: Optional[Snapshot] = snapshot_get_latest(
self.storage, self.origin_url
)
def compute_swh_revision(
self, k: ChangeSetKey, logmsg: Optional[bytes]
) -> Tuple[Revision, from_disk.Directory]:
"""Compute swh hash data per CVS changeset.
Returns:
tuple (rev, swh_directory)
            - rev: the current SWH revision computed from the checked-out work tree
            - swh_directory: the from_disk.Directory computed from that work tree
"""
# Compute SWH revision from the on-disk state
swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path))
parents: Tuple[Sha1Git, ...]
if self._last_revision:
parents = (self._last_revision.id,)
else:
parents = ()
revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents)
self.log.info("SWH revision ID: %s", hashutil.hash_to_hex(revision.id))
self._last_revision = revision
return (revision, swh_dir)
def checkout_file_with_rcsparse(
self, k: ChangeSetKey, f: FileRevision, rcsfile: rcsparse.rcsfile
) -> None:
assert self.cvsroot_path
assert self.server_style_cvsroot
path = file_path(self.cvsroot_path, f.path)
- wtpath = os.path.join(self.worktree_path, path)
+ wtpath = os.path.join(self.tempdir_path, path)
self.log.info("rev %s state %s file %s" % (f.rev, f.state, f.path))
if f.state == "dead":
# remove this file from work tree
try:
os.remove(wtpath)
except FileNotFoundError:
pass
else:
# create, or update, this file in the work tree
if not rcsfile:
rcsfile = rcsparse.rcsfile(f.path)
rcs = RcsKeywords()
# We try our best to generate the same commit hashes over both pserver
# and rsync. To avoid differences in file content due to expansion of
# RCS keywords which contain absolute file paths (such as "Header"),
# attempt to expand such paths in the same way as a regular CVS server
# would expand them.
# Whether this will avoid content differences depends on pserver and
# rsync servers exposing the same server-side path to the CVS repository.
# However, this is the best we can do, and only matters if an origin can
# be fetched over both pserver and rsync. Each will still be treated as
# a distinct origin, but will hopefully point at the same SWH snapshot.
# In any case, an absolute path based on the origin URL looks nicer than
# an absolute path based on a temporary directory used by the CVS loader.
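            # As a hypothetical example: with cvsroot_path
            # "/tmp/swh.loader.cvs.1234/repo" and server_style_cvsroot
            # "/cvsroot", the RCS file
            # "/tmp/swh.loader.cvs.1234/repo/module/foo.c,v" maps to the
            # server-style path "/cvsroot/module/foo.c,v", so "$Header$"
            # would expand to something like
            # "$Header: /cvsroot/module/foo.c,v 1.1 ..." instead of
            # leaking the loader's temporary directory.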
server_style_path = f.path.replace(
self.cvsroot_path, self.server_style_cvsroot
)
if server_style_path[0] != "/":
server_style_path = "/" + server_style_path
contents = rcs.expand_keyword(server_style_path, rcsfile, f.rev)
os.makedirs(os.path.dirname(wtpath), exist_ok=True)
            with open(wtpath, mode="wb") as outfile:
                outfile.write(contents)
def checkout_file_with_cvsclient(
self, k: ChangeSetKey, f: FileRevision, cvsclient: CVSClient
):
assert self.cvsroot_path
path = file_path(self.cvsroot_path, f.path)
- wtpath = os.path.join(self.worktree_path, path)
+ wtpath = os.path.join(self.tempdir_path, path)
self.log.info("rev %s state %s file %s" % (f.rev, f.state, f.path))
if f.state == "dead":
# remove this file from work tree
try:
os.remove(wtpath)
except FileNotFoundError:
pass
else:
dirname = os.path.dirname(wtpath)
os.makedirs(dirname, exist_ok=True)
self.log.debug("checkout to %s\n" % wtpath)
fp = cvsclient.checkout(path, f.rev, dirname, expand_keywords=True)
os.rename(fp.name, wtpath)
try:
fp.close()
except FileNotFoundError:
# Well, we have just renamed the file...
pass
def process_cvs_changesets(
self, cvs_changesets: List[ChangeSetKey], use_rcsparse: bool,
) -> Iterator[
Tuple[List[Content], List[SkippedContent], List[Directory], Revision]
]:
"""Process CVS revisions.
At each CVS revision, check out contents and compute swh hashes.
Yields:
tuple (contents, skipped-contents, directories, revision) of dict as a
dictionary with keys, sha1_git, sha1, etc...
"""
for k in cvs_changesets:
tstr = time.strftime("%c", time.gmtime(k.max_time))
self.log.info(
"changeset from %s by %s on branch %s", tstr, k.author, k.branch
)
logmsg: Optional[bytes] = b""
# Check out all files of this revision and get a log message.
#
# The log message is obtained from the first file in the changeset.
# The message will usually be the same for all affected files, and
# the SWH archive will only store one version of the log message.
for f in k.revs:
rcsfile = None
if use_rcsparse:
if rcsfile is None:
rcsfile = rcsparse.rcsfile(f.path)
if not logmsg:
logmsg = rcsfile.getlog(k.revs[0].rev)
self.checkout_file_with_rcsparse(k, f, rcsfile)
else:
if not logmsg:
logmsg = self.rlog.getlog(self.rlog_file, f.path, k.revs[0].rev)
self.checkout_file_with_cvsclient(k, f, self.cvsclient)
# TODO: prune empty directories?
(revision, swh_dir) = self.compute_swh_revision(k, logmsg)
(contents, skipped_contents, directories) = from_disk.iter_directory(
swh_dir
)
yield contents, skipped_contents, directories, revision
def prepare_origin_visit(self) -> None:
self.origin = Origin(
url=self.origin_url if self.origin_url else self.cvsroot_url
)
def pre_cleanup(self) -> None:
"""Cleanup potential dangling files from prior runs (e.g. OOM killed
tasks)
"""
clean_dangling_folders(
self.temp_directory,
pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
log=self.log,
)
def cleanup(self) -> None:
self.log.info("cleanup")
def fetch_cvs_repo_with_rsync(self, host: str, path: str) -> None:
# URL *must* end with a trailing slash in order to get CVSROOT listed
url = "rsync://%s%s/" % (host, os.path.dirname(path))
rsync = subprocess.run(["rsync", url], capture_output=True, encoding="ascii")
rsync.check_returncode()
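        # The listing is parsed line by line below. A typical (illustrative)
        # rsync listing looks like:
        #   drwxr-xr-x          4,096 2021/01/01 00:00:00 CVSROOT
        #   drwxr-xr-x          4,096 2021/01/01 00:00:00 mymodule
        # so directory names appear as the last whitespace-separated field.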
have_cvsroot = False
have_module = False
for line in rsync.stdout.split("\n"):
self.log.debug("rsync server: %s", line)
if line.endswith(" CVSROOT"):
have_cvsroot = True
elif line.endswith(" %s" % self.cvs_module_name):
have_module = True
if have_module and have_cvsroot:
break
if not have_module:
raise NotFound(
"CVS module %s not found at %s" % (self.cvs_module_name, url)
)
if not have_cvsroot:
raise NotFound("No CVSROOT directory found at %s" % url)
- # mypy complains: List item 3 has incompatible type "Optional[str]";
- # because self.cvsroot_path is an optional argument. We do however
- # ensure that it is initialized if the loader is not passed a
- # corresponding argument. Better ideas than ignoring types on this line?
+ assert self.cvsroot_path
subprocess.run(
- ["rsync", "-a", url, self.cvsroot_path] # type: ignore
+ # Ensure that rsync will place files directly within our cvsroot
+ # directory by appending a "/" to our cvsroot path.
+ ["rsync", "-a", url, self.cvsroot_path + "/"]
).check_returncode()
def prepare(self) -> None:
self._last_revision = None
- self.worktree_path = tempfile.mkdtemp(
+ self.tempdir_path = tempfile.mkdtemp(
suffix="-%s" % os.getpid(),
prefix=TEMPORARY_DIR_PREFIX_PATTERN,
dir=self.temp_directory,
)
url = parse_url(self.origin_url)
self.log.debug(
"prepare; origin_url=%s scheme=%s path=%s",
self.origin_url,
url.scheme,
url.path,
)
if not url.path:
raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url)
self.cvs_module_name = os.path.basename(url.path)
self.server_style_cvsroot = os.path.dirname(url.path)
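        # For example (hypothetical URL): for the origin
        # "rsync://cvs.example.org/cvsroot/mymodule", url.path is
        # "/cvsroot/mymodule", so cvs_module_name is "mymodule" and
        # server_style_cvsroot is "/cvsroot".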
- os.mkdir(os.path.join(self.worktree_path, self.cvs_module_name))
+ self.worktree_path = os.path.join(self.tempdir_path, self.cvs_module_name)
if url.scheme == "file" or url.scheme == "rsync":
# local CVS repository conversion
if not self.cvsroot_path:
self.cvsroot_path = tempfile.mkdtemp(
suffix="-%s" % os.getpid(),
prefix=TEMPORARY_DIR_PREFIX_PATTERN,
dir=self.temp_directory,
)
if url.scheme == "file":
if not os.path.exists(url.path):
raise NotFound
elif url.scheme == "rsync":
self.fetch_cvs_repo_with_rsync(url.host, url.path)
have_rcsfile = False
have_cvsroot = False
for root, dirs, files in os.walk(self.cvsroot_path):
if "CVSROOT" in dirs:
have_cvsroot = True
dirs.remove("CVSROOT")
continue
for f in files:
filepath = os.path.join(root, f)
                    if f.endswith(",v"):
rcsfile = rcsparse.rcsfile(filepath) # noqa: F841
self.log.debug(
"Looks like we have data to convert; "
"found a valid RCS file at %s",
filepath,
)
have_rcsfile = True
break
if have_rcsfile:
break
if not have_rcsfile:
raise NotFound(
"Directory %s does not contain any valid RCS files %s",
self.cvsroot_path,
)
if not have_cvsroot:
                self.log.warning(
"The CVS repository at '%s' lacks a CVSROOT directory; "
"we might be ingesting an incomplete copy of the repository",
self.cvsroot_path,
)
# Unfortunately, there is no way to convert CVS history in an
# iterative fashion because the data is not indexed by any kind
# of changeset ID. We need to walk the history of each and every
# RCS file in the repository during every visit, even if no new
# changes will be added to the SWH archive afterwards.
# "CVS’s repository is the software equivalent of a telephone book
# sorted by telephone number."
# https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/
#
# An implicit assumption made here is that self.cvs_changesets will
# fit into memory in its entirety. If it won't fit then the CVS walker
# will need to be modified such that it spools the list of changesets
# to disk instead.
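            # Roughly speaking, cvs2gitdump reconstructs changesets
            # heuristically: it groups individual file revisions which share
            # an author, branch and log message and whose timestamps lie
            # within CHANGESET_FUZZ_SEC seconds of one another.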
cvs = CvsConv(self.cvsroot_path, RcsKeywords(), False, CHANGESET_FUZZ_SEC)
self.log.info("Walking CVS module %s", self.cvs_module_name)
cvs.walk(self.cvs_module_name)
cvs_changesets = sorted(cvs.changesets)
self.log.info(
"CVS changesets found in %s: %d",
self.cvs_module_name,
len(cvs_changesets),
)
self.swh_revision_gen = self.process_cvs_changesets(
cvs_changesets, use_rcsparse=True
)
elif url.scheme == "pserver" or url.scheme == "fake" or url.scheme == "ssh":
# remote CVS repository conversion
if not self.cvsroot_path:
self.cvsroot_path = os.path.dirname(url.path)
self.cvsclient = CVSClient(url)
cvsroot_path = os.path.dirname(url.path)
self.log.info(
"Fetching CVS rlog from %s:%s/%s",
url.host,
cvsroot_path,
self.cvs_module_name,
)
self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC)
main_rlog_file = self.cvsclient.fetch_rlog()
self.rlog.parse_rlog(main_rlog_file)
# Find file deletion events only visible in Attic directories.
main_changesets = self.rlog.changesets
attic_paths = []
attic_rlog_files = []
assert self.cvsroot_path
for k in main_changesets:
for changed_file in k.revs:
path = file_path(self.cvsroot_path, changed_file.path)
if path.startswith(self.cvsroot_path):
path = path[
len(os.path.commonpath([self.cvsroot_path, path])) + 1 :
]
parent_path = os.path.dirname(path)
if parent_path.split("/")[-1] == "Attic":
continue
attic_path = parent_path + "/Attic"
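                    # For example (hypothetical paths): a changed file
                    # "/cvsroot/mymodule/src/foo.c,v" yields path
                    # "mymodule/src/foo.c", parent_path "mymodule/src",
                    # and attic_path "mymodule/src/Attic".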
if attic_path in attic_paths:
continue
attic_paths.append(attic_path) # avoid multiple visits
# Try to fetch more rlog data from this Attic directory.
attic_rlog_file = self.cvsclient.fetch_rlog(
path=attic_path, state="dead",
)
if attic_rlog_file:
attic_rlog_files.append(attic_rlog_file)
if len(attic_rlog_files) == 0:
self.rlog_file = main_rlog_file
else:
# Combine all the rlog pieces we found and re-parse.
fp = tempfile.TemporaryFile()
for attic_rlog_file in attic_rlog_files:
for line in attic_rlog_file.readlines():
fp.write(line)
attic_rlog_file.close()
main_rlog_file.seek(0)
for line in main_rlog_file.readlines():
fp.write(line)
main_rlog_file.close()
fp.seek(0)
self.rlog.parse_rlog(cast(BinaryIO, fp))
self.rlog_file = cast(BinaryIO, fp)
cvs_changesets = sorted(self.rlog.changesets)
self.log.info(
"CVS changesets found for %s: %d",
self.cvs_module_name,
len(cvs_changesets),
)
self.swh_revision_gen = self.process_cvs_changesets(
cvs_changesets, use_rcsparse=False
)
else:
raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url)
def fetch_data(self) -> bool:
"""Fetch the next CVS revision."""
try:
data = next(self.swh_revision_gen)
except StopIteration:
assert self._last_revision is not None
self.snapshot = self.generate_and_load_snapshot(self._last_revision)
self.log.info("SWH snapshot ID: %s", hashutil.hash_to_hex(self.snapshot.id))
self.flush()
self.loaded_snapshot_id = self.snapshot.id
return False
except Exception:
self.log.exception("Exception in fetch_data:")
return False # Stopping iteration
self._contents, self._skipped_contents, self._directories, rev = data
self._revisions = [rev]
return True
def build_swh_revision(
self,
k: ChangeSetKey,
logmsg: Optional[bytes],
dir_id: bytes,
parents: Sequence[bytes],
) -> Revision:
"""Given a CVS revision, build a swh revision.
Args:
k: changeset data
logmsg: the changeset's log message
dir_id: the tree's hash identifier
parents: the revision's parents identifier
Returns:
            The swh revision object.
"""
author = Person.from_fullname(k.author.encode("UTF-8"))
date = TimestampWithTimezone.from_dict(k.max_time)
return Revision(
type=RevisionType.CVS,
date=date,
committer_date=date,
directory=dir_id,
message=logmsg,
author=author,
committer=author,
synthetic=True,
extra_headers=[],
parents=tuple(parents),
)
def generate_and_load_snapshot(self, revision: Revision) -> Snapshot:
"""Create the snapshot either from existing revision.
Args:
revision (dict): Last revision seen if any (None by default)
Returns:
Optional[Snapshot] The newly created snapshot
"""
snap = Snapshot(
branches={
DEFAULT_BRANCH: SnapshotBranch(
target=revision.id, target_type=TargetType.REVISION
)
}
)
self.log.debug("snapshot: %s", snap)
self.storage.snapshot_add([snap])
return snap
def store_data(self) -> None:
"Add our current CVS changeset to the archive."
self.storage.skipped_content_add(self._skipped_contents)
self.storage.content_add(self._contents)
self.storage.directory_add(self._directories)
self.storage.revision_add(self._revisions)
self.flush()
self._skipped_contents = []
self._contents = []
self._directories = []
self._revisions = []
def load_status(self) -> Dict[str, Any]:
assert self.snapshot is not None
if self.last_snapshot == self.snapshot:
load_status = "uneventful"
else:
load_status = "eventful"
return {
"status": load_status,
}
def visit_status(self) -> str:
return self._visit_status
diff --git a/swh/loader/cvs/tests/test_loader.py b/swh/loader/cvs/tests/test_loader.py
index 8866000..c0b3f70 100644
--- a/swh/loader/cvs/tests/test_loader.py
+++ b/swh/loader/cvs/tests/test_loader.py
@@ -1,949 +1,949 @@
# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from typing import Any, Dict
from swh.loader.cvs.loader import CvsLoader
from swh.loader.tests import (
assert_last_visit_matches,
check_snapshot,
get_stats,
prepare_repository_from_archive,
)
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Snapshot, SnapshotBranch, TargetType
RUNBABY_SNAPSHOT = Snapshot(
- id=hash_to_bytes("1cff69ab9bd70822d5e3006092f943ccaafdcf57"),
+ id=hash_to_bytes("e64667c400049f560a3856580e0d9e511ffa66c9"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("ef511d258fa55035c2bc2a5b05cad233cee1d328"),
+ target=hash_to_bytes("0f6db8ce49472d7829ddd6141f71c68c0d563f0e"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_not_found_no_mock(swh_storage, tmp_path):
"""Given an unknown repository, the loader visit ends up in status not_found"""
unknown_repo_url = "unknown-repository"
loader = CvsLoader(swh_storage, unknown_repo_url, cvsroot_path=tmp_path)
assert loader.load() == {"status": "uneventful"}
assert_last_visit_matches(
swh_storage, unknown_repo_url, status="not_found", type="cvs",
)
def test_loader_cvs_visit(swh_storage, datadir, tmp_path):
"""Eventful visit should yield 1 snapshot"""
archive_name = "runbaby"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=RUNBABY_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 5,
- "directory": 2,
+ "directory": 1,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 1,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(RUNBABY_SNAPSHOT, loader.storage)
def test_loader_cvs_2_visits_no_change(swh_storage, datadir, tmp_path):
"""Eventful visit followed by uneventful visit should yield the same snapshot
"""
archive_name = "runbaby"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
visit_status1 = assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=RUNBABY_SNAPSHOT.id,
)
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "uneventful"}
visit_status2 = assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=RUNBABY_SNAPSHOT.id,
)
assert visit_status1.date < visit_status2.date
assert visit_status1.snapshot == visit_status2.snapshot
stats = get_stats(loader.storage)
assert stats["origin_visit"] == 1 + 1 # computed twice the same snapshot
assert stats["snapshot"] == 1
GREEK_SNAPSHOT = Snapshot(
- id=hash_to_bytes("5e74af67d69dfd7aea0eb118154d062f71f50120"),
+ id=hash_to_bytes("c76f8b58a6dfbe6fccb9a85b695f914aa5c4a95a"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("e18b92f14cd5b3efb3fcb4ea46cfaf97f25f301b"),
+ target=hash_to_bytes("e138207ddd5e1965b5ab9a522bfc2e0ecd233b67"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_with_file_additions_and_deletions(swh_storage, datadir, tmp_path):
"""Eventful conversion of history with file additions and deletions"""
archive_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 8,
- "directory": 20,
+ "directory": 13,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT, loader.storage)
def test_loader_cvs_pserver_with_file_additions_and_deletions(
swh_storage, datadir, tmp_path
):
"""Eventful CVS pserver conversion with file additions and deletions"""
archive_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 8,
- "directory": 20,
+ "directory": 13,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT, loader.storage)
GREEK_SNAPSHOT2 = Snapshot(
- id=hash_to_bytes("048885ae2145ffe81588aea95dcf75c536ecdf26"),
+ id=hash_to_bytes("e3d2e8860286000f546c01aa2a3e1630170eb3b6"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("55eb1438c03588607ce4b8db8f45e8e23075951b"),
+ target=hash_to_bytes("f1ff9a3c7624b1be5e5d51f9ec0abf7dcddbf0b2"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_2_visits_with_change(swh_storage, datadir, tmp_path):
"""Eventful visit followed by eventful visit should yield two snapshots"""
archive_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
visit_status1 = assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 8,
- "directory": 20,
+ "directory": 13,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
archive_name2 = "greek-repository2"
archive_path2 = os.path.join(datadir, f"{archive_name2}.tgz")
repo_url = prepare_repository_from_archive(archive_path2, archive_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
visit_status2 = assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT2.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 10,
- "directory": 23,
+ "directory": 15,
"origin": 1,
"origin_visit": 2,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 2,
}
check_snapshot(GREEK_SNAPSHOT2, loader.storage)
assert visit_status1.date < visit_status2.date
assert visit_status1.snapshot != visit_status2.snapshot
def test_loader_cvs_visit_pserver(swh_storage, datadir, tmp_path):
"""Eventful visit to CVS pserver should yield 1 snapshot"""
archive_name = "runbaby"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/runbaby" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=RUNBABY_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 5,
- "directory": 2,
+ "directory": 1,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 1,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(RUNBABY_SNAPSHOT, loader.storage)
GREEK_SNAPSHOT3 = Snapshot(
- id=hash_to_bytes("cd801546b0137c82f01b9b67848ba8261d64ebbb"),
+ id=hash_to_bytes("6e9910ed072662cb482d9017cbf5e1973e6dc09f"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("14980990790ce1921db953c4c9ae03dd8861e8d6"),
+ target=hash_to_bytes("d9f4837dc55a87d83730c6e277c88b67dae80272"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_visit_pserver_no_eol(swh_storage, datadir, tmp_path):
"""Visit to CVS pserver with file that lacks trailing eol"""
archive_name = "greek-repository3"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT3.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 9,
- "directory": 23,
+ "directory": 15,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT3, loader.storage)
GREEK_SNAPSHOT4 = Snapshot(
- id=hash_to_bytes("26e943053ea9c5f961336a72328cac22026ed3b5"),
+ id=hash_to_bytes("a8593e9233601b31e012d36975f817d2c993d04b"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("ed784aff0e0743244bb1f30ba21c8abcd0d460ab"),
+ target=hash_to_bytes("51bb99655225c810ee259087fcae505899725360"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_visit_expand_id_keyword(swh_storage, datadir, tmp_path):
"""Visit to CVS repository with file with an RCS Id keyword"""
archive_name = "greek-repository4"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT4.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 12,
- "directory": 31,
+ "directory": 20,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 11,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT4, loader.storage)
def test_loader_cvs_visit_pserver_expand_id_keyword(swh_storage, datadir, tmp_path):
"""Visit to CVS pserver with file with an RCS Id keyword"""
archive_name = "greek-repository4"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT4.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 12,
- "directory": 31,
+ "directory": 20,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 11,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT4, loader.storage)
GREEK_SNAPSHOT5 = Snapshot(
- id=hash_to_bytes("ee6faeaf50aa513c53c8ba29194116a5ef88add6"),
+ id=hash_to_bytes("6484ec9bfff677731cbb6d2bd5058dabfae952ed"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("4320f152cc61ed660d25fdeebc787b3099e55a96"),
+ target=hash_to_bytes("514b3bef07d56e393588ceda18cc1dfa2dc4e04a"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_with_file_deleted_and_readded(swh_storage, datadir, tmp_path):
"""Eventful conversion of history with file deletion and re-addition"""
archive_name = "greek-repository5"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT5.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 9,
- "directory": 22,
+ "directory": 14,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT5, loader.storage)
def test_loader_cvs_pserver_with_file_deleted_and_readded(
swh_storage, datadir, tmp_path
):
"""Eventful pserver conversion with file deletion and re-addition"""
archive_name = "greek-repository5"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT5.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 9,
- "directory": 22,
+ "directory": 14,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GREEK_SNAPSHOT5, loader.storage)
DINO_SNAPSHOT = Snapshot(
- id=hash_to_bytes("417021c16e17c5e0038cf0e73dbf48a6142c8304"),
+ id=hash_to_bytes("6cf774cec1030ff3e9a301681303adb537855d09"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("df61a776c401a178cc796545849fc87bdadb2001"),
+ target=hash_to_bytes("b7d3ea1fa878d51323b5200ad2c6ee9d5b656f10"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_readded_file_in_attic(swh_storage, datadir, tmp_path):
"""Conversion of history with RCS files in the Attic"""
# This repository has some file revisions marked "dead" in the Attic only.
    # This is different from the re-added file tests above, where the RCS file
# was moved out of the Attic again as soon as the corresponding deleted
# file was re-added. Failure to detect the "dead" file revisions in the
# Attic would result in errors in our converted history.
archive_name = "dino-readded-file"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/src" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 38,
- "directory": 105,
+ "directory": 70,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 35,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(DINO_SNAPSHOT, loader.storage)
def test_loader_cvs_pserver_readded_file_in_attic(swh_storage, datadir, tmp_path):
"""Conversion over pserver with RCS files in the Attic"""
# This repository has some file revisions marked "dead" in the Attic only.
    # This is different from the re-added file tests above, where the RCS file
# was moved out of the Attic again as soon as the corresponding deleted
# file was re-added. Failure to detect the "dead" file revisions in the
# Attic would result in errors in our converted history.
# This has special implications for the pserver case, because the "dead"
    # revisions will not appear in the output of 'cvs rlog' by default.
archive_name = "dino-readded-file"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/src" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats == {
"content": 38,
- "directory": 105,
+ "directory": 70,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 35,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(DINO_SNAPSHOT, loader.storage)
DINO_SNAPSHOT2 = Snapshot(
- id=hash_to_bytes("a9d6ce0b4f22dc4fd752ad4c25ec9ea71ed568d7"),
+ id=hash_to_bytes("afdeca6b8ec8f58367b4e014e2210233f1c5bf3d"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("150616a2a3206f00a73f2d6a017dde22c52e4a83"),
+ target=hash_to_bytes("84e428103d42b84713c77afb9420d667062f8676"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_split_commits_by_commitid(swh_storage, datadir, tmp_path):
"""Conversion of RCS history which needs to be split by commit ID"""
# This repository has some file revisions which use the same log message
# and can only be told apart by commit IDs. Without commit IDs, these commits
# would get merged into a single commit in our conversion result.
archive_name = "dino-commitid"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/dino" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id,
)
check_snapshot(DINO_SNAPSHOT2, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 18,
- "directory": 36,
+ "directory": 18,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 18,
"skipped_content": 0,
"snapshot": 1,
}
def test_loader_cvs_pserver_split_commits_by_commitid(swh_storage, datadir, tmp_path):
"""Conversion via pserver which needs to be split by commit ID"""
# This repository has some file revisions which use the same log message
# and can only be told apart by commit IDs. Without commit IDs, these commits
# would get merged into a single commit in our conversion result.
archive_name = "dino-commitid"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
repo_url += "/dino" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage, repo_url, status="full", type="cvs", snapshot=DINO_SNAPSHOT2.id,
)
check_snapshot(DINO_SNAPSHOT2, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 18,
- "directory": 36,
+ "directory": 18,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 18,
"skipped_content": 0,
"snapshot": 1,
}
GREEK_SNAPSHOT6 = Snapshot(
- id=hash_to_bytes("b4c9423b2711c181251deb458d4ab4a3172948ac"),
+ id=hash_to_bytes("859ae7ca5b31fee594c98abecdd41eff17cae079"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("f317c720e1929fec0afce10e6a8cfd24ef76dfc7"),
+ target=hash_to_bytes("fa48fb4551898cd8d3305cace971b3b95639e83e"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_empty_lines_in_log_message(swh_storage, datadir, tmp_path):
"""Conversion of RCS history with empty lines in a log message"""
archive_name = "greek-repository6"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT6.id,
)
check_snapshot(GREEK_SNAPSHOT6, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 9,
- "directory": 22,
+ "directory": 14,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
def test_loader_cvs_pserver_empty_lines_in_log_message(swh_storage, datadir, tmp_path):
"""Conversion via pserver with empty lines in a log message"""
archive_name = "greek-repository6"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT6.id,
)
check_snapshot(GREEK_SNAPSHOT6, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 9,
- "directory": 22,
+ "directory": 14,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
def get_head_revision_paths_info(loader: CvsLoader) -> Dict[bytes, Dict[str, Any]]:
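    """Map each path in the snapshot's HEAD revision tree to its directory_ls entry."""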
assert loader.snapshot is not None
    head_rev_id = loader.snapshot.branches[b"HEAD"].target
    revision = loader.storage.revision_get([head_rev_id])[0]
assert revision is not None
paths = {}
for entry in loader.storage.directory_ls(revision.directory, recursive=True):
paths[entry["name"]] = entry
return paths
def test_loader_cvs_with_header_keyword(swh_storage, datadir, tmp_path):
"""Eventful conversion of history with Header keyword in a file"""
archive_name = "greek-repository7"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
repo_url = f"fake://{repo_url[7:]}"
loader2 = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader2.load() == {"status": "eventful"}
    # We cannot verify the snapshot ID. It is unpredictable due to use of the $Header$
# RCS keyword which contains the temporary directory where the repository is stored.
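    # For instance (hypothetical value), alpha's "$Header$" may expand to
    # "$Header: /tmp/pytest-0/test0/greek-repository/greek-tree/alpha,v 1.1 ..."
    # and the embedded temporary path changes on every test run.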
expected_stats = {
"content": 9,
- "directory": 22,
+ "directory": 14,
"origin": 2,
"origin_visit": 2,
"release": 0,
"revision": 8,
"skipped_content": 0,
"snapshot": 1,
}
stats = get_stats(loader.storage)
assert stats == expected_stats
stats = get_stats(loader2.storage)
assert stats == expected_stats
# Ensure that file 'alpha', which contains a $Header$ keyword,
# was imported with equal content via file:// and fake:// URLs.
paths = get_head_revision_paths_info(loader)
paths2 = get_head_revision_paths_info(loader2)
- alpha = paths[b"greek-tree/alpha"]
- alpha2 = paths2[b"greek-tree/alpha"]
+ alpha = paths[b"alpha"]
+ alpha2 = paths2[b"alpha"]
assert alpha["sha1"] == alpha2["sha1"]
GREEK_SNAPSHOT8 = Snapshot(
- id=hash_to_bytes("b98a2744199723be827d48bad2f65ee1c2df7513"),
+ id=hash_to_bytes("5278a1f73ed0f804c68f72614a5f78ca5074ab9c"),
branches={
b"HEAD": SnapshotBranch(
- target=hash_to_bytes("ee8be88b458b7fbca3037ab05e56552578e66faa"),
+ target=hash_to_bytes("b389258fec8151d719e79da80b5e5355a48ec8bc"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_cvs_expand_log_keyword(swh_storage, datadir, tmp_path):
"""Conversion of RCS history with Log keyword in files"""
archive_name = "greek-repository8"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT8.id,
)
check_snapshot(GREEK_SNAPSHOT8, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 14,
- "directory": 31,
+ "directory": 20,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 11,
"skipped_content": 0,
"snapshot": 1,
}
def test_loader_cvs_pserver_expand_log_keyword(swh_storage, datadir, tmp_path):
"""Conversion of RCS history with Log keyword in files"""
archive_name = "greek-repository8"
extracted_name = "greek-repository"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path)
repo_url += "/greek-tree" # CVS module name
# Ask our cvsclient to connect via the 'cvs server' command
repo_url = f"fake://{repo_url[7:]}"
loader = CvsLoader(
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name)
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="cvs",
snapshot=GREEK_SNAPSHOT8.id,
)
check_snapshot(GREEK_SNAPSHOT8, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 14,
- "directory": 31,
+ "directory": 20,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 11,
"skipped_content": 0,
"snapshot": 1,
}
