Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/tests/test_pypi.py
Show All 10 Lines | |||||
from unittest.mock import patch | from unittest.mock import patch | ||||
from swh.core.tarball import uncompress | from swh.core.tarball import uncompress | ||||
from swh.core.pytest_plugin import requests_mock_datadir_factory | from swh.core.pytest_plugin import requests_mock_datadir_factory | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.loader.package.pypi import ( | from swh.loader.package.pypi import ( | ||||
PyPILoader, pypi_api_url, author, extract_intrinsic_metadata | PyPILoader, pypi_api_url, author, extract_intrinsic_metadata, | ||||
artifact_to_revision_id | |||||
) | ) | ||||
from swh.loader.package.tests.common import ( | from swh.loader.package.tests.common import ( | ||||
check_snapshot, check_metadata_paths, get_stats | check_snapshot, check_metadata_paths, get_stats | ||||
) | ) | ||||
def test_author_basic(): | def test_author_basic(): | ||||
data = { | data = { | ||||
▲ Show 20 Lines • Show All 672 Lines • ▼ Show 20 Lines | expected_snapshot = { | ||||
'id': expected_snapshot_id, | 'id': expected_snapshot_id, | ||||
'branches': expected_branches, | 'branches': expected_branches, | ||||
} | } | ||||
check_snapshot(expected_snapshot, loader.storage) | check_snapshot(expected_snapshot, loader.storage) | ||||
origin_visit = next(loader.storage.origin_visit_get(url)) | origin_visit = next(loader.storage.origin_visit_get(url)) | ||||
assert origin_visit['status'] == 'full' | assert origin_visit['status'] == 'full' | ||||
assert origin_visit['type'] == 'pypi' | assert origin_visit['type'] == 'pypi' | ||||
def test_pypi_artifact_to_revision_id_none(): | |||||
"""Current loader version should stop soon if nothing can be found | |||||
""" | |||||
artifact_metadata = { | |||||
'digests': { | |||||
'sha256': '6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec', # noqa | |||||
}, | |||||
} | |||||
assert artifact_to_revision_id({}, artifact_metadata) is None | |||||
known_artifacts = { | |||||
'b11ebac8c9d0c9e5063a2df693a18e3aba4b2f92': { | |||||
'original_artifact': { | |||||
'sha256': 'something-irrelevant', | |||||
}, | |||||
}, | |||||
} | |||||
assert artifact_to_revision_id(known_artifacts, artifact_metadata) is None | |||||
def test_pypi_artifact_to_revision_id_old_loader_version(): | |||||
"""Current loader version should solve old metadata scheme | |||||
""" | |||||
artifact_metadata = { | |||||
'digests': { | |||||
'sha256': '6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec', # noqa | |||||
} | |||||
} | |||||
known_artifacts = { | |||||
hash_to_bytes('b11ebac8c9d0c9e5063a2df693a18e3aba4b2f92'): { | |||||
'original_artifact': { | |||||
'sha256': "something-wrong", | |||||
}, | |||||
}, | |||||
hash_to_bytes('845673bfe8cbd31b1eaf757745a964137e6f9116'): { | |||||
'original_artifact': { | |||||
'sha256': '6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec', # noqa | |||||
}, | |||||
} | |||||
} | |||||
assert artifact_to_revision_id(known_artifacts, artifact_metadata) \ | |||||
== hash_to_bytes('845673bfe8cbd31b1eaf757745a964137e6f9116') | |||||
def test_pypi_artifact_to_revision_id_current_loader_version(): | |||||
"""Current loader version should be able to solve current metadata scheme | |||||
""" | |||||
artifact_metadata = { | |||||
'digests': { | |||||
'sha256': '6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec', # noqa | |||||
} | |||||
} | |||||
known_artifacts = { | |||||
hash_to_bytes('b11ebac8c9d0c9e5063a2df693a18e3aba4b2f92'): { | |||||
'original_artifact': [{ | |||||
'checksums': { | |||||
'sha256': '6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec', # noqa | |||||
}, | |||||
}], | |||||
}, | |||||
hash_to_bytes('845673bfe8cbd31b1eaf757745a964137e6f9116'): { | |||||
'original_artifact': [{ | |||||
'checksums': { | |||||
'sha256': 'something-wrong' | |||||
}, | |||||
}], | |||||
}, | |||||
} | |||||
assert artifact_to_revision_id(known_artifacts, artifact_metadata) \ | |||||
== hash_to_bytes('b11ebac8c9d0c9e5063a2df693a18e3aba4b2f92') | |||||
def test_pypi_artifact_to_revision_id_failures(): | |||||
with pytest.raises(KeyError, match='sha256'): | |||||
artifact_metadata = { | |||||
'digests': {}, | |||||
} | |||||
assert artifact_to_revision_id({}, artifact_metadata) | |||||
with pytest.raises(KeyError, match='digests'): | |||||
artifact_metadata = { | |||||
'something': 'wrong', | |||||
} | |||||
assert artifact_to_revision_id({}, artifact_metadata) |