Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show First 20 Lines • Show All 272 Lines • ▼ Show 20 Lines | ) -> Iterable[Optional[Dict[str, bytes]]]: | ||||
if len(contents) > BULK_BLOCK_CONTENT_LEN_MAX: | if len(contents) > BULK_BLOCK_CONTENT_LEN_MAX: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
f"Send at maximum {BULK_BLOCK_CONTENT_LEN_MAX} contents." | f"Send at maximum {BULK_BLOCK_CONTENT_LEN_MAX} contents." | ||||
) | ) | ||||
yield from self.objstorage.content_get(contents) | yield from self.objstorage.content_get(contents) | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def content_get_range( | |||||
self, start: bytes, end: bytes, limit: int = 1000, db=None, cur=None | |||||
) -> Dict[str, Any]: | |||||
if limit is None: | |||||
raise StorageArgumentException("limit should not be None") | |||||
contents = [] | |||||
next_content = None | |||||
for counter, content_row in enumerate( | |||||
db.content_get_range(start, end, limit + 1, cur) | |||||
): | |||||
content = dict(zip(db.content_get_metadata_keys, content_row)) | |||||
if counter >= limit: | |||||
# take the last commit for the next page starting from this | |||||
next_content = content["sha1"] | |||||
break | |||||
contents.append(content) | |||||
return { | |||||
"contents": contents, | |||||
"next": next_content, | |||||
} | |||||
@timed | |||||
def content_get_partition( | def content_get_partition( | ||||
self, | self, | ||||
partition_id: int, | partition_id: int, | ||||
nb_partitions: int, | nb_partitions: int, | ||||
limit: int = 1000, | |||||
page_token: Optional[str] = None, | page_token: Optional[str] = None, | ||||
) -> Dict[str, Any]: | limit: int = 1000, | ||||
db=None, | |||||
cur=None, | |||||
) -> PagedResult[Content]: | |||||
if limit is None: | if limit is None: | ||||
raise StorageArgumentException("limit should not be None") | raise StorageArgumentException("limit should not be None") | ||||
(start, end) = get_partition_bounds_bytes( | (start, end) = get_partition_bounds_bytes( | ||||
partition_id, nb_partitions, SHA1_SIZE | partition_id, nb_partitions, SHA1_SIZE | ||||
) | ) | ||||
if page_token: | if page_token: | ||||
start = hash_to_bytes(page_token) | start = hash_to_bytes(page_token) | ||||
if end is None: | if end is None: | ||||
end = b"\xff" * SHA1_SIZE | end = b"\xff" * SHA1_SIZE | ||||
result = self.content_get_range(start, end, limit) | |||||
result2 = { | next_page_token: Optional[str] = None | ||||
"contents": result["contents"], | contents = [] | ||||
"next_page_token": None, | for counter, row in enumerate(db.content_get_range(start, end, limit + 1, cur)): | ||||
} | row_d = dict(zip(db.content_get_metadata_keys, row)) | ||||
if result["next"]: | content = Content(**row_d) | ||||
result2["next_page_token"] = hash_to_hex(result["next"]) | if counter >= limit: | ||||
return result2 | # take the last content for the next page starting from this | ||||
next_page_token = hash_to_hex(content.sha1) | |||||
break | |||||
contents.append(content) | |||||
assert len(contents) <= limit | |||||
return PagedResult(results=contents, next_page_token=next_page_token) | |||||
@timed | @timed | ||||
@db_transaction(statement_timeout=500) | @db_transaction(statement_timeout=500) | ||||
def content_get_metadata( | def content_get_metadata( | ||||
self, contents: List[bytes], db=None, cur=None | self, contents: List[bytes], db=None, cur=None | ||||
) -> Dict[bytes, List[Dict]]: | ) -> Dict[bytes, List[Dict]]: | ||||
result: Dict[bytes, List[Dict]] = {sha1: [] for sha1 in contents} | result: Dict[bytes, List[Dict]] = {sha1: [] for sha1 in contents} | ||||
for row in db.content_get_metadata_from_sha1s(contents, cur): | for row in db.content_get_metadata_from_sha1s(contents, cur): | ||||
▲ Show 20 Lines • Show All 1,113 Lines • Show Last 20 Lines |