swh/loader/mercurial/tests/test_from_disk.py
… (first 12 lines not shown)
 from swh.loader.mercurial.utils import parse_visit_date
 from swh.loader.tests import (
     assert_last_visit_matches,
     check_snapshot,
     get_stats,
     prepare_repository_from_archive,
 )
 from swh.model.from_disk import Content, DentryPerms
-from swh.model.hashutil import hash_to_bytes
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
 from swh.model.identifiers import ObjectType
 from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType
 from swh.storage import get_storage
 from swh.storage.algos.snapshot import snapshot_get_latest

 from ..from_disk import HgDirectory, HgLoaderFromDisk
 from .loader_checker import ExpectedSwhids, LoaderChecker
… (207 lines not shown)

     for branch in snapshot.branches.values():
         if branch.target_type.value != "revision":
             continue
         revisions.append(branch.target)

     # extract original changesets info and the transplant sources
     hg_changesets = set()
     transplant_sources = set()
     for rev in loader.storage.revision_log(revisions):
-        hg_changesets.add(rev["metadata"]["node"])
+        extids = list(
+            loader.storage.extid_get_from_target(ObjectType.REVISION, [rev["id"]])
+        )
+        assert len(extids) == 1
+        hg_changesets.add(hash_to_hex(extids[0].extid))
         for k, v in rev["extra_headers"]:
             if k == b"transplant_source":
                 transplant_sources.add(v.decode("ascii"))

     # check extracted data are valid
     assert len(hg_changesets) > 0
     assert len(transplant_sources) > 0
-    assert transplant_sources.issubset(hg_changesets)
+    assert transplant_sources <= hg_changesets
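Note: this hunk recovers each original Mercurial changeset id through the ExtID table instead of reading `rev["metadata"]["node"]`. A minimal sketch of that lookup, using only the storage calls exercised above (the helper name `hg_node_for_revision` is illustrative, not part of the codebase):

    from swh.model.hashutil import hash_to_hex
    from swh.model.identifiers import ObjectType

    def hg_node_for_revision(storage, swh_revision_id: bytes) -> str:
        # ExtID rows map a SWH revision back to the original hg changeset id.
        extids = list(
            storage.extid_get_from_target(ObjectType.REVISION, [swh_revision_id])
        )
        # The loader is expected to record exactly one ExtID per revision.
        assert len(extids) == 1
        return hash_to_hex(extids[0].extid)  # hex-encoded hg node id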
 def _partial_copy_storage(
     old_storage, origin_url: str, mechanism: str, copy_revisions: bool
 ):
     """Create a new storage, and only copy ExtIDs or head revisions to it."""
     new_storage = get_storage(cls="memory")
     snapshot = snapshot_get_latest(old_storage, origin_url)
     assert snapshot
     heads = [branch.target for branch in snapshot.branches.values()]

     if mechanism == "extid":
         extids = old_storage.extid_get_from_target(ObjectType.REVISION, heads)
         new_storage.extid_add(extids)
         if copy_revisions:
             # copy revisions, but erase their metadata to make sure the loader doesn't
             # fallback to revision.metadata["nodeid"]
             revisions = [
                 attr.evolve(rev, metadata={})
                 for rev in old_storage.revision_get(heads)
                 if rev
             ]
             new_storage.revision_add(revisions)
-    elif mechanism == "revision metadata":
-        assert (
-            copy_revisions
-        ), "copy_revisions must be True if mechanism='revision metadata'"
-        revisions = [rev for rev in old_storage.revision_get(heads) if rev]
-        new_storage.revision_add(revisions)
     else:
         assert mechanism == "same storage"
         return old_storage

     # copy origin, visit, status
     new_storage.origin_add(old_storage.origin_get([origin_url]))
     visit = old_storage.origin_visit_get_latest(origin_url)
     new_storage.origin_visit_add([visit])
     statuses = old_storage.origin_visit_status_get(origin_url, visit.visit).results
     new_storage.origin_visit_status_add(statuses)
     new_storage.snapshot_add([snapshot])

     return new_storage
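A sketch of how `_partial_copy_storage` might be exercised, assuming a repository already loaded into `swh_storage` as in the test below (the exact flow in the collapsed part of this file may differ):

    # First load populates swh_storage with revisions and ExtIDs.
    loader = HgLoaderFromDisk(swh_storage, repo_path)
    assert loader.load() == {"status": "eventful"}

    # Keep only the ExtIDs (plus metadata-less revisions) in a fresh in-memory
    # storage, then reload: the loader should recognize all revisions as known.
    partial_storage = _partial_copy_storage(
        swh_storage, repo_path, mechanism="extid", copy_revisions=True
    )
    loader = HgLoaderFromDisk(partial_storage, repo_path)
    assert loader.load() == {"status": "uneventful"}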
-@pytest.mark.parametrize("mechanism", ("extid", "revision metadata", "same storage"))
+@pytest.mark.parametrize("mechanism", ("extid", "same storage"))
 def test_load_unchanged_repo_should_be_uneventful(
     swh_storage, datadir, tmp_path, mechanism
 ):
-    """Checks the loader can find which revisions it already loaded, using either
-    ExtIDs or revision metadata."""
+    """Checks the loader can find which revisions it already loaded, using ExtIDs."""
     archive_name = "hello"
     archive_path = os.path.join(datadir, f"{archive_name}.tgz")
     repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
     repo_path = repo_url.replace("file://", "")

     loader = HgLoaderFromDisk(swh_storage, repo_path)
     assert loader.load() == {"status": "eventful"}
… (127 lines not shown)