Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/tests/__init__.py
Show First 20 Lines • Show All 154 Lines • ▼ Show 20 Lines | if snapshot_dict is None: | ||||
raise AssertionError(f"Snapshot {expected_snapshot.id.hex()} is not found") | raise AssertionError(f"Snapshot {expected_snapshot.id.hex()} is not found") | ||||
snapshot_dict.pop("next_branch") | snapshot_dict.pop("next_branch") | ||||
actual_snaphot = Snapshot.from_dict(snapshot_dict) | actual_snaphot = Snapshot.from_dict(snapshot_dict) | ||||
assert isinstance(actual_snaphot, Snapshot) | assert isinstance(actual_snaphot, Snapshot) | ||||
assert expected_snapshot == actual_snaphot | assert expected_snapshot == actual_snaphot | ||||
branches_by_target_type = defaultdict(list) | objects_by_target_type = defaultdict(list) | ||||
object_to_branch = {} | object_to_branch = {} | ||||
for branch, target in actual_snaphot.branches.items(): | for branch, target in actual_snaphot.branches.items(): | ||||
if (target.target_type, branch) in allowed_empty: | if (target.target_type, branch) in allowed_empty: | ||||
# safe for those elements to not be checked for existence | # safe for those elements to not be checked for existence | ||||
continue | continue | ||||
branches_by_target_type[target.target_type].append(target.target) | objects_by_target_type[target.target_type].append(target.target) | ||||
object_to_branch[target.target] = branch | object_to_branch[target.target] = branch | ||||
# check that alias references target something that exists, otherwise raise | # check that alias references target something that exists, otherwise raise | ||||
aliases: List[bytes] = branches_by_target_type.get(TargetType.ALIAS, []) | aliases: List[bytes] = objects_by_target_type.get(TargetType.ALIAS, []) | ||||
for alias in aliases: | for alias in aliases: | ||||
if alias not in actual_snaphot.branches: | if alias not in actual_snaphot.branches: | ||||
raise InconsistentAliasBranchError( | raise InconsistentAliasBranchError( | ||||
f"Alias branch {alias.decode('utf-8')} " | f"Alias branch {alias.decode('utf-8')} " | ||||
f"should be in {list(actual_snaphot.branches)}" | f"should be in {list(actual_snaphot.branches)}" | ||||
) | ) | ||||
revs = branches_by_target_type.get(TargetType.REVISION) | revs = objects_by_target_type.get(TargetType.REVISION) | ||||
if revs: | if revs: | ||||
revisions = list(storage.revision_get(revs)) | revisions = list(storage.revision_get(revs)) | ||||
not_found = [rev_id for rev_id, rev in zip(revs, revisions) if rev is None] | not_found = [rev_id for rev_id, rev in zip(revs, revisions) if rev is None] | ||||
if not_found: | if not_found: | ||||
missing_objs = ", ".join( | missing_objs = ", ".join( | ||||
str((object_to_branch[rev], rev.hex())) for rev in not_found | str((object_to_branch[rev], rev.hex())) for rev in not_found | ||||
) | ) | ||||
raise InexistentObjectsError( | raise InexistentObjectsError( | ||||
f"Branch/Revision(s) {missing_objs} should exist in storage" | f"Branch/Revision(s) {missing_objs} should exist in storage" | ||||
) | ) | ||||
# retrieve information from revision | |||||
for rev in revisions: | |||||
objects_by_target_type[TargetType.DIRECTORY].append(rev["directory"]) | |||||
object_to_branch[rev["directory"]] = rev["id"] | |||||
rels = branches_by_target_type.get(TargetType.RELEASE) | rels = objects_by_target_type.get(TargetType.RELEASE) | ||||
if rels: | if rels: | ||||
releases = list(storage.release_get(rels)) | not_found = list(storage.release_missing(rels)) | ||||
not_found = [rel_id for rel_id, rel in zip(rels, releases) if rel is None] | |||||
if not_found: | if not_found: | ||||
missing_objs = ", ".join( | missing_objs = ", ".join( | ||||
str((object_to_branch[rel], rel.hex())) for rel in not_found | str((object_to_branch[rel], rel.hex())) for rel in not_found | ||||
) | ) | ||||
raise InexistentObjectsError( | raise InexistentObjectsError( | ||||
f"Branch/Release(s) {missing_objs} should exist in storage" | f"Branch/Release(s) {missing_objs} should exist in storage" | ||||
) | ) | ||||
dirs = objects_by_target_type.get(TargetType.DIRECTORY) | |||||
if dirs: | |||||
not_found = list(storage.directory_missing(dirs)) | |||||
if not_found: | |||||
missing_objs = ", ".join( | |||||
str((object_to_branch[dir_].hex(), dir_.hex())) for dir_ in not_found | |||||
) | |||||
raise InexistentObjectsError( | |||||
f"Missing directories {missing_objs}: " | |||||
"(revision exists, directory target does not)" | |||||
) | |||||
# for retro compat, returned the dict, remove when clients are migrated | # for retro compat, returned the dict, remove when clients are migrated | ||||
return snapshot_dict | return snapshot_dict | ||||
def get_stats(storage) -> Dict: | def get_stats(storage) -> Dict: | ||||
"""Adaptation utils to unify the stats counters across storage | """Adaptation utils to unify the stats counters across storage | ||||
implementation. | implementation. | ||||
Show All 16 Lines |