diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py
--- a/swh/loader/mercurial/from_disk.py
+++ b/swh/loader/mercurial/from_disk.py
@@ -13,9 +13,11 @@
 from swh.loader.core.loader import BaseLoader
 from swh.loader.core.utils import clean_dangling_folders
 from swh.loader.mercurial.utils import parse_visit_date
+from swh.model import identifiers
 from swh.model.from_disk import Content, DentryPerms, Directory
 from swh.model.hashutil import hash_to_bytehex, hash_to_bytes
 from swh.model.model import (
+    ExtID,
     ObjectType,
     Origin,
     Person,
@@ -44,6 +46,8 @@
 
 TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial.from_disk"
 
+EXTID_TYPE = "hg-nodeid"
+
 T = TypeVar("T")
 
 
@@ -198,19 +202,63 @@
 
         latest_snapshot = snapshot_get_latest(self.storage, self.origin_url)
         if latest_snapshot:
-            # TODO: add support for releases
-            snapshot_branches = [
-                branch.target
-                for branch in latest_snapshot.branches.values()
-                if branch.target_type == TargetType.REVISION
-            ]
+            self._set_latest_heads(latest_snapshot)
+
+    def _set_latest_heads(self, latest_snapshot: Snapshot) -> None:
+        """
+        Looks up the nodeid for all revisions in the snapshot, and adds them to
+        self._latest_heads.
+
+        This works in two steps:
+
+        1. Query the revisions with extid_get_from_target, to find nodeids from
+           revision ids, using the new ExtID architecture
+        2. For all revisions that were not found this way, fetch the revision
+           and look for the nodeid in its metadata.
+
+        This is a temporary process. When we are done migrating away from revision
+        metadata, step 2 will be removed.
+        """
+        # TODO: add support for releases
+        snapshot_branches = [
+            branch.target
+            for branch in latest_snapshot.branches.values()
+            if branch.target_type == TargetType.REVISION
+        ]
+
+        # Get all ExtIDs for revisions in the latest snapshot
+        extids = self.storage.extid_get_from_target(
+            identifiers.ObjectType.REVISION, snapshot_branches
+        )
 
-            self._latest_heads = [
-                hash_to_bytes(revision.metadata["node"])
-                for revision in self.storage.revision_get(snapshot_branches)
-                if revision and revision.metadata
+        # Filter out extids not specific to Mercurial
+        extids = [extid for extid in extids if extid.extid_type == EXTID_TYPE]
+
+        if extids:
+            # Filter out dangling extids, we need to load their target again
+            revisions_missing = self.storage.revision_missing(
+                [extid.target.object_id for extid in extids]
+            )
+            extids = [
+                extid
+                for extid in extids
+                if extid.target.object_id not in revisions_missing
             ]
 
+        # Add the found nodeids to self._latest_heads
+        self._latest_heads.extend(extid.extid for extid in extids)
+
+        # For each revision without a nodeid, get the revision metadata
+        # to see if it is found there.
+        found_revisions = {extid.target.object_id for extid in extids if extid}
+        revisions_without_extid = list(set(snapshot_branches) - found_revisions)
+
+        self._latest_heads.extend(
+            hash_to_bytes(revision.metadata["node"])
+            for revision in self.storage.revision_get(revisions_without_extid)
+            if revision and revision.metadata
+        )
+
     def fetch_data(self) -> bool:
         """Fetch the data from the source the loader is currently loading
 
@@ -298,6 +346,8 @@
 
         snapshot_branches: Dict[bytes, SnapshotBranch] = {}
 
+        extids = []
+
         for hg_nodeid, revision_swhid in self._revision_nodeid_to_swhid.items():
             tag_name = tags_by_hg_nodeid.get(hg_nodeid)
 
@@ -322,9 +372,26 @@
                     target=name,
                     target_type=TargetType.ALIAS,
                 )
+            # TODO: do not write an ExtID if we got this branch from an ExtID that
+            # already exists.
+            # When we are done migrating away from revision metadata, this will
+            # be as simple as checking if the target is in self._latest_heads
+            extids.append(
+                ExtID(
+                    extid_type=EXTID_TYPE,
+                    extid=hg_nodeid,
+                    target=identifiers.CoreSWHID(
+                        object_type=identifiers.ObjectType.REVISION,
+                        object_id=revision_swhid,
+                    ),
+                )
+            )
+
         snapshot = Snapshot(branches=snapshot_branches)
         self.storage.snapshot_add([snapshot])
+        self.storage.extid_add(extids)
+
         self.flush()
         self.loaded_snapshot_id = snapshot.id
diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py
--- a/swh/loader/mercurial/tests/test_from_disk.py
+++ b/swh/loader/mercurial/tests/test_from_disk.py
@@ -7,6 +7,9 @@
 from hashlib import sha1
 import os
 
+import attr
+import pytest
+
 from swh.loader.mercurial.utils import parse_visit_date
 from swh.loader.tests import (
     assert_last_visit_matches,
@@ -16,7 +19,9 @@
 )
 from swh.model.from_disk import Content, DentryPerms
 from swh.model.hashutil import hash_to_bytes
+from swh.model.identifiers import ObjectType
 from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType
+from swh.storage import get_storage
 from swh.storage.algos.snapshot import snapshot_get_latest
 
 from ..from_disk import HgDirectory, HgLoaderFromDisk
@@ -248,7 +253,56 @@
     assert transplant_sources.issubset(hg_changesets)
 
 
-def test_load_unchanged_repo_should_be_uneventfull(swh_storage, datadir, tmp_path):
+def _partial_copy_storage(
+    old_storage, origin_url: str, mechanism: str, copy_revisions: bool
+):
+    """Create a new storage, and only copy ExtIDs or head revisions to it."""
+    new_storage = get_storage(cls="memory")
+    snapshot = snapshot_get_latest(old_storage, origin_url)
+    assert snapshot
+    heads = [branch.target for branch in snapshot.branches.values()]
+
+    if mechanism == "extid":
+        extids = old_storage.extid_get_from_target(ObjectType.REVISION, heads)
+        new_storage.extid_add(extids)
+        if copy_revisions:
+            # copy revisions, but erase their metadata to make sure the loader
+            # doesn't fall back to revision.metadata["node"]
+            revisions = [
+                attr.evolve(rev, metadata={})
+                for rev in old_storage.revision_get(heads)
+                if rev
+            ]
+            new_storage.revision_add(revisions)
+
+    elif mechanism == "revision metadata":
+        assert (
+            copy_revisions
+        ), "copy_revisions must be True if mechanism='revision metadata'"
+        revisions = [rev for rev in old_storage.revision_get(heads) if rev]
+        new_storage.revision_add(revisions)
+
+    else:
+        assert mechanism == "same storage"
+        return old_storage
+
+    # copy origin, visit, status
+    new_storage.origin_add(old_storage.origin_get([origin_url]))
+    visit = old_storage.origin_visit_get_latest(origin_url)
+    new_storage.origin_visit_add([visit])
+    statuses = old_storage.origin_visit_status_get(origin_url, visit.visit).results
+    new_storage.origin_visit_status_add(statuses)
+    new_storage.snapshot_add([snapshot])
+
+    return new_storage
+
+
+@pytest.mark.parametrize("mechanism", ("extid", "revision metadata", "same storage"))
+def test_load_unchanged_repo_should_be_uneventful(
+    swh_storage, datadir, tmp_path, mechanism
+):
+    """Checks the loader can find which revisions it already loaded, using either
+    ExtIDs or revision metadata."""
     archive_name = "hello"
     archive_path = os.path.join(datadir, f"{archive_name}.tgz")
     repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
@@ -268,7 +322,93 @@
         "snapshot": 1,
     }
 
+    old_storage = swh_storage
+
+    # Create a new storage, and only copy ExtIDs or head revisions to it.
+    # This should be enough for the loader to know revisions were already loaded
+    new_storage = _partial_copy_storage(
+        old_storage, repo_path, mechanism=mechanism, copy_revisions=True
+    )
+
+    # Create a new loader (to start with a clean slate, e.g. remove the caches),
+    # with the new, partial storage
+    loader = HgLoaderFromDisk(new_storage, repo_path)
     assert loader.load() == {"status": "uneventful"}
+
+    if mechanism == "same storage":
+        # Should have all the objects
+        assert get_stats(loader.storage) == {
+            "content": 3,
+            "directory": 3,
+            "origin": 1,
+            "origin_visit": 2,
+            "release": 1,
+            "revision": 3,
+            "skipped_content": 0,
+            "snapshot": 1,
+        }
+    else:
+        # Should have only the objects we directly inserted from the test, plus
+        # a new visit
+        assert get_stats(loader.storage) == {
+            "content": 0,
+            "directory": 0,
+            "origin": 1,
+            "origin_visit": 2,
+            "release": 0,
+            "revision": 1,
+            "skipped_content": 0,
+            "snapshot": 1,
+        }
+
+
+def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path):
+    """Checks the loader will load revisions targeted by an ExtID if the
+    revisions are missing from the storage"""
+    archive_name = "hello"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    repo_path = repo_url.replace("file://", "")
+
+    loader = HgLoaderFromDisk(swh_storage, repo_path)
+
+    assert loader.load() == {"status": "eventful"}
+    assert get_stats(loader.storage) == {
+        "content": 3,
+        "directory": 3,
+        "origin": 1,
+        "origin_visit": 1,
+        "release": 1,
+        "revision": 3,
+        "skipped_content": 0,
+        "snapshot": 1,
+    }
+
+    old_storage = swh_storage
+
+    # Create a new storage, and only copy ExtIDs or head revisions to it.
+    # This should be enough for the loader to know revisions were already loaded
+    new_storage = _partial_copy_storage(
+        old_storage, repo_path, mechanism="extid", copy_revisions=False
+    )
+
+    # Create a new loader (to start with a clean slate, e.g. remove the caches),
+    # with the new, partial storage
+    loader = HgLoaderFromDisk(new_storage, repo_path)
+
+    assert get_stats(loader.storage) == {
+        "content": 0,
+        "directory": 0,
+        "origin": 1,
+        "origin_visit": 1,
+        "release": 0,
+        "revision": 0,
+        "skipped_content": 0,
+        "snapshot": 1,
+    }
+
+    assert loader.load() == {"status": "eventful"}
+    assert get_stats(loader.storage) == {
         "content": 3,
         "directory": 3,
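
For reviewers unfamiliar with the ExtID mechanism this patch migrates to, the round trip it relies on can be summarized outside the loader as follows. This is an illustrative sketch, not part of the patch: the helper names are invented for the example, and only the model and storage APIs already exercised by the diff (ExtID, identifiers.CoreSWHID, extid_add, extid_get_from_target) are assumed.

    from swh.model import identifiers
    from swh.model.model import ExtID

    EXTID_TYPE = "hg-nodeid"  # same constant as in from_disk.py

    def record_head_extid(storage, hg_nodeid: bytes, revision_id: bytes) -> None:
        # What store_data() now does for each loaded revision: persist the
        # hg nodeid -> revision SWHID mapping in the ExtID table.
        storage.extid_add(
            [
                ExtID(
                    extid_type=EXTID_TYPE,
                    extid=hg_nodeid,
                    target=identifiers.CoreSWHID(
                        object_type=identifiers.ObjectType.REVISION,
                        object_id=revision_id,
                    ),
                )
            ]
        )

    def resolve_heads(storage, revision_ids):
        # Step 1 of _set_latest_heads(): recover the hg nodeids of revisions
        # that were already loaded, without touching revision.metadata.
        extids = storage.extid_get_from_target(
            identifiers.ObjectType.REVISION, revision_ids
        )
        return [extid.extid for extid in extids if extid.extid_type == EXTID_TYPE]

Once every origin has been revisited at least once, all heads are resolvable through the ExtID lookup alone, and the revision-metadata fallback (step 2 of _set_latest_heads) can be removed, as the docstring in the patch anticipates.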