Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show First 20 Lines • Show All 367 Lines • ▼ Show 20 Lines | ) -> Optional[ContentRow]: | ||||
) | ) | ||||
) | ) | ||||
assert len(rows) <= 1 | assert len(rows) <= 1 | ||||
if rows: | if rows: | ||||
return ContentRow(**rows[0]) | return ContentRow(**rows[0]) | ||||
else: | else: | ||||
return None | return None | ||||
def content_missing_from_hashes( | |||||
self, contents_hashes: List[Dict[str, bytes]] | |||||
) -> Iterator[Dict[str, bytes]]: | |||||
for group in grouper(contents_hashes, PARTITION_KEY_RESTRICTION_MAX_SIZE): | |||||
group = list(group) | |||||
# Get all contents that share a sha256 with one of the contents in the group | |||||
present = set( | |||||
self._content_get_hashes_from_sha256( | |||||
[content["sha256"] for content in group] | |||||
) | |||||
) | |||||
for content in group: | |||||
for algo in HASH_ALGORITHMS: | |||||
assert content.get(algo) is not None, ( | |||||
"content_missing_from_hashes must not be called with " | |||||
"partial hashes." | |||||
) | |||||
if tuple(content[algo] for algo in HASH_ALGORITHMS) not in present: | |||||
yield content | |||||
@_prepared_statement( | |||||
f"SELECT {', '.join(HASH_ALGORITHMS)} FROM content WHERE sha256 IN ?" | |||||
) | |||||
def _content_get_hashes_from_sha256( | |||||
self, ids: List[bytes], *, statement | |||||
) -> Iterator[Tuple[bytes, bytes, bytes, bytes]]: | |||||
for row in self._execute_with_retries(statement, [ids]): | |||||
yield tuple(row[algo] for algo in HASH_ALGORITHMS) # type: ignore | |||||
@_prepared_select_statement( | @_prepared_select_statement( | ||||
ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?" | ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?" | ||||
) | ) | ||||
def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]: | def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]: | ||||
return map(ContentRow.from_dict, self._execute_with_retries(statement, [token])) | return map(ContentRow.from_dict, self._execute_with_retries(statement, [token])) | ||||
@_prepared_select_statement( | @_prepared_select_statement( | ||||
ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) > ? LIMIT 1" | ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) > ? LIMIT 1" | ||||
▲ Show 20 Lines • Show All 909 Lines • Show Last 20 Lines |