diff --git a/swh/loader/package/debian/loader.py b/swh/loader/package/debian/loader.py --- a/swh/loader/package/debian/loader.py +++ b/swh/loader/package/debian/loader.py @@ -8,17 +8,7 @@ from os import path import re import subprocess -from typing import ( - Any, - Dict, - FrozenSet, - Iterator, - List, - Mapping, - Optional, - Sequence, - Tuple, -) +from typing import Any, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple import attr from dateutil.parser import parse as parse_date @@ -253,24 +243,38 @@ if not artifacts_to_fetch: return None - def to_set(data: DebianPackageInfo) -> FrozenSet[Tuple[str, str, int]]: - return frozenset( - (name, meta.sha256, meta.size) for name, meta in data.files.items() + new_dsc_files = [ + file for (name, file) in p_info.files.items() if name.endswith(".dsc") + ] + + if len(new_dsc_files) != 1: + raise ValueError( + f"Expected exactly one new .dsc file for package {p_info.name}, " + f"got {len(new_dsc_files)}" ) - # what we want to avoid downloading back if we have them already - set_new_artifacts = to_set(p_info) + new_dsc_sha256 = new_dsc_files[0].sha256 - known_artifacts_revision_id = {} for rev_id, known_artifacts in known_package_artifacts.items(): extrinsic = known_artifacts.get("extrinsic") if not extrinsic: continue - s = to_set(DebianPackageInfo.from_metadata(extrinsic["raw"], url=p_info.url)) - known_artifacts_revision_id[s] = rev_id + known_p_info = DebianPackageInfo.from_metadata(extrinsic["raw"], url=p_info.url) + dsc = [ + file for (name, file) in known_p_info.files.items() if name.endswith(".dsc") + ] + + if len(dsc) != 1: + raise ValueError( + f"Expected exactly one known .dsc file for package {p_info.name}, " + f"got {len(dsc)}" + ) + + if new_dsc_sha256 == dsc[0].sha256: + return rev_id - return known_artifacts_revision_id.get(set_new_artifacts) + return None def uid_to_person(uid: str) -> Dict[str, str]: diff --git a/swh/loader/package/debian/tests/test_debian.py b/swh/loader/package/debian/tests/test_debian.py --- a/swh/loader/package/debian/tests/test_debian.py +++ b/swh/loader/package/debian/tests/test_debian.py @@ -497,3 +497,58 @@ actual_revision = resolve_revision_from(known_package_artifacts, p_info) assert actual_revision == expected_revision_id + + +def test_debian_resolve_revision_from_corrupt_known_artifact(): + """To many or not enough .dsc files in the known_artifacts dict""" + artifact_metadata = PACKAGE_FILES + p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) + expected_revision_id = ( + b"(\x07\xf5\xb3\xf8Ch\xb4\x88\x9a\x9a\xe8'\xfe\x85\x85O\xfe\xcf\x07" + ) + + files = dict(artifact_metadata["files"]) + package_files = { + "name": PACKAGE_FILES["name"], + "version": PACKAGE_FILES["version"], + "files": files, + } + + known_package_artifacts = { + expected_revision_id: { + "extrinsic": {"raw": package_files,}, + # ... removed the unnecessary intermediary data + } + } + + # Too many .dsc + files["another.dsc"] = files["cicero_0.7.2-3.dsc"] + with pytest.raises(ValueError, match="exactly one known .dsc"): + resolve_revision_from(known_package_artifacts, p_info) + + # Not enough .dsc + del files["another.dsc"] + del files["cicero_0.7.2-3.dsc"] + with pytest.raises(ValueError, match="exactly one known .dsc"): + resolve_revision_from(known_package_artifacts, p_info) + + +def test_debian_resolve_revision_from_corrupt_new_artifact(): + artifact_metadata = PACKAGE_FILES + + files = PACKAGE_FILES["files"] + files = {**files, "another.dsc": files["cicero_0.7.2-3.dsc"]} + artifact_metadata = {**PACKAGE_FILES, "files": files} + + # Too many .dsc + files["another.dsc"] = files["cicero_0.7.2-3.dsc"] + p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) + with pytest.raises(ValueError, match="exactly one new .dsc"): + resolve_revision_from(PACKAGE_FILES, p_info) + + # Not enough .dsc + del files["another.dsc"] + del files["cicero_0.7.2-3.dsc"] + p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) + with pytest.raises(ValueError, match="exactly one new .dsc"): + resolve_revision_from(PACKAGE_FILES, p_info)