diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,6 +16,7 @@ hooks: - id: codespell exclude: ^(swh/loader/package/.*[/]+tests/data/.*)$ + entry: codespell --ignore-words-list=iff - repo: local hooks: diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py --- a/swh/loader/package/loader.py +++ b/swh/loader/package/loader.py @@ -22,6 +22,7 @@ Mapping, Optional, Sequence, + Set, Tuple, TypeVar, ) @@ -287,6 +288,93 @@ return None + def _get_known_extids( + self, packages_info: List[TPackageInfo] + ) -> Dict[PartialExtID, List[CoreSWHID]]: + """Compute the ExtIDs of new PackageInfo objects, look up which of them are + already in the archive, and return their targets, keyed by (type, extid).""" + + # Compute the ExtIDs of all the new packages, grouped by extid type + new_extids: Dict[str, List[bytes]] = {} + for p_info in packages_info: + res = p_info.extid() + if res is not None: + (extid_type, extid_extid) = res + new_extids.setdefault(extid_type, []).append(extid_extid) + + # For each extid type, call extid_get_from_extid() with all the extids of + # that type, and store them in the '(type, extid) -> target' map. + known_extids: Dict[PartialExtID, List[CoreSWHID]] = {} + for (extid_type, extids) in new_extids.items(): + for extid in self.storage.extid_get_from_extid(extid_type, extids): + if extid is not None: + key = (extid.extid_type, extid.extid) + known_extids.setdefault(key, []).append(extid.target) + + return known_extids + + def resolve_revision_from_extids( + self, + known_extids: Dict[PartialExtID, List[CoreSWHID]], + p_info: TPackageInfo, + revision_whitelist: Set[Sha1Git], + ) -> Optional[Sha1Git]: + """Resolve the revision from known ExtIDs and a package info object. + + If this package's ExtID is already known and one of its targets is a + revision in revision_whitelist, this returns that existing revision. + Otherwise, this returns None.
+ + Args: + known_extids: Dict built from a list of ExtID, with the targets as value + p_info: Package information + revision_whitelist: Any ExtID with target not in this set is filtered out + + Returns: + None or revision identifier + + """ + new_extid = p_info.extid() + if new_extid is None: + return None + + for extid_target in known_extids.get(new_extid, []): + if extid_target.object_id not in revision_whitelist: + # There is a known ExtID for this package, but its target is not + # in the snapshot. + # This can happen for three reasons: + # + # 1. a loader crashed after writing the ExtID, but before writing + # the snapshot + # 2. some other loader loaded the same artifact, but produced + # a different revision, causing an additional ExtID object + # to be written. We will probably find this loader's ExtID + # in a future iteration of this loop. + # Note that for now, this is impossible, as each loader has a + # completely different extid_type, but this is an implementation + # detail of each loader. + # 3. we took a snapshot, then the package disappeared, + # then we took another snapshot, and the package reappeared + # + # In case of 1, we must actually load the package now, + # so let's do it. + # TODO: detect when we are in case 3 using revision_missing instead + # of the snapshot. + continue + elif extid_target.object_type != ObjectType.REVISION: + # We only support revisions for now. + # Note that this case should never be reached unless there is a + # collision between a revision hash and some non-revision object's + # hash, but better safe than sorry. + logger.warning( + "%s is in the revision whitelist, but is not a revision.", + hash_to_hex(extid_target.object_id), + ) + continue + return extid_target.object_id + + return None + def download_package( self, p_info: TPackageInfo, tmpdir: str ) -> List[Tuple[str, Mapping]]: @@ -441,6 +529,8 @@ sentry_sdk.capture_exception(e) return {"status": "failed"} + # Get the previous snapshot for this origin.
It is then used to see which + # of the package's versions are already loaded in the archive. try: last_snapshot = self.last_snapshot() logger.debug("last snapshot: %s", last_snapshot) @@ -459,6 +549,7 @@ load_exceptions: List[Exception] = [] + # Get the list of all version names try: versions = self.get_versions() except NotFound: @@ -478,19 +569,46 @@ status_load="failed", ) - packages_info = [ + # Get the metadata of each version's package + packages_info: List[Tuple[str, str, TPackageInfo]] = [ (version, branch_name, p_info) for version in versions for (branch_name, p_info) in self.get_package_info(version) ] + # Look up which of these packages' ExtIDs are already in the archive + known_extids = self._get_known_extids( + [p_info for (_, _, p_info) in packages_info] + ) + + if last_snapshot is None: + last_snapshot_targets: Set[Sha1Git] = set() + else: + last_snapshot_targets = { + b.target for b in last_snapshot.branches.values() if b is not None + } + tmp_revisions: Dict[str, List[Tuple[str, Sha1Git]]] = { version: [] for version in versions } for (version, branch_name, p_info) in packages_info: logger.debug("package_info: %s", p_info) - revision_id = self.resolve_revision_from_artifacts(known_artifacts, p_info) + + # Check if the package was already loaded, using its ExtID + revision_id = self.resolve_revision_from_extids( + known_extids, p_info, last_snapshot_targets + ) + + if revision_id is None: + # No existing revision found from an acceptable ExtID, + # search in the artifact data instead. + # TODO: remove this after we finished migrating to ExtIDs. + revision_id = self.resolve_revision_from_artifacts( + known_artifacts, p_info + ) + if revision_id is None: + # No matching revision found in the last snapshot, load it.
try: res = self._load_revision(p_info, origin) if res: diff --git a/swh/loader/package/tests/test_loader.py b/swh/loader/package/tests/test_loader.py --- a/swh/loader/package/tests/test_loader.py +++ b/swh/loader/package/tests/test_loader.py @@ -3,14 +3,27 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime import hashlib import string -from unittest.mock import Mock +from unittest.mock import Mock, call, patch import attr import pytest from swh.loader.package.loader import BasePackageInfo, PackageLoader +from swh.model.identifiers import CoreSWHID, ObjectType +from swh.model.model import ( + ExtID, + Origin, + OriginVisit, + OriginVisitStatus, + Snapshot, + SnapshotBranch, + TargetType, +) +from swh.storage import get_storage +from swh.storage.algos.snapshot import snapshot_get_latest class FakeStorage: @@ -29,6 +42,31 @@ raise ValueError("We refuse to add an origin visit") +class StubPackageInfo(BasePackageInfo): + pass + + +class StubPackageLoader(PackageLoader[StubPackageInfo]): + def get_versions(self): + return ["v1.0", "v2.0", "v3.0", "v4.0"] + + def get_package_info(self, version): + p_info = StubPackageInfo("http://example.org", f"example-{version}.tar") + extid_type = "extid-type1" if version in ("v1.0", "v2.0") else "extid-type2" + # Versions 1.0 and 2.0 have an extid of a given type, v3.0 and v4.0 + # have an extid of a different type + patch.object( + p_info, + "extid", + return_value=(extid_type, f"extid-of-{version}".encode()), + autospec=True, + ).start() + yield (f"branch-{version}", p_info) + + def _load_revision(self, p_info, origin): + return None + + def test_loader_origin_visit_failure(swh_storage): """Failure to add origin or origin visit should failed immediately @@ -90,6 +128,174 @@ loader.known_artifact_to_extid.assert_called_once_with({"key": "extid-of-aaaa"}) +def test_resolve_revision_from_extids() -> None: + loader = PackageLoader(None, None) # type: ignore +
+ p_info = Mock(wraps=BasePackageInfo(None, None)) # type: ignore + + # The PackageInfo does not support extids + p_info.extid.return_value = None + known_extids = { + ("extid-type", b"extid-of-aaaa"): [ + CoreSWHID(object_type=ObjectType.REVISION, object_id=b"a" * 20), + ] + } + revision_whitelist = {b"unused"} + assert ( + loader.resolve_revision_from_extids(known_extids, p_info, revision_whitelist) + is None + ) + + # Some known extid, and the PackageInfo is not one of them (ie. cache miss) + p_info.extid.return_value = ("extid-type", b"extid-of-cccc") + assert ( + loader.resolve_revision_from_extids(known_extids, p_info, revision_whitelist) + is None + ) + + # Some known extid, and the PackageInfo is one of them (ie. cache hit), + # but the target revision was not in the previous snapshot + p_info.extid.return_value = ("extid-type", b"extid-of-aaaa") + assert ( + loader.resolve_revision_from_extids(known_extids, p_info, revision_whitelist) + is None + ) + + # Some known extid, and the PackageInfo is one of them (ie.
cache hit), + # and the target revision was in the previous snapshot + revision_whitelist = {b"a" * 20} + assert ( + loader.resolve_revision_from_extids(known_extids, p_info, revision_whitelist) + == b"a" * 20 + ) + + # Same as before, but there is more than one extid, and only one is an allowed + # revision + revision_whitelist = {b"a" * 20} + known_extids = { + ("extid-type", b"extid-of-aaaa"): [ + CoreSWHID(object_type=ObjectType.REVISION, object_id=b"b" * 20), + CoreSWHID(object_type=ObjectType.REVISION, object_id=b"a" * 20), + ] + } + assert ( + loader.resolve_revision_from_extids(known_extids, p_info, revision_whitelist) + == b"a" * 20 + ) + + +def test_load_get_known_extids() -> None: + """Checks PackageLoader.load() fetches known extids efficiently""" + storage = Mock(wraps=get_storage("memory")) + + loader = StubPackageLoader(storage, "http://example.org") + + loader.load() + + # Calls should be grouped by extid type + storage.extid_get_from_extid.assert_has_calls( + [ + call("extid-type1", [b"extid-of-v1.0", b"extid-of-v2.0"]), + call("extid-type2", [b"extid-of-v3.0", b"extid-of-v4.0"]), + ], + any_order=True, + ) + + +def test_load_skip_extids() -> None: + """Checks PackageLoader.load() skips iff it should.""" + storage = get_storage("memory") + + origin = "http://example.org" + rev1_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=b"a" * 20) + rev2_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=b"b" * 20) + rev3_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=b"c" * 20) + rev4_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=b"d" * 20) + dir_swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=b"e" * 20) + + loader = StubPackageLoader(storage, "http://example.org") + patch.object( + loader, + "_load_revision", + return_value=(rev4_swhid.object_id, dir_swhid.object_id), + autospec=True, + ).start() + + # Results of a previous load + storage.extid_add( + [ + ExtID("extid-type1", b"extid-of-v1.0",
rev1_swhid), + ExtID("extid-type2", b"extid-of-v2.0", rev2_swhid), + ] + ) + last_snapshot = Snapshot( + branches={ + b"v1.0": SnapshotBranch( + target_type=TargetType.REVISION, target=rev1_swhid.object_id + ), + b"v2.0": SnapshotBranch( + target_type=TargetType.REVISION, target=rev2_swhid.object_id + ), + b"v3.0": SnapshotBranch( + target_type=TargetType.REVISION, target=rev3_swhid.object_id + ), + } + ) + storage.snapshot_add([last_snapshot]) + date = datetime.datetime.now(tz=datetime.timezone.utc) + storage.origin_add([Origin(url=origin)]) + storage.origin_visit_add( + [OriginVisit(origin="http://example.org", visit=1, date=date, type="tar")] + ) + storage.origin_visit_status_add( + [ + OriginVisitStatus( + origin=origin, + visit=1, + status="full", + date=date, + snapshot=last_snapshot.id, + ) + ] + ) + + loader.load() + + assert loader._load_revision.mock_calls == [ # type: ignore + # v1.0: not loaded because there is already its (extid_type, extid, rev) + # in the storage. + # v2.0: loaded, because there is already a similar extid, but different type + call(StubPackageInfo(origin, "example-v2.0.tar"), Origin(url=origin)), + # v3.0: loaded, because there isn't its extid in the storage, even though + # the previous snapshot has a branch targeting rev3_swhid + call(StubPackageInfo(origin, "example-v3.0.tar"), Origin(url=origin)), + # v4.0: loaded, because there isn't its extid + call(StubPackageInfo(origin, "example-v4.0.tar"), Origin(url=origin)), + ] + + # then check the snapshot has all the branches. + # versions 2.0 to 4.0 all point to rev4_swhid (instead of the value of the last + # snapshot), because they had to be loaded (mismatched or missing extid), and + # the mocked _load_revision always returns rev4_swhid.
+ snapshot = Snapshot( + branches={ + b"branch-v1.0": SnapshotBranch( + target_type=TargetType.REVISION, target=rev1_swhid.object_id + ), + b"branch-v2.0": SnapshotBranch( + target_type=TargetType.REVISION, target=rev4_swhid.object_id + ), + b"branch-v3.0": SnapshotBranch( + target_type=TargetType.REVISION, target=rev4_swhid.object_id + ), + b"branch-v4.0": SnapshotBranch( + target_type=TargetType.REVISION, target=rev4_swhid.object_id + ), + } + ) + assert snapshot_get_latest(storage, origin) == snapshot + + def test_manifest_extid(): """Compute primary key should return the right identity