Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show First 20 Lines • Show All 265 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
########################## | ########################## | ||||
# 'skipped_content' table | # 'skipped_content' table | ||||
########################## | ########################## | ||||
_skipped_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | _skipped_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | ||||
_skipped_content_keys = [ | _skipped_content_keys = [ | ||||
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', | 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', | ||||
'ctime', 'status', 'reason', 'origin'] | 'ctime', 'status', 'reason', 'origin'] | ||||
_magic_null_pk = b'' | _magic_null_pk = b'<null>' | ||||
""" | """ | ||||
NULLs are not allowed in primary keys; instead use an empty | NULLs (or all-empty blobs) are not allowed in primary keys; instead use a | ||||
value | special value that can't possibly be a valid hash. | ||||
""" | """ | ||||
def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: | |||||
"""Returned currified by skipped_content_add_prepare, to be called | |||||
when the content row should be added to the primary table.""" | |||||
self._execute_with_retries(statement, None) | |||||
self._increment_counter('skipped_content', 1) | |||||
@_prepared_insert_statement('skipped_content', _skipped_content_keys) | @_prepared_insert_statement('skipped_content', _skipped_content_keys) | ||||
def skipped_content_add_one(self, content, *, statement) -> None: | def skipped_content_add_prepare( | ||||
self, content, *, statement) -> Tuple[int, Callable[[], None]]: | |||||
"""Prepares insertion of a Content to the main 'skipped_content' table. | |||||
Returns a token (to be used in secondary tables), and a function to be | |||||
called to perform the insertion in the main table.""" | |||||
# Replace NULLs (which are not allowed in the partition key) with | |||||
# the placeholder value self._magic_null_pk | |||||
content = content.to_dict() | content = content.to_dict() | ||||
for key in self._skipped_content_pk: | for key in self._skipped_content_pk: | ||||
if content[key] is None: | if content[key] is None: | ||||
content[key] = self._magic_null_pk | content[key] = self._magic_null_pk | ||||
content = SkippedContent.from_dict(content) | |||||
self._add_one(statement, 'skipped_content', content, | statement = statement.bind([ | ||||
self._skipped_content_keys) | content.get(key) for key in self._skipped_content_keys]) | ||||
# Type used for hashing keys (usually, it will be | |||||
# cassandra.metadata.Murmur3Token) | |||||
token_class = self._cluster.metadata.token_map.token_class | |||||
# Token the row will have once it is inserted. This is equivalent to | |||||
# "SELECT token({', '.join(self._skipped_content_pk)}) | |||||
# FROM skipped_content WHERE ..." | |||||
# after the row is inserted; but we need the token to insert in the | |||||
# index tables *before* inserting to the main 'skipped_content' table | |||||
token = token_class.from_key(statement.routing_key).value | |||||
assert TOKEN_BEGIN <= token <= TOKEN_END | |||||
# Function to be called once the index tables contain their respective | |||||
# rows | |||||
finalizer = functools.partial( | |||||
self._skipped_content_add_finalize, statement) | |||||
return (token, finalizer) | |||||
@_prepared_statement('SELECT * FROM skipped_content WHERE ' + | @_prepared_statement('SELECT * FROM skipped_content WHERE ' + | ||||
' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))) | ' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))) | ||||
def skipped_content_get_from_pk( | def skipped_content_get_from_pk( | ||||
self, content_hashes: Dict[str, bytes], *, statement | self, content_hashes: Dict[str, bytes], *, statement | ||||
) -> Optional[Row]: | ) -> Optional[Row]: | ||||
rows = list(self._execute_with_retries( | rows = list(self._execute_with_retries( | ||||
statement, [content_hashes[algo] or self._magic_null_pk | statement, [content_hashes[algo] or self._magic_null_pk | ||||
for algo in HASH_ALGORITHMS])) | for algo in HASH_ALGORITHMS])) | ||||
assert len(rows) <= 1 | assert len(rows) <= 1 | ||||
if rows: | if rows: | ||||
# TODO: convert _magic_null_pk back to None? | # TODO: convert _magic_null_pk back to None? | ||||
return rows[0] | return rows[0] | ||||
else: | else: | ||||
return None | return None | ||||
########################## | ########################## | ||||
# 'skipped_content_by_*' tables | # 'skipped_content_by_*' tables | ||||
########################## | ########################## | ||||
def skipped_content_index_add_one( | def skipped_content_index_add_one( | ||||
self, main_algo: str, content: Content) -> None: | self, algo: str, content: SkippedContent, token: int) -> None: | ||||
assert content.get_hash(main_algo) is not None | """Adds a row mapping content[algo] to the token of the SkippedContent | ||||
query = ('INSERT INTO skipped_content_by_{algo} ({cols}) ' | in the main 'skipped_content' table.""" | ||||
'VALUES ({values})').format( | query = ( | ||||
algo=main_algo, cols=', '.join(self._content_pk), | f'INSERT INTO skipped_content_by_{algo} ({algo}, target_token) ' | ||||
values=', '.join('%s' for _ in self._content_pk)) | f'VALUES (%s, %s)') | ||||
self._execute_with_retries( | self._execute_with_retries( | ||||
query, [content.get_hash(algo) or self._magic_null_pk | query, [content.get_hash(algo) or self._magic_null_pk, token]) | ||||
for algo in self._content_pk]) | |||||
########################## | ########################## | ||||
# 'revision' table | # 'revision' table | ||||
########################## | ########################## | ||||
_revision_keys = [ | _revision_keys = [ | ||||
'id', 'date', 'committer_date', 'type', 'directory', 'message', | 'id', 'date', 'committer_date', 'type', 'directory', 'message', | ||||
'author', 'committer', | 'author', 'committer', | ||||
▲ Show 20 Lines • Show All 397 Lines • Show Last 20 Lines |