Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7163694
D2866.id10217.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
D2866.id10217.diff
View Options
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -271,21 +271,53 @@
_skipped_content_keys = [
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length',
'ctime', 'status', 'reason', 'origin']
- _magic_null_pk = b''
+ _magic_null_pk = b'<null>'
"""
- NULLs are not allowed in primary keys; instead use an empty
- value
+ NULLs (or all-empty blobs) are not allowed in primary keys; instead use a
+ special value that can't possibly be a valid hash.
"""
+ def _skipped_content_add_finalize(self, statement: BoundStatement) -> None:
+ """Returned curried (via functools.partial) by skipped_content_add_prepare,
+ to be called when the row should be added to the main 'skipped_content' table."""
+ self._execute_with_retries(statement, None)
+ self._increment_counter('skipped_content', 1)
+
@_prepared_insert_statement('skipped_content', _skipped_content_keys)
- def skipped_content_add_one(self, content, *, statement) -> None:
+ def skipped_content_add_prepare(
+ self, content, *, statement) -> Tuple[int, Callable[[], None]]:
+ """Prepares insertion of a SkippedContent into the main 'skipped_content' table.
+ Returns a token (to be used in secondary tables), and a function to be
+ called to perform the insertion in the main table."""
+
+ # Replace NULLs (which are not allowed in the partition key) with
+ # the placeholder value self._magic_null_pk
content = content.to_dict()
for key in self._skipped_content_pk:
if content[key] is None:
content[key] = self._magic_null_pk
- content = SkippedContent.from_dict(content)
- self._add_one(statement, 'skipped_content', content,
- self._skipped_content_keys)
+
+ statement = statement.bind([
+ content.get(key) for key in self._skipped_content_keys])
+
+ # Type used for hashing keys (usually, it will be
+ # cassandra.metadata.Murmur3Token)
+ token_class = self._cluster.metadata.token_map.token_class
+
+ # Token of the row when it will be inserted. This is equivalent to
+ # "SELECT token({', '.join(self._skipped_content_pk)})
+ # FROM skipped_content WHERE ..."
+ # after the row is inserted; but we need the token to insert in the
+ # index tables *before* inserting to the main 'skipped_content' table
+ token = token_class.from_key(statement.routing_key).value
+ assert TOKEN_BEGIN <= token <= TOKEN_END
+
+ # Function to be called after the indexes contain their respective
+ # row
+ finalizer = functools.partial(
+ self._skipped_content_add_finalize, statement)
+
+ return (token, finalizer)
@_prepared_statement('SELECT * FROM skipped_content WHERE ' +
' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS)))
@@ -307,15 +339,14 @@
##########################
def skipped_content_index_add_one(
- self, main_algo: str, content: Content) -> None:
- assert content.get_hash(main_algo) is not None
- query = ('INSERT INTO skipped_content_by_{algo} ({cols}) '
- 'VALUES ({values})').format(
- algo=main_algo, cols=', '.join(self._content_pk),
- values=', '.join('%s' for _ in self._content_pk))
+ self, algo: str, content: SkippedContent, token: int) -> None:
+ """Adds a row mapping content[algo] to the token of the SkippedContent
+ in the main 'skipped_content' table."""
+ query = (
+ f'INSERT INTO skipped_content_by_{algo} ({algo}, target_token) '
+ f'VALUES (%s, %s)')
self._execute_with_retries(
- query, [content.get_hash(algo) or self._magic_null_pk
- for algo in self._content_pk])
+ query, [content.get_hash(algo) or self._magic_null_pk, token])
##########################
# 'revision' table
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -200,11 +200,9 @@
);
CREATE TABLE IF NOT EXISTS skipped_content_by_{main_algo} (
- sha1 blob,
- sha1_git blob,
- sha256 blob,
- blake2s256 blob,
- PRIMARY KEY (({main_algo}), {other_algos})
+ {main_algo} blob,
+ target_token bigint, -- value of token(pk) on the "primary" table
+ PRIMARY KEY (({main_algo}), target_token)
);
'''
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -251,6 +251,25 @@
def content_get_random(self):
return self._cql_runner.content_get_random().sha1_git
+ def _skipped_content_get_from_hash(self, algo, hash_) -> Iterable:
+ """From the name of a hash algorithm and a value of that hash,
+ looks up the "hash -> token" secondary table
+ (skipped_content_by_{algo}) to get tokens.
+ Then, looks up the main table (skipped_content) to get all rows with
+ that token, and filters out contents whose hash doesn't match."""
+ found_tokens = \
+ self._cql_runner.skipped_content_get_tokens_from_single_hash(
+ algo, hash_)
+
+ for token in found_tokens:
+ # Query the main table ('skipped_content').
+ res = self._cql_runner.skipped_content_get_from_token(token)
+
+ for row in res:
+ # re-check the hash (in case of a murmur3 token collision)
+ if getattr(row, algo) == hash_:
+ yield row
+
def _skipped_content_add(self, contents: Iterable[SkippedContent]) -> Dict:
# Filter-out content already in the database.
contents = [
@@ -260,14 +279,17 @@
self.journal_writer.skipped_content_add(contents)
for content in contents:
- # Add to index tables
+ # Compute token of the row in the main table
+ (token, insertion_finalizer) = \
+ self._cql_runner.skipped_content_add_prepare(content)
+
+ # Then add to index tables
for algo in HASH_ALGORITHMS:
- if content.get_hash(algo) is not None:
- self._cql_runner.skipped_content_index_add_one(
- algo, content)
+ self._cql_runner.skipped_content_index_add_one(
+ algo, content, token)
# Then to the main table
- self._cql_runner.skipped_content_add_one(content)
+ insertion_finalizer()
return {
'skipped_content:add': len(contents)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jan 30, 1:25 PM (9 h, 4 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219521
Attached To
D2866: Store the value of token(partition_key) in skipped_content_by_* table, instead of three hashes.
Event Timeline
Log In to Comment