diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -271,21 +271,53 @@
     _skipped_content_keys = [
         'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length',
         'ctime', 'status', 'reason', 'origin']
-    _magic_null_pk = b''
+    _magic_null_pk = b'<null>'
     """
-    NULLs are not allowed in primary keys; instead use an empty
-    value
+    NULLs (or all-empty blobs) are not allowed in primary keys; instead use a
+    special value that can't possibly be a valid hash.
     """
 
+    def _skipped_content_add_finalize(self, statement: BoundStatement) -> None:
+        """Returned curried by skipped_content_add_prepare, to be called
+        when the content row should be added to the primary table."""
+        self._execute_with_retries(statement, None)
+        self._increment_counter('skipped_content', 1)
+
     @_prepared_insert_statement('skipped_content', _skipped_content_keys)
-    def skipped_content_add_one(self, content, *, statement) -> None:
+    def skipped_content_add_prepare(
+            self, content, *, statement) -> Tuple[int, Callable[[], None]]:
+        """Prepares insertion of a SkippedContent to the main
+        'skipped_content' table. Returns a token (to be used in secondary
+        tables), and a function to perform the insertion in the main table."""
+
+        # Replace NULLs (which are not allowed in the partition key) with
+        # the _magic_null_pk placeholder
         content = content.to_dict()
         for key in self._skipped_content_pk:
             if content[key] is None:
                 content[key] = self._magic_null_pk
-        content = SkippedContent.from_dict(content)
-        self._add_one(statement, 'skipped_content', content,
-                      self._skipped_content_keys)
+
+        statement = statement.bind([
+            content.get(key) for key in self._skipped_content_keys])
+
+        # Type used for hashing keys (usually, it will be
+        # cassandra.metadata.Murmur3Token)
+        token_class = self._cluster.metadata.token_map.token_class
+
+        # Token of the row when it will be inserted. This is equivalent to
+        # "SELECT token({', '.join(self._skipped_content_pk)})
+        #  FROM skipped_content WHERE ..."
+        # after the row is inserted; but we need the token to insert in the
+        # index tables *before* inserting to the main 'skipped_content' table
+        token = token_class.from_key(statement.routing_key).value
+        assert TOKEN_BEGIN <= token <= TOKEN_END
+
+        # Function to be called after the indexes contain their respective
+        # row
+        finalizer = functools.partial(
+            self._skipped_content_add_finalize, statement)
+
+        return (token, finalizer)
 
     @_prepared_statement('SELECT * FROM skipped_content WHERE ' +
                          ' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS)))
@@ -307,15 +339,14 @@
     ##########################
 
     def skipped_content_index_add_one(
-            self, main_algo: str, content: Content) -> None:
-        assert content.get_hash(main_algo) is not None
-        query = ('INSERT INTO skipped_content_by_{algo} ({cols}) '
-                 'VALUES ({values})').format(
-            algo=main_algo, cols=', '.join(self._content_pk),
-            values=', '.join('%s' for _ in self._content_pk))
+            self, algo: str, content: SkippedContent, token: int) -> None:
+        """Adds a row mapping content[algo] to the token of the SkippedContent
+        in the main 'skipped_content' table."""
+        query = (
+            f'INSERT INTO skipped_content_by_{algo} ({algo}, target_token) '
+            'VALUES (%s, %s)')
         self._execute_with_retries(
-            query, [content.get_hash(algo) or self._magic_null_pk
-                    for algo in self._content_pk])
+            query, [content.get_hash(algo) or self._magic_null_pk, token])
 
     ##########################
     # 'revision' table
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -200,11 +200,9 @@
 );
 
 CREATE TABLE IF NOT EXISTS skipped_content_by_{main_algo} (
-    sha1 blob,
-    sha1_git blob,
-    sha256 blob,
-    blake2s256 blob,
-    PRIMARY KEY (({main_algo}), {other_algos})
+    {main_algo} blob,
+    target_token bigint, -- value of token(pk) on the "primary" table
+    PRIMARY KEY (({main_algo}), target_token)
 );
 '''
 
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -251,6 +251,25 @@
     def content_get_random(self):
         return self._cql_runner.content_get_random().sha1_git
 
+    def _skipped_content_get_from_hash(self, algo, hash_) -> Iterable:
+        """From the name of a hash algorithm and a value of that hash,
+        looks up the "hash -> token" secondary table
+        (skipped_content_by_{algo}) to get tokens.
+        Then, looks up the main table (skipped_content) to get all contents
+        with that token, and filters out contents whose hash doesn't match."""
+        found_tokens = \
+            self._cql_runner.skipped_content_get_tokens_from_single_hash(
+                algo, hash_)
+
+        for token in found_tokens:
+            # Query the main table ('skipped_content').
+            res = self._cql_runner.skipped_content_get_from_token(token)
+
+            for row in res:
+                # re-check the hash (in case of murmur3 collision)
+                if getattr(row, algo) == hash_:
+                    yield row
+
     def _skipped_content_add(self, contents: Iterable[SkippedContent]) -> Dict:
         # Filter-out content already in the database.
         contents = [
@@ -260,14 +279,17 @@
         self.journal_writer.skipped_content_add(contents)
 
         for content in contents:
-            # Add to index tables
+            # Compute token of the row in the main table
+            (token, insertion_finalizer) = \
+                self._cql_runner.skipped_content_add_prepare(content)
+
+            # Then add to index tables
             for algo in HASH_ALGORITHMS:
-                if content.get_hash(algo) is not None:
-                    self._cql_runner.skipped_content_index_add_one(
-                        algo, content)
+                self._cql_runner.skipped_content_index_add_one(
+                    algo, content, token)
 
             # Then to the main table
-            self._cql_runner.skipped_content_add_one(content)
+            insertion_finalizer()
 
         return {
             'skipped_content:add': len(contents)