Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/pypi/tests/test_pypi.py
Show All 13 Lines | |||||
from swh.core.tarball import uncompress | from swh.core.tarball import uncompress | ||||
from swh.loader.package import __version__ | from swh.loader.package import __version__ | ||||
from swh.loader.package.pypi.loader import ( | from swh.loader.package.pypi.loader import ( | ||||
PyPILoader, | PyPILoader, | ||||
author, | author, | ||||
extract_intrinsic_metadata, | extract_intrinsic_metadata, | ||||
pypi_api_url, | pypi_api_url, | ||||
) | ) | ||||
from swh.loader.package.tests.common import check_metadata_paths | |||||
from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats | from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.model.identifiers import ( | from swh.model.identifiers import ( | ||||
CoreSWHID, | CoreSWHID, | ||||
ExtendedObjectType, | ExtendedObjectType, | ||||
ExtendedSWHID, | ExtendedSWHID, | ||||
ObjectType, | ObjectType, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 298 Lines • ▼ Show 20 Lines | ): | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status["status"] == "eventful" | assert actual_load_status["status"] == "eventful" | ||||
assert actual_load_status["snapshot_id"] is not None | assert actual_load_status["snapshot_id"] is not None | ||||
expected_revision_id = hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21") | expected_revision_id = hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21") | ||||
revision = swh_storage.revision_get([expected_revision_id])[0] | revision = swh_storage.revision_get([expected_revision_id])[0] | ||||
assert revision is not None | assert revision is not None | ||||
check_metadata_paths( | |||||
revision.metadata, | |||||
paths=[ | |||||
("intrinsic.tool", str), | |||||
("intrinsic.raw", dict), | |||||
("extrinsic.provider", str), | |||||
("extrinsic.when", str), | |||||
("extrinsic.raw", dict), | |||||
("original_artifact", list), | |||||
], | |||||
) | |||||
for original_artifact in revision.metadata["original_artifact"]: | |||||
check_metadata_paths( | |||||
original_artifact, | |||||
paths=[("filename", str), ("length", int), ("checksums", dict),], | |||||
) | |||||
revision_swhid = CoreSWHID( | revision_swhid = CoreSWHID( | ||||
object_type=ObjectType.REVISION, object_id=expected_revision_id | object_type=ObjectType.REVISION, object_id=expected_revision_id | ||||
) | ) | ||||
directory_swhid = ExtendedSWHID( | directory_swhid = ExtendedSWHID( | ||||
object_type=ExtendedObjectType.DIRECTORY, object_id=revision.directory | object_type=ExtendedObjectType.DIRECTORY, object_id=revision.directory | ||||
) | ) | ||||
metadata_authority = MetadataAuthority( | metadata_authority = MetadataAuthority( | ||||
type=MetadataAuthorityType.FORGE, url="https://pypi.org/", | type=MetadataAuthorityType.FORGE, url="https://pypi.org/", | ||||
▲ Show 20 Lines • Show All 428 Lines • ▼ Show 20 Lines | def test_pypi_visit_1_release_with_2_artifacts(swh_storage, requests_mock_datadir): | ||||
) | ) | ||||
check_snapshot(expected_snapshot, swh_storage) | check_snapshot(expected_snapshot, swh_storage) | ||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot.id | swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot.id | ||||
) | ) | ||||
def test_pypi__known_artifact_to_extid__old_loader_version(): | |||||
"""Current loader version should solve old metadata scheme | |||||
""" | |||||
assert ( | |||||
PyPILoader.known_artifact_to_extid( | |||||
{"original_artifact": {"sha256": "something-wrong",},} | |||||
) | |||||
is None | |||||
) | |||||
sha256 = "6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec" | |||||
assert PyPILoader.known_artifact_to_extid( | |||||
{"original_artifact": {"sha256": sha256}} | |||||
) == ("pypi-archive-sha256", hash_to_bytes(sha256)) | |||||
def test_pypi__known_artifact_to_extid__current_loader_version(): | |||||
"""Current loader version should be able to solve current metadata scheme | |||||
""" | |||||
sha256 = "6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec" | |||||
assert PyPILoader.known_artifact_to_extid( | |||||
{"original_artifact": [{"checksums": {"sha256": sha256,},}],} | |||||
) == ("pypi-archive-sha256", hash_to_bytes(sha256)) | |||||
assert ( | |||||
PyPILoader.known_artifact_to_extid( | |||||
{"original_artifact": [{"checksums": {"sha256": "something-wrong"},}],}, | |||||
) | |||||
is None | |||||
) | |||||
# there should not be more than one artifact | |||||
with pytest.raises(ValueError): | |||||
PyPILoader.known_artifact_to_extid( | |||||
{ | |||||
"original_artifact": [ | |||||
{"checksums": {"sha256": sha256,},}, | |||||
{"checksums": {"sha256": sha256,},}, | |||||
], | |||||
} | |||||
) | |||||
def test_pypi_artifact_with_no_intrinsic_metadata(swh_storage, requests_mock_datadir): | def test_pypi_artifact_with_no_intrinsic_metadata(swh_storage, requests_mock_datadir): | ||||
"""Skip artifact with no intrinsic metadata during ingestion | """Skip artifact with no intrinsic metadata during ingestion | ||||
""" | """ | ||||
url = "https://pypi.org/project/upymenu" | url = "https://pypi.org/project/upymenu" | ||||
loader = PyPILoader(swh_storage, url) | loader = PyPILoader(swh_storage, url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
Show All 24 Lines |