Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/tests/test_init.py
Show First 20 Lines • Show All 355 Lines • ▼ Show 20 Lines | |||||
def test_check_snapshot_failures(swh_storage): | def test_check_snapshot_failures(swh_storage): | ||||
"""Failure scenarios: | """Failure scenarios: | ||||
0. snapshot parameter is not a snapshot | 0. snapshot parameter is not a snapshot | ||||
1. snapshot id is correct but branches mismatched | 1. snapshot id is correct but branches mismatched | ||||
2. snapshot id is not correct, it's not found in the storage | 2. snapshot id is not correct, it's not found in the storage | ||||
3. snapshot reference an alias which does not exist | 3. snapshot reference an alias which does not exist | ||||
4. snapshot is found in storage, targeted revision does not exist | 4. snapshot is found in storage, targeted revision does not exist | ||||
5. snapshot is found in storage, targeted release does not exist | 5. snapshot is found in storage, targeted revision exists but the directory the | ||||
revision targets does not exist | |||||
6. snapshot is found in storage, targeted release does not exist | |||||
The following are not dealt with yet: | The following are not dealt with yet: | ||||
6. snapshot is found in storage, targeted directory does not exist | 7. snapshot is found in storage, nested targeted directories does not exist | ||||
7. snapshot is found in storage, targeted content does not exist | 8. snapshot is found in storage, nested targeted contents does not exist | ||||
""" | """ | ||||
snap_id_hex = "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7" | snap_id_hex = "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7" | ||||
snapshot = Snapshot( | snapshot = Snapshot( | ||||
id=hash_to_bytes(snap_id_hex), | id=hash_to_bytes(snap_id_hex), | ||||
branches={ | branches={ | ||||
b"master": SnapshotBranch( | b"master": SnapshotBranch( | ||||
target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, | target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, | ||||
Show All 40 Lines | snapshot0 = Snapshot( | ||||
b"alias": SnapshotBranch(target=b"HEAD", target_type=TargetType.ALIAS,), | b"alias": SnapshotBranch(target=b"HEAD", target_type=TargetType.ALIAS,), | ||||
}, | }, | ||||
) | ) | ||||
swh_storage.snapshot_add([snapshot0]) | swh_storage.snapshot_add([snapshot0]) | ||||
with pytest.raises(InconsistentAliasBranchError, match="Alias branch HEAD"): | with pytest.raises(InconsistentAliasBranchError, match="Alias branch HEAD"): | ||||
check_snapshot(snapshot0, swh_storage) | check_snapshot(snapshot0, swh_storage) | ||||
# 4. snapshot exists, revision exists but referenced an unknown directory | |||||
not_yet = list(swh_storage.directory_missing([DIRECTORY.id])) | |||||
assert len(not_yet) == 1 | |||||
# 4. snapshot is found in storage, targeted revision does not exist | # 4. snapshot is found in storage, targeted revision does not exist | ||||
snapshot1 = Snapshot( | snapshot1 = Snapshot( | ||||
id=hash_to_bytes("456666f535f882bc7f9a18fb16c9ad27fda7bab7"), | id=hash_to_bytes("456666f535f882bc7f9a18fb16c9ad27fda7bab7"), | ||||
branches={ | branches={ | ||||
b"alias": SnapshotBranch(target=b"HEAD", target_type=TargetType.ALIAS,), | b"alias": SnapshotBranch(target=b"HEAD", target_type=TargetType.ALIAS,), | ||||
b"HEAD": SnapshotBranch( | b"HEAD": SnapshotBranch( | ||||
target=REVISION.id, target_type=TargetType.REVISION, | target=REVISION.id, target_type=TargetType.REVISION, | ||||
), | ), | ||||
}, | }, | ||||
) | ) | ||||
not_yet = list(swh_storage.directory_missing([DIRECTORY.id])) | |||||
assert len(not_yet) == 1 | |||||
swh_storage.snapshot_add([snapshot1]) | swh_storage.snapshot_add([snapshot1]) | ||||
with pytest.raises(InexistentObjectsError, match="Branch/Revision"): | with pytest.raises(InexistentObjectsError, match="Branch/Revision"): | ||||
check_snapshot(snapshot1, swh_storage) | check_snapshot(snapshot1, swh_storage) | ||||
# 5. snapshot is found in storage, targeted revision exists but the directory the | |||||
# revision targets does not exist | |||||
not_found = list(swh_storage.directory_missing([REVISION.directory])) | |||||
assert len(not_found) == 1 | |||||
swh_storage.revision_add([REVISION.to_dict()]) | swh_storage.revision_add([REVISION.to_dict()]) | ||||
snapshot2 = Snapshot( | snapshot2 = Snapshot( | ||||
id=hash_to_bytes("987123f535f882bc7f9a18fb16c9ad27fda7bab7"), | |||||
branches={ | |||||
b"alias": SnapshotBranch(target=b"HEAD", target_type=TargetType.ALIAS,), | |||||
b"HEAD": SnapshotBranch( | |||||
target=REVISION.id, target_type=TargetType.REVISION, | |||||
), | |||||
}, | |||||
) | |||||
swh_storage.snapshot_add([snapshot2.to_dict()]) | |||||
with pytest.raises(InexistentObjectsError, match="Missing directories"): | |||||
check_snapshot(snapshot2, swh_storage) | |||||
assert DIRECTORY.id == REVISION.directory | |||||
swh_storage.directory_add([DIRECTORY]) | |||||
# 6. snapshot is found in storage, targeted release does not exist | |||||
snapshot3 = Snapshot( | |||||
id=hash_to_bytes("789666f535f882bc7f9a18fb16c9ad27fda7bab7"), | id=hash_to_bytes("789666f535f882bc7f9a18fb16c9ad27fda7bab7"), | ||||
branches={ | branches={ | ||||
b"alias": SnapshotBranch(target=b"HEAD", target_type=TargetType.ALIAS,), | b"alias": SnapshotBranch(target=b"HEAD", target_type=TargetType.ALIAS,), | ||||
b"HEAD": SnapshotBranch( | b"HEAD": SnapshotBranch( | ||||
target=REVISION.id, target_type=TargetType.REVISION, | target=REVISION.id, target_type=TargetType.REVISION, | ||||
), | ), | ||||
b"release/0.1.0": SnapshotBranch( | b"release/0.1.0": SnapshotBranch( | ||||
target=RELEASE.id, target_type=TargetType.RELEASE, | target=RELEASE.id, target_type=TargetType.RELEASE, | ||||
), | ), | ||||
}, | }, | ||||
) | ) | ||||
swh_storage.snapshot_add([snapshot2]) | swh_storage.snapshot_add([snapshot3]) | ||||
with pytest.raises(InexistentObjectsError, match="Branch/Release"): | with pytest.raises(InexistentObjectsError, match="Branch/Release"): | ||||
check_snapshot(snapshot2, swh_storage) | check_snapshot(snapshot3, swh_storage) |