diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -299,7 +299,7 @@ )
 
     def _execute_many_with_retries(
         self, statement, args_list: List[Tuple]
-    ) -> Iterable[BaseRow]:
+    ) -> Iterable[Dict[str, Any]]:
         for res in execute_concurrent_with_args(self._session, statement, args_list):
             yield from res.result_or_exc
 
@@ -424,8 +424,12 @@ )
     @_prepared_select_statement(
         ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?"
     )
-    def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]:
-        return map(ContentRow.from_dict, self._execute_with_retries(statement, [token]))
+    def content_get_from_tokens(self, tokens, *, statement) -> Iterable[ContentRow]:
+        """Yields the content rows whose partition key has one of the given tokens."""
+        return map(
+            ContentRow.from_dict,
+            self._execute_many_with_retries(statement, [(token,) for token in tokens]),
+        )
 
     @_prepared_select_statement(
         ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) > ? LIMIT 1"
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -163,19 +163,19 @@
         to get tokens.
         Then, looks up the main table (content) to get all contents with
         that token, and filters out contents whose hash doesn't match."""
-        found_tokens = self._cql_runner.content_get_tokens_from_single_algo(
-            algo, hashes
+        found_tokens = list(
+            self._cql_runner.content_get_tokens_from_single_algo(algo, hashes)
         )
+        assert all(isinstance(token, int) for token in found_tokens), found_tokens
 
-        for token in found_tokens:
-            assert isinstance(token, int), found_tokens
-            # Query the main table ('content').
-            res = self._cql_runner.content_get_from_token(token)
-
-            for row in res:
-                # re-check the the hash (in case of murmur3 collision)
-                if getattr(row, algo) in hashes:
-                    yield row
+        # Query the main table ('content') once per distinct token: several
+        # hashes can map to the same token, which would yield duplicate rows.
+        unique_tokens = list(dict.fromkeys(found_tokens))
+        rows = self._cql_runner.content_get_from_tokens(unique_tokens)
+        for row in rows:
+            # re-check the hash (in case of murmur3 collision)
+            if getattr(row, algo) in hashes:
+                yield row
 
     def _content_add(self, contents: List[Content], with_data: bool) -> Dict[str, int]:
         # Filter-out content already in the database.