Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/tests/test_init.py
Show First 20 Lines • Show All 331 Lines • ▼ Show 20 Lines | for branch, target in SNAPSHOT.branches.items(): | ||||
if branch == b"alias": | if branch == b"alias": | ||||
assert target.target in SNAPSHOT.branches | assert target.target in SNAPSHOT.branches | ||||
elif branch == b"evaluation": | elif branch == b"evaluation": | ||||
# this one does not exist and we are safelisting its check below | # this one does not exist and we are safelisting its check below | ||||
continue | continue | ||||
else: | else: | ||||
assert target.target in [REVISION.id, RELEASE.id] | assert target.target in [REVISION.id, RELEASE.id] | ||||
swh_storage.content_add([CONTENT.to_dict()]) | swh_storage.content_add([CONTENT]) | ||||
swh_storage.directory_add([DIRECTORY.to_dict()]) | swh_storage.directory_add([DIRECTORY]) | ||||
swh_storage.revision_add([REVISION.to_dict()]) | swh_storage.revision_add([REVISION]) | ||||
swh_storage.release_add([RELEASE.to_dict()]) | swh_storage.release_add([RELEASE]) | ||||
s = swh_storage.snapshot_add([SNAPSHOT.to_dict()]) | s = swh_storage.snapshot_add([SNAPSHOT]) | ||||
assert s == { | assert s == { | ||||
"snapshot:add": 1, | "snapshot:add": 1, | ||||
} | } | ||||
for snap in [SNAPSHOT, SNAPSHOT.to_dict()]: | for snap in [SNAPSHOT, SNAPSHOT.to_dict()]: | ||||
# all should be fine! | # all should be fine! | ||||
check_snapshot( | check_snapshot( | ||||
snap, swh_storage, allowed_empty=[(TargetType.REVISION, b"evaluation")] | snap, swh_storage, allowed_empty=[(TargetType.REVISION, b"evaluation")] | ||||
▲ Show 20 Lines • Show All 86 Lines • ▼ Show 20 Lines | def test_check_snapshot_failures(swh_storage): | ||||
swh_storage.snapshot_add([snapshot1]) | swh_storage.snapshot_add([snapshot1]) | ||||
with pytest.raises(InexistentObjectsError, match="Branch/Revision"): | with pytest.raises(InexistentObjectsError, match="Branch/Revision"): | ||||
check_snapshot(snapshot1, swh_storage) | check_snapshot(snapshot1, swh_storage) | ||||
# 5. snapshot is found in storage, targeted revision exists but the directory the | # 5. snapshot is found in storage, targeted revision exists but the directory the | ||||
# revision targets does not exist | # revision targets does not exist | ||||
swh_storage.revision_add([REVISION.to_dict()]) | swh_storage.revision_add([REVISION]) | ||||
dir_not_found = list(swh_storage.directory_missing([REVISION.directory])) | dir_not_found = list(swh_storage.directory_missing([REVISION.directory])) | ||||
assert len(dir_not_found) == 1 | assert len(dir_not_found) == 1 | ||||
snapshot2 = Snapshot( | snapshot2 = Snapshot( | ||||
id=hash_to_bytes("987123f535f882bc7f9a18fb16c9ad27fda7bab7"), | id=hash_to_bytes("987123f535f882bc7f9a18fb16c9ad27fda7bab7"), | ||||
branches={ | branches={ | ||||
b"alias": SnapshotBranch(target=b"HEAD", target_type=TargetType.ALIAS,), | b"alias": SnapshotBranch(target=b"HEAD", target_type=TargetType.ALIAS,), | ||||
b"HEAD": SnapshotBranch( | b"HEAD": SnapshotBranch( | ||||
target=REVISION.id, target_type=TargetType.REVISION, | target=REVISION.id, target_type=TargetType.REVISION, | ||||
), | ), | ||||
}, | }, | ||||
) | ) | ||||
swh_storage.snapshot_add([snapshot2.to_dict()]) | swh_storage.snapshot_add([snapshot2]) | ||||
with pytest.raises(InexistentObjectsError, match="Missing directories"): | with pytest.raises(InexistentObjectsError, match="Missing directories"): | ||||
check_snapshot(snapshot2, swh_storage) | check_snapshot(snapshot2, swh_storage) | ||||
assert DIRECTORY.id == REVISION.directory | assert DIRECTORY.id == REVISION.directory | ||||
swh_storage.directory_add([DIRECTORY]) | swh_storage.directory_add([DIRECTORY]) | ||||
# 6. snapshot is found in storage, target revision exists, targeted directory by the | # 6. snapshot is found in storage, target revision exists, targeted directory by the | ||||
# revision exist. Content targeted by the directory does not exist. | # revision exist. Content targeted by the directory does not exist. | ||||
assert DIRECTORY.entries[0].target == CONTENT.sha1_git | assert DIRECTORY.entries[0].target == CONTENT.sha1_git | ||||
not_found = list(swh_storage.content_missing_per_sha1_git([CONTENT.sha1_git])) | not_found = list(swh_storage.content_missing_per_sha1_git([CONTENT.sha1_git])) | ||||
assert len(not_found) == 1 | assert len(not_found) == 1 | ||||
swh_storage.directory_add([DIRECTORY.to_dict()]) | swh_storage.directory_add([DIRECTORY]) | ||||
snapshot3 = Snapshot( | snapshot3 = Snapshot( | ||||
id=hash_to_bytes("091456f535f882bc7f9a18fb16c9ad27fda7bab7"), | id=hash_to_bytes("091456f535f882bc7f9a18fb16c9ad27fda7bab7"), | ||||
branches={ | branches={ | ||||
b"alias": SnapshotBranch(target=b"HEAD", target_type=TargetType.ALIAS,), | b"alias": SnapshotBranch(target=b"HEAD", target_type=TargetType.ALIAS,), | ||||
b"HEAD": SnapshotBranch( | b"HEAD": SnapshotBranch( | ||||
target=REVISION.id, target_type=TargetType.REVISION, | target=REVISION.id, target_type=TargetType.REVISION, | ||||
), | ), | ||||
}, | }, | ||||
) | ) | ||||
swh_storage.snapshot_add([snapshot3.to_dict()]) | swh_storage.snapshot_add([snapshot3]) | ||||
with pytest.raises(InexistentObjectsError, match="Missing content(s)"): | with pytest.raises(InexistentObjectsError, match="Missing content(s)"): | ||||
check_snapshot(snapshot3, swh_storage) | check_snapshot(snapshot3, swh_storage) | ||||
# 7. snapshot is found in storage, targeted release does not exist | # 7. snapshot is found in storage, targeted release does not exist | ||||
# release targets the revisions which exists | # release targets the revisions which exists | ||||
assert RELEASE.target == REVISION.id | assert RELEASE.target == REVISION.id | ||||
Show All 17 Lines |