diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -28,6 +28,7 @@ from cassandra import ConsistencyLevel, CoordinationFailure from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet from cassandra.concurrent import execute_concurrent_with_args +from cassandra.metadata import group_keys_by_replica from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import BoundStatement, PreparedStatement, dict_factory from mypy_extensions import NamedArg @@ -89,6 +90,12 @@ """ BATCH_INSERT_MAX_SIZE = 1000 +SELECT_MISSING_ALGOS = [ + "concurrent", + "grouped-naive", + "grouped-pk-serial", + "grouped-pk-concurrent", +] logger = logging.getLogger(__name__) @@ -256,8 +263,14 @@ to Cassandra.""" def __init__( - self, hosts: List[str], keyspace: str, port: int, consistency_level: str + self, + hosts: List[str], + keyspace: str, + port: int, + consistency_level: str, + select_missing_algo: str, ): + self._keyspace = keyspace self._cluster = Cluster( hosts, port=port, @@ -277,6 +290,7 @@ self._prepared_statements: Dict[ str, Union[PreparedStatement, Dict[Any, PreparedStatement]] ] = {} + self._select_missing_algo = select_missing_algo ########################## # Common utility functions @@ -298,7 +312,7 @@ retry=retry_if_exception_type(CoordinationFailure), ) def _execute_many_with_retries( - self, statement, args_list: List[Tuple] + self, statement, args_list: Sequence[Tuple] ) -> Iterable[Dict[str, Any]]: for res in execute_concurrent_with_args(self._session, statement, args_list): yield from res.result_or_exc @@ -331,11 +345,46 @@ else: return None - def _missing(self, statement, ids): + def _missing(self, statement: PreparedStatement, ids): found_ids = set() - for id_group in grouper(ids, PARTITION_KEY_RESTRICTION_MAX_SIZE): - rows = self._execute_with_retries(statement, [list(id_group)]) - found_ids.update(row["id"] 
for row in rows) + + if not ids: + return [] + + if self._select_missing_algo == "concurrent": + # One statement per id + for row in self._execute_many_with_retries( + statement, [([id_],) for id_ in ids] + ): + found_ids.add(row["id"]) + elif self._select_missing_algo == "grouped-naive": + # Grouped in the order they were given + for id_group in grouper(ids, PARTITION_KEY_RESTRICTION_MAX_SIZE): + rows = self._execute_with_retries(statement, [list(id_group)]) + found_ids.update(row["id"] for row in rows) + else: + # Grouped smartly, so each query only fetches data from a single server + (first_col, *_) = statement.column_metadata + table = first_col.table_name + + groups = group_keys_by_replica(self._session, self._keyspace, table, ids) + subgroups = [ + (list(subgroup),) + for (host, group) in groups.items() + for subgroup in grouper(group, PARTITION_KEY_RESTRICTION_MAX_SIZE) + ] + + if self._select_missing_algo == "grouped-pk-serial": + # Send queries for each subgroup, one-by-one + for subgroup in subgroups: + rows = self._execute_with_retries(statement, subgroup) + found_ids.update(row["id"] for row in rows) + elif self._select_missing_algo == "grouped-pk-concurrent": + # Same as above, but we send all queries in parallel + for row in self._execute_many_with_retries(statement, subgroups): + found_ids.add(row["id"]) + else: + assert False, self._select_missing_algo return [id_ for id_ in ids if id_ not in found_ids] ########################## diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -66,7 +66,7 @@ from ..exc import HashCollision, StorageArgumentException from ..utils import remove_keys from .common import TOKEN_BEGIN, TOKEN_END, hash_url -from .cql import CqlRunner +from .cql import SELECT_MISSING_ALGOS, CqlRunner from .model import ( ContentRow, DirectoryEntryRow, @@ -104,6 +104,7 @@ allow_overwrite=False, consistency_level="ONE", 
directory_entries_insert_algo="one-by-one", + select_missing_algo="grouped-naive", ): """ A backend of swh-storage backed by Cassandra @@ -128,14 +129,18 @@ * one-by-one: naive, one INSERT per directory entry, serialized * concurrent: one INSERT per directory entry, concurrent * batch: using UNLOGGED BATCH to insert many entries in a few statements + select_missing_algo: Must be one of: + * concurrent: one SELECT per key, concurrent + * grouped-naive: group keys in the order given, run one SELECT per group, serially + * grouped-pk-serial: group keys per server, run SELECT on each server, + server-by-server + * grouped-pk-concurrent: same as before, but send all server queries + in parallel """ self._hosts = hosts self._keyspace = keyspace self._port = port self._consistency_level = consistency_level - self._set_cql_runner() - self.journal_writer: JournalWriter = JournalWriter(journal_writer) - self.objstorage: ObjStorage = ObjStorage(objstorage) self._allow_overwrite = allow_overwrite if directory_entries_insert_algo not in DIRECTORY_ENTRIES_INSERT_ALGOS: @@ -144,11 +149,26 @@ f"{', '.join(DIRECTORY_ENTRIES_INSERT_ALGOS)}" ) self._directory_entries_insert_algo = directory_entries_insert_algo + if select_missing_algo not in SELECT_MISSING_ALGOS: + raise ValueError( + f"Configuration error: select_missing_algo has unknown value: " + f"{select_missing_algo}, expected one of: " + f"{', '.join(SELECT_MISSING_ALGOS)}" + ) + self._select_missing_algo = select_missing_algo + + self._set_cql_runner() + self.journal_writer: JournalWriter = JournalWriter(journal_writer) + self.objstorage: ObjStorage = ObjStorage(objstorage) def _set_cql_runner(self): """Used by tests when they need to reset the CqlRunner""" self._cql_runner: CqlRunner = CqlRunner( - self._hosts, self._keyspace, self._port, self._consistency_level + self._hosts, + self._keyspace, + self._port, + self._consistency_level, + select_missing_algo=self._select_missing_algo, ) @timed diff --git 
a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py --- a/swh/storage/tests/test_cassandra.py +++ b/swh/storage/tests/test_cassandra.py @@ -22,7 +22,7 @@ from swh.model.model import Directory, DirectoryEntry, Snapshot, SnapshotBranch from swh.storage import get_storage from swh.storage.cassandra import create_keyspace -from swh.storage.cassandra.cql import BATCH_INSERT_MAX_SIZE +from swh.storage.cassandra.cql import BATCH_INSERT_MAX_SIZE, SELECT_MISSING_ALGOS from swh.storage.cassandra.model import ContentRow, ExtIDRow from swh.storage.cassandra.schema import HASH_ALGORITHMS, TABLES from swh.storage.cassandra.storage import DIRECTORY_ENTRIES_INSERT_ALGOS @@ -216,8 +216,8 @@ # below -@pytest.fixture -def swh_storage_backend_config(cassandra_cluster, keyspace): +@pytest.fixture(params=SELECT_MISSING_ALGOS) +def swh_storage_backend_config(cassandra_cluster, keyspace, request): (hosts, port) = cassandra_cluster storage_config = dict( @@ -227,6 +227,7 @@ keyspace=keyspace, journal_writer={"cls": "memory"}, objstorage={"cls": "memory"}, + select_missing_algo=request.param, ) yield storage_config