Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show First 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | |||||
from swh.storage.interface import ListOrder | from swh.storage.interface import ListOrder | ||||
from .common import TOKEN_BEGIN, TOKEN_END, hash_url, remove_keys | from .common import TOKEN_BEGIN, TOKEN_END, hash_url, remove_keys | ||||
from .model import ( | from .model import ( | ||||
BaseRow, | BaseRow, | ||||
ContentRow, | ContentRow, | ||||
DirectoryEntryRow, | DirectoryEntryRow, | ||||
DirectoryRow, | DirectoryRow, | ||||
MAGIC_NULL_PK, | |||||
MetadataAuthorityRow, | MetadataAuthorityRow, | ||||
MetadataFetcherRow, | MetadataFetcherRow, | ||||
ObjectCountRow, | ObjectCountRow, | ||||
OriginRow, | OriginRow, | ||||
OriginVisitRow, | OriginVisitRow, | ||||
OriginVisitStatusRow, | OriginVisitStatusRow, | ||||
RawExtrinsicMetadataRow, | RawExtrinsicMetadataRow, | ||||
ReleaseRow, | ReleaseRow, | ||||
▲ Show 20 Lines • Show All 318 Lines • ▼ Show 20 Lines | ) -> Iterable[int]: | ||||
return ( | return ( | ||||
row["target_token"] for row in self._execute_with_retries(query, [hash_]) | row["target_token"] for row in self._execute_with_retries(query, [hash_]) | ||||
) | ) | ||||
########################## | ########################## | ||||
# 'skipped_content' table | # 'skipped_content' table | ||||
########################## | ########################## | ||||
_magic_null_pk = b"<null>" | |||||
""" | |||||
NULLs (or all-empty blobs) are not allowed in primary keys; instead use a | |||||
special value that can't possibly be a valid hash. | |||||
""" | |||||
def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: | def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: | ||||
"""Returned curried by skipped_content_add_prepare, to be called | """Returned curried by skipped_content_add_prepare, to be called | ||||
when the content row should be added to the primary table.""" | when the content row should be added to the primary table.""" | ||||
self._execute_with_retries(statement, None) | self._execute_with_retries(statement, None) | ||||
self._increment_counter("skipped_content", 1) | self._increment_counter("skipped_content", 1) | ||||
@_prepared_insert_statement(SkippedContentRow) | @_prepared_insert_statement(SkippedContentRow) | ||||
def skipped_content_add_prepare( | def skipped_content_add_prepare( | ||||
self, content, *, statement | self, content, *, statement | ||||
) -> Tuple[int, Callable[[], None]]: | ) -> Tuple[int, Callable[[], None]]: | ||||
"""Prepares insertion of a Content to the main 'skipped_content' table. | """Prepares insertion of a Content to the main 'skipped_content' table. | ||||
Returns a token (to be used in secondary tables), and a function to be | Returns a token (to be used in secondary tables), and a function to be | ||||
called to perform the insertion in the main table.""" | called to perform the insertion in the main table.""" | ||||
# Replace NULLs (which are not allowed in the partition key) with | # Replace NULLs (which are not allowed in the partition key) with | ||||
# the magic null placeholder value | # the magic null placeholder value | ||||
for key in SkippedContentRow.PARTITION_KEY: | for key in SkippedContentRow.PARTITION_KEY: | ||||
if getattr(content, key) is None: | if getattr(content, key) is None: | ||||
setattr(content, key, self._magic_null_pk) | setattr(content, key, MAGIC_NULL_PK) | ||||
statement = statement.bind(dataclasses.astuple(content)) | statement = statement.bind(dataclasses.astuple(content)) | ||||
# Type used for hashing keys (usually, it will be | # Type used for hashing keys (usually, it will be | ||||
# cassandra.metadata.Murmur3Token) | # cassandra.metadata.Murmur3Token) | ||||
token_class = self._cluster.metadata.token_map.token_class | token_class = self._cluster.metadata.token_map.token_class | ||||
# Token of the row when it will be inserted. This is equivalent to | # Token of the row when it will be inserted. This is equivalent to | ||||
Show All 15 Lines | @_prepared_select_statement( | ||||
f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}", | f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}", | ||||
) | ) | ||||
def skipped_content_get_from_pk( | def skipped_content_get_from_pk( | ||||
self, content_hashes: Dict[str, bytes], *, statement | self, content_hashes: Dict[str, bytes], *, statement | ||||
) -> Optional[SkippedContentRow]: | ) -> Optional[SkippedContentRow]: | ||||
rows = list( | rows = list( | ||||
self._execute_with_retries( | self._execute_with_retries( | ||||
statement, | statement, | ||||
[ | [content_hashes[algo] or MAGIC_NULL_PK for algo in HASH_ALGORITHMS], | ||||
content_hashes[algo] or self._magic_null_pk | |||||
for algo in HASH_ALGORITHMS | |||||
], | |||||
) | ) | ||||
) | ) | ||||
assert len(rows) <= 1 | assert len(rows) <= 1 | ||||
if rows: | if rows: | ||||
# TODO: convert _magic_null_pk back to None? | |||||
return SkippedContentRow.from_dict(rows[0]) | return SkippedContentRow.from_dict(rows[0]) | ||||
else: | else: | ||||
return None | return None | ||||
@_prepared_select_statement( | |||||
SkippedContentRow, | |||||
f"WHERE token({', '.join(SkippedContentRow.PARTITION_KEY)}) = ?", | |||||
) | |||||
def skipped_content_get_from_token( | |||||
self, token, *, statement | |||||
) -> Iterable[SkippedContentRow]: | |||||
return map( | |||||
SkippedContentRow.from_dict, self._execute_with_retries(statement, [token]) | |||||
) | |||||
########################## | ########################## | ||||
# 'skipped_content_by_*' tables | # 'skipped_content_by_*' tables | ||||
########################## | ########################## | ||||
def skipped_content_index_add_one( | def skipped_content_index_add_one( | ||||
self, algo: str, content: SkippedContent, token: int | self, algo: str, content: SkippedContent, token: int | ||||
) -> None: | ) -> None: | ||||
"""Adds a row mapping content[algo] to the token of the SkippedContent | """Adds a row mapping content[algo] to the token of the SkippedContent | ||||
in the main 'skipped_content' table.""" | in the main 'skipped_content' table.""" | ||||
query = ( | query = ( | ||||
f"INSERT INTO skipped_content_by_{algo} ({algo}, target_token) " | f"INSERT INTO skipped_content_by_{algo} ({algo}, target_token) " | ||||
f"VALUES (%s, %s)" | f"VALUES (%s, %s)" | ||||
) | ) | ||||
self._execute_with_retries( | self._execute_with_retries( | ||||
query, [content.get_hash(algo) or self._magic_null_pk, token] | query, [content.get_hash(algo) or MAGIC_NULL_PK, token] | ||||
) | |||||
def skipped_content_get_tokens_from_single_hash( | |||||
self, algo: str, hash_: bytes | |||||
) -> Iterable[int]: | |||||
assert algo in HASH_ALGORITHMS | |||||
query = f"SELECT target_token FROM skipped_content_by_{algo} WHERE {algo} = %s" | |||||
return ( | |||||
row["target_token"] for row in self._execute_with_retries(query, [hash_]) | |||||
) | ) | ||||
########################## | ########################## | ||||
# 'revision' table | # 'revision' table | ||||
########################## | ########################## | ||||
@_prepared_exists_statement("revision") | @_prepared_exists_statement("revision") | ||||
def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]: | def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]: | ||||
▲ Show 20 Lines • Show All 485 Lines • Show Last 20 Lines |