Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/pypi/tests/test_pypi.py
Show First 20 Lines • Show All 212 Lines • ▼ Show 20 Lines | def test_pypi_no_release_artifact(swh_storage, requests_mock_datadir_missing_all): | ||||
""" | """ | ||||
url = "https://pypi.org/project/0805nexter" | url = "https://pypi.org/project/0805nexter" | ||||
loader = PyPILoader(swh_storage, url) | loader = PyPILoader(swh_storage, url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status["status"] == "uneventful" | assert actual_load_status["status"] == "uneventful" | ||||
assert actual_load_status["snapshot_id"] is not None | assert actual_load_status["snapshot_id"] is not None | ||||
empty_snapshot = Snapshot(branches={}) | |||||
assert_last_visit_matches( | |||||
swh_storage, url, status="partial", type="pypi", snapshot=empty_snapshot.id | |||||
) | |||||
stats = get_stats(swh_storage) | stats = get_stats(swh_storage) | ||||
assert { | assert { | ||||
"content": 0, | "content": 0, | ||||
"directory": 0, | "directory": 0, | ||||
"origin": 1, | "origin": 1, | ||||
"origin_visit": 1, | "origin_visit": 1, | ||||
"release": 0, | "release": 0, | ||||
"revision": 0, | "revision": 0, | ||||
"skipped_content": 0, | "skipped_content": 0, | ||||
"snapshot": 1, | "snapshot": 1, | ||||
} == stats | } == stats | ||||
assert_last_visit_matches(swh_storage, url, status="partial", type="pypi") | |||||
def test_pypi_fail__load_snapshot(swh_storage, requests_mock_datadir): | def test_pypi_fail__load_snapshot(swh_storage, requests_mock_datadir): | ||||
"""problem during loading: {visit: failed, status: failed, no snapshot} | """problem during loading: {visit: failed, status: failed, no snapshot} | ||||
""" | """ | ||||
url = "https://pypi.org/project/0805nexter" | url = "https://pypi.org/project/0805nexter" | ||||
with patch( | with patch( | ||||
"swh.loader.package.pypi.loader.PyPILoader._load_snapshot", | "swh.loader.package.pypi.loader.PyPILoader._load_snapshot", | ||||
side_effect=ValueError("Fake problem to fail visit"), | side_effect=ValueError("Fake problem to fail visit"), | ||||
): | ): | ||||
loader = PyPILoader(swh_storage, url) | loader = PyPILoader(swh_storage, url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status == {"status": "failed"} | assert actual_load_status == {"status": "failed"} | ||||
assert_last_visit_matches(swh_storage, url, status="failed", type="pypi") | |||||
stats = get_stats(loader.storage) | stats = get_stats(loader.storage) | ||||
assert { | assert { | ||||
"content": 6, | "content": 6, | ||||
"directory": 4, | "directory": 4, | ||||
"origin": 1, | "origin": 1, | ||||
"origin_visit": 1, | "origin_visit": 1, | ||||
"release": 0, | "release": 0, | ||||
"revision": 2, | "revision": 2, | ||||
"skipped_content": 0, | "skipped_content": 0, | ||||
"snapshot": 0, | "snapshot": 0, | ||||
} == stats | } == stats | ||||
assert_last_visit_matches(swh_storage, url, status="failed", type="pypi") | |||||
# problem during loading: | # problem during loading: | ||||
# {visit: partial, status: uneventful, no snapshot} | # {visit: partial, status: uneventful, no snapshot} | ||||
def test_pypi_release_with_traceback(swh_storage, requests_mock_datadir): | def test_pypi_release_with_traceback(swh_storage, requests_mock_datadir): | ||||
url = "https://pypi.org/project/0805nexter" | url = "https://pypi.org/project/0805nexter" | ||||
with patch( | with patch( | ||||
"swh.loader.package.pypi.loader.PyPILoader.last_snapshot", | "swh.loader.package.pypi.loader.PyPILoader.last_snapshot", | ||||
side_effect=ValueError("Fake problem to fail the visit"), | side_effect=ValueError("Fake problem to fail the visit"), | ||||
): | ): | ||||
loader = PyPILoader(swh_storage, url) | loader = PyPILoader(swh_storage, url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status == {"status": "failed"} | assert actual_load_status == {"status": "failed"} | ||||
assert_last_visit_matches(swh_storage, url, status="failed", type="pypi") | |||||
stats = get_stats(swh_storage) | stats = get_stats(swh_storage) | ||||
assert { | assert { | ||||
"content": 0, | "content": 0, | ||||
"directory": 0, | "directory": 0, | ||||
"origin": 1, | "origin": 1, | ||||
"origin_visit": 1, | "origin_visit": 1, | ||||
"release": 0, | "release": 0, | ||||
"revision": 0, | "revision": 0, | ||||
"skipped_content": 0, | "skipped_content": 0, | ||||
"snapshot": 0, | "snapshot": 0, | ||||
} == stats | } == stats | ||||
assert_last_visit_matches(swh_storage, url, status="failed", type="pypi") | |||||
# problem during loading: failure early enough in between swh contents... | # problem during loading: failure early enough in between swh contents... | ||||
# some contents (contents, directories, etc...) have been written in storage | # some contents (contents, directories, etc...) have been written in storage | ||||
# {visit: partial, status: eventful, no snapshot} | # {visit: partial, status: eventful, no snapshot} | ||||
# problem during loading: failure late enough we can have snapshots (some | # problem during loading: failure late enough we can have snapshots (some | ||||
# revisions are written in storage already) | # revisions are written in storage already) | ||||
# {visit: partial, status: eventful, snapshot} | # {visit: partial, status: eventful, snapshot} | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | ): | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
expected_snapshot_id = hash_to_bytes("dd0e4201a232b1c104433741dbf45895b8ac9355") | expected_snapshot_id = hash_to_bytes("dd0e4201a232b1c104433741dbf45895b8ac9355") | ||||
assert actual_load_status == { | assert actual_load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": expected_snapshot_id.hex(), | "snapshot_id": expected_snapshot_id.hex(), | ||||
} | } | ||||
stats = get_stats(swh_storage) | assert_last_visit_matches( | ||||
swh_storage, url, status="partial", type="pypi", snapshot=expected_snapshot_id, | |||||
) | |||||
assert { | expected_snapshot = Snapshot( | ||||
"content": 3, | id=hash_to_bytes(expected_snapshot_id), | ||||
"directory": 2, | branches={ | ||||
"origin": 1, | b"releases/1.2.0": SnapshotBranch( | ||||
"origin_visit": 1, | target=hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"), | ||||
"release": 0, | target_type=TargetType.REVISION, | ||||
"revision": 1, | ), | ||||
"skipped_content": 0, | b"HEAD": SnapshotBranch( | ||||
"snapshot": 1, | target=b"releases/1.2.0", target_type=TargetType.ALIAS, | ||||
} == stats | ), | ||||
}, | |||||
) | |||||
check_snapshot(expected_snapshot, storage=swh_storage) | |||||
expected_contents = map( | expected_contents = map( | ||||
hash_to_bytes, | hash_to_bytes, | ||||
[ | [ | ||||
"405859113963cb7a797642b45f171d6360425d16", | "405859113963cb7a797642b45f171d6360425d16", | ||||
"e5686aa568fdb1d19d7f1329267082fe40482d31", | "e5686aa568fdb1d19d7f1329267082fe40482d31", | ||||
"83ecf6ec1114fd260ca7a833a2d165e71258c338", | "83ecf6ec1114fd260ca7a833a2d165e71258c338", | ||||
], | ], | ||||
Show All 14 Lines | ): | ||||
# {revision hash: directory hash} | # {revision hash: directory hash} | ||||
expected_revs = { | expected_revs = { | ||||
hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"): hash_to_bytes( | hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"): hash_to_bytes( | ||||
"b178b66bd22383d5f16f4f5c923d39ca798861b4" | "b178b66bd22383d5f16f4f5c923d39ca798861b4" | ||||
), # noqa | ), # noqa | ||||
} | } | ||||
assert list(swh_storage.revision_missing(expected_revs)) == [] | assert list(swh_storage.revision_missing(expected_revs)) == [] | ||||
expected_snapshot = Snapshot( | stats = get_stats(swh_storage) | ||||
id=hash_to_bytes(expected_snapshot_id), | |||||
branches={ | |||||
b"releases/1.2.0": SnapshotBranch( | |||||
target=hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"), | |||||
target_type=TargetType.REVISION, | |||||
), | |||||
b"HEAD": SnapshotBranch( | |||||
target=b"releases/1.2.0", target_type=TargetType.ALIAS, | |||||
), | |||||
}, | |||||
) | |||||
check_snapshot(expected_snapshot, storage=swh_storage) | |||||
assert_last_visit_matches( | assert { | ||||
swh_storage, url, status="partial", type="pypi", snapshot=expected_snapshot_id, | "content": 3, | ||||
) | "directory": 2, | ||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"release": 0, | |||||
"revision": 1, | |||||
"skipped_content": 0, | |||||
"snapshot": 1, | |||||
} == stats | |||||
def test_pypi_visit_with_1_release_artifact(swh_storage, requests_mock_datadir): | def test_pypi_visit_with_1_release_artifact(swh_storage, requests_mock_datadir): | ||||
"""With no prior visit, load a pypi project ends up with 1 snapshot | """With no prior visit, load a pypi project ends up with 1 snapshot | ||||
""" | """ | ||||
url = "https://pypi.org/project/0805nexter" | url = "https://pypi.org/project/0805nexter" | ||||
loader = PyPILoader(swh_storage, url) | loader = PyPILoader(swh_storage, url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
expected_snapshot_id = hash_to_bytes("ba6e158ada75d0b3cfb209ffdf6daa4ed34a227a") | expected_snapshot_id = hash_to_bytes("ba6e158ada75d0b3cfb209ffdf6daa4ed34a227a") | ||||
assert actual_load_status == { | assert actual_load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": expected_snapshot_id.hex(), | "snapshot_id": expected_snapshot_id.hex(), | ||||
} | } | ||||
assert_last_visit_matches( | |||||
swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot_id | |||||
) | |||||
expected_snapshot = Snapshot( | |||||
id=expected_snapshot_id, | |||||
branches={ | |||||
b"releases/1.1.0": SnapshotBranch( | |||||
target=hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571"), | |||||
target_type=TargetType.REVISION, | |||||
), | |||||
b"releases/1.2.0": SnapshotBranch( | |||||
target=hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"), | |||||
target_type=TargetType.REVISION, | |||||
), | |||||
b"HEAD": SnapshotBranch( | |||||
target=b"releases/1.2.0", target_type=TargetType.ALIAS, | |||||
), | |||||
}, | |||||
) | |||||
check_snapshot(expected_snapshot, swh_storage) | |||||
stats = get_stats(swh_storage) | stats = get_stats(swh_storage) | ||||
assert { | assert { | ||||
"content": 6, | "content": 6, | ||||
"directory": 4, | "directory": 4, | ||||
"origin": 1, | "origin": 1, | ||||
"origin_visit": 1, | "origin_visit": 1, | ||||
"release": 0, | "release": 0, | ||||
"revision": 2, | "revision": 2, | ||||
Show All 33 Lines | expected_revs = { | ||||
"05219ba38bc542d4345d5638af1ed56c7d43ca7d" | "05219ba38bc542d4345d5638af1ed56c7d43ca7d" | ||||
), # noqa | ), # noqa | ||||
hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"): hash_to_bytes( | hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"): hash_to_bytes( | ||||
"b178b66bd22383d5f16f4f5c923d39ca798861b4" | "b178b66bd22383d5f16f4f5c923d39ca798861b4" | ||||
), # noqa | ), # noqa | ||||
} | } | ||||
assert list(swh_storage.revision_missing(expected_revs)) == [] | assert list(swh_storage.revision_missing(expected_revs)) == [] | ||||
expected_snapshot = Snapshot( | |||||
id=expected_snapshot_id, | |||||
branches={ | |||||
b"releases/1.1.0": SnapshotBranch( | |||||
target=hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571"), | |||||
target_type=TargetType.REVISION, | |||||
), | |||||
b"releases/1.2.0": SnapshotBranch( | |||||
target=hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"), | |||||
target_type=TargetType.REVISION, | |||||
), | |||||
b"HEAD": SnapshotBranch( | |||||
target=b"releases/1.2.0", target_type=TargetType.ALIAS, | |||||
), | |||||
}, | |||||
) | |||||
check_snapshot(expected_snapshot, swh_storage) | |||||
assert_last_visit_matches( | |||||
swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot_id | |||||
) | |||||
def test_pypi_multiple_visits_with_no_change(swh_storage, requests_mock_datadir): | def test_pypi_multiple_visits_with_no_change(swh_storage, requests_mock_datadir): | ||||
"""Multiple visits with no changes results in 1 same snapshot | """Multiple visits with no changes results in 1 same snapshot | ||||
""" | """ | ||||
url = "https://pypi.org/project/0805nexter" | url = "https://pypi.org/project/0805nexter" | ||||
loader = PyPILoader(swh_storage, url) | loader = PyPILoader(swh_storage, url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
snapshot_id = hash_to_bytes("ba6e158ada75d0b3cfb209ffdf6daa4ed34a227a") | snapshot_id = hash_to_bytes("ba6e158ada75d0b3cfb209ffdf6daa4ed34a227a") | ||||
assert actual_load_status == { | assert actual_load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": snapshot_id.hex(), | "snapshot_id": snapshot_id.hex(), | ||||
} | } | ||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
swh_storage, url, status="full", type="pypi", snapshot=snapshot_id | swh_storage, url, status="full", type="pypi", snapshot=snapshot_id | ||||
) | ) | ||||
stats = get_stats(swh_storage) | |||||
assert { | |||||
"content": 6, | |||||
"directory": 4, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"release": 0, | |||||
"revision": 2, | |||||
"skipped_content": 0, | |||||
"snapshot": 1, | |||||
} == stats | |||||
expected_snapshot = Snapshot( | expected_snapshot = Snapshot( | ||||
id=snapshot_id, | id=snapshot_id, | ||||
branches={ | branches={ | ||||
b"releases/1.1.0": SnapshotBranch( | b"releases/1.1.0": SnapshotBranch( | ||||
target=hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571"), | target=hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571"), | ||||
target_type=TargetType.REVISION, | target_type=TargetType.REVISION, | ||||
), | ), | ||||
b"releases/1.2.0": SnapshotBranch( | b"releases/1.2.0": SnapshotBranch( | ||||
target=hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"), | target=hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"), | ||||
target_type=TargetType.REVISION, | target_type=TargetType.REVISION, | ||||
), | ), | ||||
b"HEAD": SnapshotBranch( | b"HEAD": SnapshotBranch( | ||||
target=b"releases/1.2.0", target_type=TargetType.ALIAS, | target=b"releases/1.2.0", target_type=TargetType.ALIAS, | ||||
), | ), | ||||
}, | }, | ||||
) | ) | ||||
check_snapshot(expected_snapshot, swh_storage) | check_snapshot(expected_snapshot, swh_storage) | ||||
stats = get_stats(swh_storage) | |||||
assert { | |||||
"content": 6, | |||||
"directory": 4, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"release": 0, | |||||
"revision": 2, | |||||
"skipped_content": 0, | |||||
"snapshot": 1, | |||||
} == stats | |||||
actual_load_status2 = loader.load() | actual_load_status2 = loader.load() | ||||
assert actual_load_status2 == { | assert actual_load_status2 == { | ||||
"status": "uneventful", | "status": "uneventful", | ||||
"snapshot_id": actual_load_status2["snapshot_id"], | "snapshot_id": actual_load_status2["snapshot_id"], | ||||
} | } | ||||
visit_status2 = assert_last_visit_matches( | visit_status2 = assert_last_visit_matches( | ||||
swh_storage, url, status="full", type="pypi" | swh_storage, url, status="full", type="pypi" | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | assert visit2_actual_load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": expected_snapshot_id2.hex(), | "snapshot_id": expected_snapshot_id2.hex(), | ||||
} | } | ||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot_id2 | swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot_id2 | ||||
) | ) | ||||
expected_snapshot = Snapshot( | |||||
id=expected_snapshot_id2, | |||||
branches={ | |||||
b"releases/1.1.0": SnapshotBranch( | |||||
target=hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571"), | |||||
target_type=TargetType.REVISION, | |||||
), | |||||
b"releases/1.2.0": SnapshotBranch( | |||||
target=hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"), | |||||
target_type=TargetType.REVISION, | |||||
), | |||||
b"releases/1.3.0": SnapshotBranch( | |||||
target=hash_to_bytes("51247143b01445c9348afa9edfae31bf7c5d86b1"), | |||||
target_type=TargetType.REVISION, | |||||
), | |||||
b"HEAD": SnapshotBranch( | |||||
target=b"releases/1.3.0", target_type=TargetType.ALIAS, | |||||
), | |||||
}, | |||||
) | |||||
assert_last_visit_matches( | |||||
swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot.id | |||||
) | |||||
check_snapshot(expected_snapshot, swh_storage) | |||||
assert { | assert { | ||||
"content": 6 + 1, # 1 more content | "content": 6 + 1, # 1 more content | ||||
"directory": 4 + 2, # 2 more directories | "directory": 4 + 2, # 2 more directories | ||||
"origin": 1, | "origin": 1, | ||||
"origin_visit": 1 + 1, | "origin_visit": 1 + 1, | ||||
"release": 0, | "release": 0, | ||||
"revision": 2 + 1, # 1 more revision | "revision": 2 + 1, # 1 more revision | ||||
"skipped_content": 0, | "skipped_content": 0, | ||||
Show All 39 Lines | expected_revs = { | ||||
), # noqa | ), # noqa | ||||
hash_to_bytes("51247143b01445c9348afa9edfae31bf7c5d86b1"): hash_to_bytes( | hash_to_bytes("51247143b01445c9348afa9edfae31bf7c5d86b1"): hash_to_bytes( | ||||
"e226e7e4ad03b4fc1403d69a18ebdd6f2edd2b3a" | "e226e7e4ad03b4fc1403d69a18ebdd6f2edd2b3a" | ||||
), # noqa | ), # noqa | ||||
} | } | ||||
assert list(swh_storage.revision_missing(expected_revs)) == [] | assert list(swh_storage.revision_missing(expected_revs)) == [] | ||||
expected_snapshot = Snapshot( | |||||
id=expected_snapshot_id2, | |||||
branches={ | |||||
b"releases/1.1.0": SnapshotBranch( | |||||
target=hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571"), | |||||
target_type=TargetType.REVISION, | |||||
), | |||||
b"releases/1.2.0": SnapshotBranch( | |||||
target=hash_to_bytes("e445da4da22b31bfebb6ffc4383dbf839a074d21"), | |||||
target_type=TargetType.REVISION, | |||||
), | |||||
b"releases/1.3.0": SnapshotBranch( | |||||
target=hash_to_bytes("51247143b01445c9348afa9edfae31bf7c5d86b1"), | |||||
target_type=TargetType.REVISION, | |||||
), | |||||
b"HEAD": SnapshotBranch( | |||||
target=b"releases/1.3.0", target_type=TargetType.ALIAS, | |||||
), | |||||
}, | |||||
) | |||||
check_snapshot(expected_snapshot, swh_storage) | |||||
assert_last_visit_matches( | |||||
swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot.id | |||||
) | |||||
urls = [ | urls = [ | ||||
m.url | m.url | ||||
for m in requests_mock_datadir_visits.request_history | for m in requests_mock_datadir_visits.request_history | ||||
if m.url.startswith("https://files.pythonhosted.org") | if m.url.startswith("https://files.pythonhosted.org") | ||||
] | ] | ||||
# visited each artifact once across 2 visits | # visited each artifact once across 2 visits | ||||
assert len(urls) == len(set(urls)) | assert len(urls) == len(set(urls)) | ||||
Show All 18 Lines | def test_pypi_visit_1_release_with_2_artifacts(swh_storage, requests_mock_datadir): | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
expected_snapshot_id = hash_to_bytes("a27e638a4dad6fbfa273c6ebec1c4bf320fb84c6") | expected_snapshot_id = hash_to_bytes("a27e638a4dad6fbfa273c6ebec1c4bf320fb84c6") | ||||
assert actual_load_status == { | assert actual_load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": expected_snapshot_id.hex(), | "snapshot_id": expected_snapshot_id.hex(), | ||||
} | } | ||||
assert_last_visit_matches( | |||||
swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot_id | |||||
) | |||||
expected_snapshot = Snapshot( | expected_snapshot = Snapshot( | ||||
id=expected_snapshot_id, | id=expected_snapshot_id, | ||||
branches={ | branches={ | ||||
b"releases/1.1.0/nexter-1.1.0.zip": SnapshotBranch( | b"releases/1.1.0/nexter-1.1.0.zip": SnapshotBranch( | ||||
target=hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571"), | target=hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571"), | ||||
target_type=TargetType.REVISION, | target_type=TargetType.REVISION, | ||||
), | ), | ||||
b"releases/1.1.0/nexter-1.1.0.tar.gz": SnapshotBranch( | b"releases/1.1.0/nexter-1.1.0.tar.gz": SnapshotBranch( | ||||
target=hash_to_bytes("0bf88f5760cca7665d0af4d6575d9301134fe11a"), | target=hash_to_bytes("0bf88f5760cca7665d0af4d6575d9301134fe11a"), | ||||
target_type=TargetType.REVISION, | target_type=TargetType.REVISION, | ||||
), | ), | ||||
}, | }, | ||||
) | ) | ||||
check_snapshot(expected_snapshot, swh_storage) | check_snapshot(expected_snapshot, swh_storage) | ||||
assert_last_visit_matches( | |||||
swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot.id | |||||
) | |||||
def test_pypi_artifact_with_no_intrinsic_metadata(swh_storage, requests_mock_datadir): | def test_pypi_artifact_with_no_intrinsic_metadata(swh_storage, requests_mock_datadir): | ||||
"""Skip artifact with no intrinsic metadata during ingestion | """Skip artifact with no intrinsic metadata during ingestion | ||||
""" | """ | ||||
url = "https://pypi.org/project/upymenu" | url = "https://pypi.org/project/upymenu" | ||||
loader = PyPILoader(swh_storage, url) | loader = PyPILoader(swh_storage, url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
expected_snapshot_id = hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e") | expected_snapshot_id = hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e") | ||||
assert actual_load_status == { | assert actual_load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": expected_snapshot_id.hex(), | "snapshot_id": expected_snapshot_id.hex(), | ||||
} | } | ||||
# no branch as one artifact without any intrinsic metadata | # no branch as one artifact without any intrinsic metadata | ||||
expected_snapshot = Snapshot(id=expected_snapshot_id, branches={}) | expected_snapshot = Snapshot(id=expected_snapshot_id, branches={}) | ||||
check_snapshot(expected_snapshot, swh_storage) | |||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot.id | swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot.id | ||||
) | ) | ||||
check_snapshot(expected_snapshot, swh_storage) | |||||
def test_pypi_origin_not_found(swh_storage, requests_mock_datadir): | def test_pypi_origin_not_found(swh_storage, requests_mock_datadir): | ||||
url = "https://pypi.org/project/unknown" | url = "https://pypi.org/project/unknown" | ||||
loader = PyPILoader(swh_storage, url) | loader = PyPILoader(swh_storage, url) | ||||
assert loader.load() == {"status": "failed"} | assert loader.load() == {"status": "failed"} | ||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
▲ Show 20 Lines • Show All 77 Lines • Show Last 20 Lines |