Page Menu · Home · Software Heritage

D2866.id10217.diff
No OneTemporary

D2866.id10217.diff

diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -271,21 +271,53 @@
_skipped_content_keys = [
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length',
'ctime', 'status', 'reason', 'origin']
- _magic_null_pk = b''
+ _magic_null_pk = b'<null>'
"""
- NULLs are not allowed in primary keys; instead use an empty
- value
+ NULLs (or all-empty blobs) are not allowed in primary keys; instead use a
+ special value that can't possibly be a valid hash.
"""
+ def _skipped_content_add_finalize(self, statement: BoundStatement) -> None:
+ """Returned curried by skipped_content_add_prepare, to be called
+ when the content row should be added to the primary table."""
+ self._execute_with_retries(statement, None)
+ self._increment_counter('skipped_content', 1)
+
@_prepared_insert_statement('skipped_content', _skipped_content_keys)
- def skipped_content_add_one(self, content, *, statement) -> None:
+ def skipped_content_add_prepare(
+ self, content, *, statement) -> Tuple[int, Callable[[], None]]:
+ """Prepares insertion of a SkippedContent in the main 'skipped_content'
+ table. Returns a token (to be used in secondary tables), and a function
+ to be called to perform the insertion in the main table."""
+
+ # Replace NULLs (which are not allowed in the partition key) with
+ # the placeholder value self._magic_null_pk
content = content.to_dict()
for key in self._skipped_content_pk:
if content[key] is None:
content[key] = self._magic_null_pk
- content = SkippedContent.from_dict(content)
- self._add_one(statement, 'skipped_content', content,
- self._skipped_content_keys)
+
+ statement = statement.bind([
+ content.get(key) for key in self._skipped_content_keys])
+
+ # Type used for hashing keys (usually, it will be
+ # cassandra.metadata.Murmur3Token)
+ token_class = self._cluster.metadata.token_map.token_class
+
+ # Token of the row when it will be inserted. This is equivalent to
+ # "SELECT token({', '.join(self._skipped_content_pk)})
+ # FROM skipped_content WHERE ..."
+ # after the row is inserted; but we need the token to insert in the
+ # index tables *before* inserting to the main 'skipped_content' table
+ token = token_class.from_key(statement.routing_key).value
+ assert TOKEN_BEGIN <= token <= TOKEN_END
+
+ # Function to be called after the indexes contain their respective
+ # row
+ finalizer = functools.partial(
+ self._skipped_content_add_finalize, statement)
+
+ return (token, finalizer)
@_prepared_statement('SELECT * FROM skipped_content WHERE ' +
' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS)))
@@ -307,15 +339,14 @@
##########################
def skipped_content_index_add_one(
- self, main_algo: str, content: Content) -> None:
- assert content.get_hash(main_algo) is not None
- query = ('INSERT INTO skipped_content_by_{algo} ({cols}) '
- 'VALUES ({values})').format(
- algo=main_algo, cols=', '.join(self._content_pk),
- values=', '.join('%s' for _ in self._content_pk))
+ self, algo: str, content: SkippedContent, token: int) -> None:
+ """Adds a row mapping content.get_hash(algo) (or _magic_null_pk if it is
+ None) to the token of the row in the main 'skipped_content' table."""
+ query = (
+ f'INSERT INTO skipped_content_by_{algo} ({algo}, target_token) '
+ f'VALUES (%s, %s)')
self._execute_with_retries(
- query, [content.get_hash(algo) or self._magic_null_pk
- for algo in self._content_pk])
+ query, [content.get_hash(algo) or self._magic_null_pk, token])
##########################
# 'revision' table
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -200,11 +200,9 @@
);
CREATE TABLE IF NOT EXISTS skipped_content_by_{main_algo} (
- sha1 blob,
- sha1_git blob,
- sha256 blob,
- blake2s256 blob,
- PRIMARY KEY (({main_algo}), {other_algos})
+ {main_algo} blob,
+ target_token bigint, -- value of token(pk) on the "primary" table
+ PRIMARY KEY (({main_algo}), target_token)
);
'''
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -251,6 +251,25 @@
def content_get_random(self):
return self._cql_runner.content_get_random().sha1_git
+ def _skipped_content_get_from_hash(self, algo, hash_) -> Iterable:
+ """From the name of a hash algorithm and a value of that hash,
+ looks up the "hash -> token" secondary table
+ (skipped_content_by_{algo}) to get tokens.
+ Then, looks up the main table (skipped_content) to get all rows with
+ that token, and filters out rows whose hash doesn't match."""
+ found_tokens = \
+ self._cql_runner.skipped_content_get_tokens_from_single_hash(
+ algo, hash_)
+
+ for token in found_tokens:
+ # Query the main table ('skipped_content').
+ res = self._cql_runner.skipped_content_get_from_token(token)
+
+ for row in res:
+ # re-check the hash (distinct rows may share a token)
+ if getattr(row, algo) == hash_:
+ yield row
+
def _skipped_content_add(self, contents: Iterable[SkippedContent]) -> Dict:
# Filter-out content already in the database.
contents = [
@@ -260,14 +279,17 @@
self.journal_writer.skipped_content_add(contents)
for content in contents:
- # Add to index tables
+ # Compute token of the row in the main table
+ (token, insertion_finalizer) = \
+ self._cql_runner.skipped_content_add_prepare(content)
+
+ # Then add to index tables
for algo in HASH_ALGORITHMS:
- if content.get_hash(algo) is not None:
- self._cql_runner.skipped_content_index_add_one(
- algo, content)
+ self._cql_runner.skipped_content_index_add_one(
+ algo, content, token)
# Then to the main table
- self._cql_runner.skipped_content_add_one(content)
+ insertion_finalizer()
return {
'skipped_content:add': len(contents)

File Metadata

Mime Type
text/plain
Expires
Thu, Jan 30, 1:25 PM (9 h, 4 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219521

Event Timeline