diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -373,6 +373,47 @@
         else:
             return None
 
+    def content_missing_from_hashes(
+        self, contents_hashes: List[Dict[str, bytes]]
+    ) -> Iterator[Dict[str, bytes]]:
+        """Yields the elements of ``contents_hashes`` absent from the content table.
+
+        Every element must have a non-None value for each of HASH_ALGORITHMS.
+        Rows are fetched in batches by sha256, the partition key of the table.
+        """
+        for group in grouper(contents_hashes, PARTITION_KEY_RESTRICTION_MAX_SIZE):
+            group = list(group)
+
+            # Validate the whole batch before issuing a query for it
+            for content in group:
+                for algo in HASH_ALGORITHMS:
+                    assert content.get(algo) is not None, (
+                        "content_missing_from_hashes must not be called with "
+                        "partial hashes."
+                    )
+
+            # Get all contents that share a sha256 with one of the contents in the group
+            present = set(
+                self._content_get_hashes_from_sha256(
+                    [content["sha256"] for content in group]
+                )
+            )
+
+            for content in group:
+                if tuple(content[algo] for algo in HASH_ALGORITHMS) not in present:
+                    yield content
+
+    @_prepared_statement(
+        f"SELECT {', '.join(HASH_ALGORITHMS)} FROM content WHERE sha256 IN ?"
+    )
+    def _content_get_hashes_from_sha256(
+        self, ids: List[bytes], *, statement
+    ) -> Iterator[Tuple[bytes, bytes, bytes, bytes]]:
+        """Yields, for each row whose sha256 is in ``ids``, the tuple of its
+        hashes in HASH_ALGORITHMS order."""
+        for row in self._execute_with_retries(statement, [ids]):
+            yield tuple(row[algo] for algo in HASH_ALGORITHMS)  # type: ignore
+
     @_prepared_select_statement(
         ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?"
     )
diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py
--- a/swh/storage/cassandra/model.py
+++ b/swh/storage/cassandra/model.py
@@ -68,10 +68,10 @@
 @dataclasses.dataclass
 class ContentRow(BaseRow):
     TABLE = "content"
-    PARTITION_KEY: ClassVar[Tuple[str, ...]] = (
+    PARTITION_KEY: ClassVar[Tuple[str, ...]] = ("sha256",)
+    CLUSTERING_KEY = (
         "sha1",
         "sha1_git",
-        "sha256",
         "blake2s256",
     )
 
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -80,7 +80,7 @@
     ctime         timestamp,
         -- creation time, i.e. time of (first) injection into the storage
     status        ascii,
-    PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256))
+    PRIMARY KEY ((sha256), sha1, sha1_git, blake2s256)
 );""",
     """
 CREATE TABLE IF NOT EXISTS skipped_content (
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -355,7 +355,23 @@
-                "key_hash should be one of {','.join(DEFAULT_ALGORITHMS)}"
+                f"key_hash should be one of {','.join(DEFAULT_ALGORITHMS)}"
             )
 
+        contents_with_all_hashes = []
+        contents_with_missing_hashes = []
         for content in contents:
+            # A hash may be present as a key but set to None: treat it as missing
+            if all(content.get(algo) is not None for algo in DEFAULT_ALGORITHMS):
+                contents_with_all_hashes.append(content)
+            else:
+                contents_with_missing_hashes.append(content)
+
+        # These contents can be queried efficiently directly in the main table
+        for content in self._cql_runner.content_missing_from_hashes(
+            contents_with_all_hashes
+        ):
+            yield content[key_hash]
+
+        # For these, we need the expensive index lookups + main table.
+        for content in contents_with_missing_hashes:
             res = self.content_find(content)
             if not res:
                 yield content[key_hash]
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -216,6 +216,14 @@
         matches.sort()
         return matches[0:limit]
 
+    def content_missing_from_hashes(
+        self, contents_hashes: List[Dict[str, bytes]]
+    ) -> Iterator[Dict[str, bytes]]:
+        """Yields the elements of ``contents_hashes`` that are not stored."""
+        for content_hashes in contents_hashes:
+            if not self.content_get_from_pk(content_hashes):
+                yield content_hashes
+
     ##########################
     # 'content_by_*' tables
     ##########################