Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/debian/tests/test_debian.py
Show All 13 Lines | from swh.loader.package.debian.loader import ( | ||||
DebianPackageChangelog, | DebianPackageChangelog, | ||||
DebianPackageInfo, | DebianPackageInfo, | ||||
IntrinsicPackageMetadata, | IntrinsicPackageMetadata, | ||||
download_package, | download_package, | ||||
dsc_information, | dsc_information, | ||||
extract_package, | extract_package, | ||||
get_intrinsic_package_metadata, | get_intrinsic_package_metadata, | ||||
prepare_person, | prepare_person, | ||||
resolve_revision_from, | |||||
uid_to_person, | uid_to_person, | ||||
) | ) | ||||
from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats | from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.model.model import Person, Snapshot, SnapshotBranch, TargetType | from swh.model.model import Person, Snapshot, SnapshotBranch, TargetType | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
▲ Show 20 Lines • Show All 382 Lines • ▼ Show 20 Lines | def test_debian_multiple_packages(swh_storage, requests_mock_datadir): | ||||
check_snapshot(expected_snapshot, swh_storage) | check_snapshot(expected_snapshot, swh_storage) | ||||
def test_debian_resolve_revision_from_edge_cases(): | def test_debian_resolve_revision_from_edge_cases(): | ||||
"""Solving revision with empty data will result in unknown revision | """Solving revision with empty data will result in unknown revision | ||||
""" | """ | ||||
loader = DebianLoader(None, None, None, None) | |||||
empty_artifact = { | empty_artifact = { | ||||
"name": PACKAGE_FILES["name"], | "name": PACKAGE_FILES["name"], | ||||
"version": PACKAGE_FILES["version"], | "version": PACKAGE_FILES["version"], | ||||
} | } | ||||
for package_artifacts in [empty_artifact, PACKAGE_FILES]: | for package_artifacts in [empty_artifact, PACKAGE_FILES]: | ||||
p_info = DebianPackageInfo.from_metadata(package_artifacts, url=URL) | p_info = DebianPackageInfo.from_metadata(package_artifacts, url=URL) | ||||
actual_revision = resolve_revision_from({}, p_info) | actual_revision = loader.resolve_revision_from({}, p_info) | ||||
assert actual_revision is None | assert actual_revision is None | ||||
for known_artifacts in [{}, PACKAGE_FILES]: | for known_artifacts in [{}, PACKAGE_FILES]: | ||||
actual_revision = resolve_revision_from( | actual_revision = loader.resolve_revision_from( | ||||
known_artifacts, DebianPackageInfo.from_metadata(empty_artifact, url=URL) | known_artifacts, DebianPackageInfo.from_metadata(empty_artifact, url=URL) | ||||
) | ) | ||||
assert actual_revision is None | assert actual_revision is None | ||||
known_package_artifacts = { | known_package_artifacts = { | ||||
b"(\x07\xf5\xb3\xf8Ch\xb4\x88\x9a\x9a\xe8'\xfe\x85\x85O\xfe\xcf\x07": { | b"(\x07\xf5\xb3\xf8Ch\xb4\x88\x9a\x9a\xe8'\xfe\x85\x85O\xfe\xcf\x07": { | ||||
"extrinsic": { | "extrinsic": { | ||||
# empty | # empty | ||||
}, | }, | ||||
# ... removed the unnecessary intermediary data | # ... removed the unnecessary intermediary data | ||||
} | } | ||||
} | } | ||||
assert not resolve_revision_from( | assert not loader.resolve_revision_from( | ||||
known_package_artifacts, DebianPackageInfo.from_metadata(PACKAGE_FILES, url=URL) | known_package_artifacts, DebianPackageInfo.from_metadata(PACKAGE_FILES, url=URL) | ||||
) | ) | ||||
def test_debian_resolve_revision_from_edge_cases_hit_and_miss(): | def test_debian_resolve_revision_from_edge_cases_hit_and_miss(): | ||||
"""Solving revision with inconsistent data will result in unknown revision | """Solving revision with inconsistent data will result in unknown revision | ||||
""" | """ | ||||
loader = DebianLoader(None, None, None, None) | |||||
artifact_metadata = PACKAGE_FILES2 | artifact_metadata = PACKAGE_FILES2 | ||||
p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) | p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) | ||||
expected_revision_id = ( | expected_revision_id = ( | ||||
b"(\x08\xf5\xb3\xf8Ch\xb4\x88\x9a\x9a\xe8'\xff\x85\x85O\xfe\xcf\x07" # noqa | b"(\x08\xf5\xb3\xf8Ch\xb4\x88\x9a\x9a\xe8'\xff\x85\x85O\xfe\xcf\x07" # noqa | ||||
) | ) | ||||
known_package_artifacts = { | known_package_artifacts = { | ||||
expected_revision_id: { | expected_revision_id: { | ||||
"extrinsic": {"raw": PACKAGE_FILES,}, | "extrinsic": {"raw": PACKAGE_FILES,}, | ||||
# ... removed the unnecessary intermediary data | # ... removed the unnecessary intermediary data | ||||
} | } | ||||
} | } | ||||
actual_revision = resolve_revision_from(known_package_artifacts, p_info) | actual_revision = loader.resolve_revision_from(known_package_artifacts, p_info) | ||||
assert actual_revision is None | assert actual_revision is None | ||||
def test_debian_resolve_revision_from(): | def test_debian_resolve_revision_from(): | ||||
"""Solving revision with consistent data will solve the revision | """Solving revision with consistent data will solve the revision | ||||
""" | """ | ||||
loader = DebianLoader(None, None, None, None) | |||||
artifact_metadata = PACKAGE_FILES | artifact_metadata = PACKAGE_FILES | ||||
p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) | p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) | ||||
expected_revision_id = ( | expected_revision_id = ( | ||||
b"(\x07\xf5\xb3\xf8Ch\xb4\x88\x9a\x9a\xe8'\xfe\x85\x85O\xfe\xcf\x07" # noqa | b"(\x07\xf5\xb3\xf8Ch\xb4\x88\x9a\x9a\xe8'\xfe\x85\x85O\xfe\xcf\x07" # noqa | ||||
) | ) | ||||
files = artifact_metadata["files"] | files = artifact_metadata["files"] | ||||
# shuffling dict's keys | # shuffling dict's keys | ||||
keys = list(files.keys()) | keys = list(files.keys()) | ||||
random.shuffle(keys) | random.shuffle(keys) | ||||
package_files = { | package_files = { | ||||
"name": PACKAGE_FILES["name"], | "name": PACKAGE_FILES["name"], | ||||
"version": PACKAGE_FILES["version"], | "version": PACKAGE_FILES["version"], | ||||
"files": {k: files[k] for k in keys}, | "files": {k: files[k] for k in keys}, | ||||
} | } | ||||
known_package_artifacts = { | known_package_artifacts = { | ||||
expected_revision_id: { | expected_revision_id: { | ||||
"extrinsic": {"raw": package_files,}, | "extrinsic": {"raw": package_files,}, | ||||
# ... removed the unnecessary intermediary data | # ... removed the unnecessary intermediary data | ||||
} | } | ||||
} | } | ||||
actual_revision = resolve_revision_from(known_package_artifacts, p_info) | actual_revision = loader.resolve_revision_from(known_package_artifacts, p_info) | ||||
assert actual_revision == expected_revision_id | assert actual_revision == expected_revision_id | ||||
def test_debian_resolve_revision_from_corrupt_known_artifact(): | def test_debian_resolve_revision_from_corrupt_known_artifact(): | ||||
"""To many or not enough .dsc files in the known_artifacts dict""" | """To many or not enough .dsc files in the known_artifacts dict""" | ||||
loader = DebianLoader(None, None, None, None) | |||||
artifact_metadata = PACKAGE_FILES | artifact_metadata = PACKAGE_FILES | ||||
p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) | p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) | ||||
expected_revision_id = ( | expected_revision_id = ( | ||||
b"(\x07\xf5\xb3\xf8Ch\xb4\x88\x9a\x9a\xe8'\xfe\x85\x85O\xfe\xcf\x07" | b"(\x07\xf5\xb3\xf8Ch\xb4\x88\x9a\x9a\xe8'\xfe\x85\x85O\xfe\xcf\x07" | ||||
) | ) | ||||
files = dict(artifact_metadata["files"]) | files = dict(artifact_metadata["files"]) | ||||
package_files = { | package_files = { | ||||
"name": PACKAGE_FILES["name"], | "name": PACKAGE_FILES["name"], | ||||
"version": PACKAGE_FILES["version"], | "version": PACKAGE_FILES["version"], | ||||
"files": files, | "files": files, | ||||
} | } | ||||
known_package_artifacts = { | known_package_artifacts = { | ||||
expected_revision_id: { | expected_revision_id: { | ||||
"extrinsic": {"raw": package_files,}, | "extrinsic": {"raw": package_files,}, | ||||
# ... removed the unnecessary intermediary data | # ... removed the unnecessary intermediary data | ||||
} | } | ||||
} | } | ||||
# Too many .dsc | # Too many .dsc | ||||
files["another.dsc"] = files["cicero_0.7.2-3.dsc"] | files["another.dsc"] = files["cicero_0.7.2-3.dsc"] | ||||
with pytest.raises(ValueError, match="exactly one known .dsc"): | assert loader.resolve_revision_from(known_package_artifacts, p_info) is None | ||||
resolve_revision_from(known_package_artifacts, p_info) | |||||
# Not enough .dsc | # Not enough .dsc | ||||
del files["another.dsc"] | del files["another.dsc"] | ||||
del files["cicero_0.7.2-3.dsc"] | del files["cicero_0.7.2-3.dsc"] | ||||
with pytest.raises(ValueError, match="exactly one known .dsc"): | assert loader.resolve_revision_from(known_package_artifacts, p_info) is None | ||||
resolve_revision_from(known_package_artifacts, p_info) | |||||
def test_debian_resolve_revision_from_corrupt_new_artifact(): | def test_debian_resolve_revision_from_corrupt_new_artifact(): | ||||
loader = DebianLoader(None, None, None, None) | |||||
artifact_metadata = PACKAGE_FILES | artifact_metadata = PACKAGE_FILES | ||||
files = PACKAGE_FILES["files"] | files = PACKAGE_FILES["files"] | ||||
files = {**files, "another.dsc": files["cicero_0.7.2-3.dsc"]} | files = {**files, "another.dsc": files["cicero_0.7.2-3.dsc"]} | ||||
artifact_metadata = {**PACKAGE_FILES, "files": files} | artifact_metadata = {**PACKAGE_FILES, "files": files} | ||||
# Too many .dsc | # Too many .dsc | ||||
files["another.dsc"] = files["cicero_0.7.2-3.dsc"] | files["another.dsc"] = files["cicero_0.7.2-3.dsc"] | ||||
p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) | p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) | ||||
with pytest.raises(ValueError, match="exactly one new .dsc"): | assert loader.resolve_revision_from(PACKAGE_FILES, p_info) is None | ||||
resolve_revision_from(PACKAGE_FILES, p_info) | |||||
# Not enough .dsc | # Not enough .dsc | ||||
del files["another.dsc"] | del files["another.dsc"] | ||||
del files["cicero_0.7.2-3.dsc"] | del files["cicero_0.7.2-3.dsc"] | ||||
p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) | p_info = DebianPackageInfo.from_metadata(artifact_metadata, url=URL) | ||||
with pytest.raises(ValueError, match="exactly one new .dsc"): | assert loader.resolve_revision_from(PACKAGE_FILES, p_info) is None | ||||
resolve_revision_from(PACKAGE_FILES, p_info) |