Changeset View
Changeset View
Standalone View
Standalone View
swh/web/common/archive.py
Show First 20 Lines • Show All 1,400 Lines • ▼ Show 20 Lines | def lookup_object(object_type: ObjectType, object_id: str) -> Dict[str, Any]: | ||||
elif object_type == ObjectType.REVISION: | elif object_type == ObjectType.REVISION: | ||||
return lookup_revision(object_id) | return lookup_revision(object_id) | ||||
elif object_type == ObjectType.SNAPSHOT: | elif object_type == ObjectType.SNAPSHOT: | ||||
return lookup_snapshot(object_id) | return lookup_snapshot(object_id) | ||||
else: | else: | ||||
raise ValueError(f"Unexpected object type variant: {object_type}") | raise ValueError(f"Unexpected object type variant: {object_type}") | ||||
def lookup_missing_hashes(grouped_swhids: Dict[str, List[bytes]]) -> Set[str]: | def _identifiers_missing(obj_type: ObjectType, obj_ids: List[bytes]) -> Set[bytes]: | ||||
return { | |||||
ObjectType.CONTENT: storage.content_missing_per_sha1_git, | |||||
ObjectType.DIRECTORY: storage.directory_missing, | |||||
ObjectType.REVISION: storage.revision_missing, | |||||
ObjectType.RELEASE: storage.release_missing, | |||||
ObjectType.SNAPSHOT: storage.snapshot_missing, | |||||
}[obj_type](obj_ids) | |||||
def lookup_missing_hashes( | |||||
grouped_swhids: Dict[ObjectType, List[bytes]] | |||||
) -> Dict[ObjectType, Set[bytes]]: | |||||
"""Lookup missing Software Heritage persistent identifier hash, using | """Lookup missing Software Heritage persistent identifier hash, using | ||||
batch processing. | batch processing. | ||||
Args: | Args: | ||||
A dictionary with: | A dictionary with: | ||||
keys: object types | keys: object types | ||||
values: object hashes | values: object hashes | ||||
Returns: | Returns: | ||||
A set(hexadecimal) of the hashes not found in the storage | A dictionary per type with set(bytes) of the hashes not found in the storage | ||||
""" | """ | ||||
missing_hashes = [] | return { | ||||
_identifiers_missing(obj_type, obj_ids) | |||||
for obj_type, obj_ids in grouped_swhids.items(): | for obj_type, obj_ids in grouped_swhids.items() | ||||
if obj_type == ObjectType.CONTENT: | } | ||||
missing_hashes.append(storage.content_missing_per_sha1_git(obj_ids)) | |||||
elif obj_type == ObjectType.DIRECTORY: | |||||
missing_hashes.append(storage.directory_missing(obj_ids)) | |||||
elif obj_type == ObjectType.REVISION: | |||||
missing_hashes.append(storage.revision_missing(obj_ids)) | |||||
elif obj_type == ObjectType.RELEASE: | |||||
missing_hashes.append(storage.release_missing(obj_ids)) | |||||
elif obj_type == ObjectType.SNAPSHOT: | |||||
missing_hashes.append(storage.snapshot_missing(obj_ids)) | |||||
missing = set( | |||||
map(lambda x: hashutil.hash_to_hex(x), itertools.chain(*missing_hashes)) | |||||
) | |||||
return missing | |||||
def lookup_origins_by_sha1s(sha1s: List[str]) -> Iterator[Optional[OriginInfo]]: | def lookup_origins_by_sha1s(sha1s: List[str]) -> Iterator[Optional[OriginInfo]]: | ||||
"""Lookup origins from the sha1 hash values of their URLs. | """Lookup origins from the sha1 hash values of their URLs. | ||||
Args: | Args: | ||||
sha1s: list of sha1s hexadecimal representation | sha1s: list of sha1s hexadecimal representation | ||||
Yields: | Yields: | ||||
origin information as dict | origin information as dict | ||||
""" | """ | ||||
sha1s_bytes = [hashutil.hash_to_bytes(sha1) for sha1 in sha1s] | sha1s_bytes = [hashutil.hash_to_bytes(sha1) for sha1 in sha1s] | ||||
origins = storage.origin_get_by_sha1(sha1s_bytes) | origins = storage.origin_get_by_sha1(sha1s_bytes) | ||||
for origin in origins: | for origin in origins: | ||||
yield converters.from_origin(origin) | yield converters.from_origin(origin) |