D6423.diff

diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -28,6 +28,7 @@
from cassandra import ConsistencyLevel, CoordinationFailure
from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet
from cassandra.concurrent import execute_concurrent_with_args
+from cassandra.metadata import group_keys_by_replica
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
from cassandra.query import BoundStatement, PreparedStatement, dict_factory
from mypy_extensions import NamedArg
@@ -89,6 +90,12 @@
"""
BATCH_INSERT_MAX_SIZE = 1000
+SELECT_MISSING_ALGOS = [
+ "concurrent",
+ "grouped-naive",
+ "grouped-pk-serial",
+ "grouped-pk-concurrent",
+]
logger = logging.getLogger(__name__)
@@ -256,8 +263,14 @@
to Cassandra."""
def __init__(
- self, hosts: List[str], keyspace: str, port: int, consistency_level: str
+ self,
+ hosts: List[str],
+ keyspace: str,
+ port: int,
+ consistency_level: str,
+ select_missing_algo: str,
):
+ self._keyspace = keyspace
self._cluster = Cluster(
hosts,
port=port,
@@ -277,6 +290,7 @@
self._prepared_statements: Dict[
str, Union[PreparedStatement, Dict[Any, PreparedStatement]]
] = {}
+ self._select_missing_algo = select_missing_algo
##########################
# Common utility functions
@@ -298,7 +312,7 @@
retry=retry_if_exception_type(CoordinationFailure),
)
def _execute_many_with_retries(
- self, statement, args_list: List[Tuple]
+ self, statement, args_list: Sequence[Tuple]
) -> Iterable[Dict[str, Any]]:
for res in execute_concurrent_with_args(self._session, statement, args_list):
yield from res.result_or_exc
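
For reference, execute_concurrent_with_args (which backs _execute_many_with_retries and the "concurrent" algorithm below) yields one (success, result_or_exc) pair per argument tuple. A minimal standalone sketch of that contract; the contact point, keyspace and table are placeholders, not part of this diff:

from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args

session = Cluster(["127.0.0.1"]).connect("swh")  # placeholder cluster/keyspace
stmt = session.prepare("SELECT sha1 FROM content_by_sha1 WHERE sha1 IN ?")

# One execution per argument tuple, run concurrently by the driver.
args_list = [([b"\x00" * 20],), ([b"\xff" * 20],)]
for success, result_or_exc in execute_concurrent_with_args(session, stmt, args_list):
    if success:
        for row in result_or_exc:  # rows on success, an exception otherwise
            print(row)
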
@@ -331,11 +345,46 @@
else:
return None
- def _missing(self, statement, ids):
+ def _missing(self, statement: PreparedStatement, ids):
found_ids = set()
- for id_group in grouper(ids, PARTITION_KEY_RESTRICTION_MAX_SIZE):
- rows = self._execute_with_retries(statement, [list(id_group)])
- found_ids.update(row["id"] for row in rows)
+
+ if not ids:
+ return []
+
+ if self._select_missing_algo == "concurrent":
+ # One statement per id
+ for row in self._execute_many_with_retries(
+ statement, [([id_],) for id_ in ids]
+ ):
+ found_ids.add(row["id"])
+ elif self._select_missing_algo == "grouped-naive":
+ # Grouped in the order they were given
+ for id_group in grouper(ids, PARTITION_KEY_RESTRICTION_MAX_SIZE):
+ rows = self._execute_with_retries(statement, [list(id_group)])
+ found_ids.update(row["id"] for row in rows)
+ else:
+ # Grouped smartly, so each query only fetches data from a single server
+ (first_col, *_) = statement.column_metadata
+ table = first_col.table_name
+
+ groups = group_keys_by_replica(self._session, self._keyspace, table, ids)
+ subgroups = [
+ (list(subgroup),)
+ for (host, group) in groups.items()
+ for subgroup in grouper(group, PARTITION_KEY_RESTRICTION_MAX_SIZE)
+ ]
+
+ if self._select_missing_algo == "grouped-pk-serial":
+ # Send queries for each subgroup, one-by-one
+ for subgroup in subgroups:
+ rows = self._execute_with_retries(statement, subgroup)
+ found_ids.update(row["id"] for row in rows)
+ elif self._select_missing_algo == "grouped-pk-concurrent":
+ # Same as above, but we send all queries in parallel
+ for row in self._execute_many_with_retries(statement, subgroups):
+ found_ids.add(row["id"])
+ else:
+ assert False, self._select_missing_algo
return [id_ for id_ in ids if id_ not in found_ids]
##########################
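
For context, the "grouped-pk-*" branches above recover the table name from the prepared statement's column_metadata, then use the driver's group_keys_by_replica helper to bucket partition keys by the replica that owns them, so each IN query stays on one server. A minimal standalone sketch; the contact point, keyspace and table name are placeholders:

from cassandra.cluster import Cluster
from cassandra.metadata import group_keys_by_replica

session = Cluster(["127.0.0.1"]).connect()  # placeholder contact point

# The driver expects each key as a tuple of partition-key components.
keys = [(b"\x00" * 20,), (b"\xff" * 20,)]
groups = group_keys_by_replica(session, "swh", "content_by_sha1", keys)

# Each entry maps one replica (a Host) to the keys it owns; one
# "SELECT ... WHERE pk IN (...)" per group then touches a single server.
for replica, replica_keys in groups.items():
    print(replica, replica_keys)
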
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -66,7 +66,7 @@
from ..exc import HashCollision, StorageArgumentException
from ..utils import remove_keys
from .common import TOKEN_BEGIN, TOKEN_END, hash_url
-from .cql import CqlRunner
+from .cql import SELECT_MISSING_ALGOS, CqlRunner
from .model import (
ContentRow,
DirectoryEntryRow,
@@ -104,6 +104,7 @@
allow_overwrite=False,
consistency_level="ONE",
directory_entries_insert_algo="one-by-one",
+ select_missing_algo="grouped-naive",
):
"""
A backend of swh-storage backed by Cassandra
@@ -128,14 +129,18 @@
* one-by-one: naive, one INSERT per directory entry, serialized
* concurrent: one INSERT per directory entry, concurrent
* batch: using UNLOGGED BATCH to insert many entries in a few statements
+ select_missing_algo: Must be one of:
+ * concurrent: one SELECT per key, all sent concurrently
+ * grouped-naive: group keys in the order they are given, one SELECT per group, serially
+ * grouped-pk-serial: group keys by the replica server that owns them,
+ one SELECT per group, serially
+ * grouped-pk-concurrent: same grouping, but all SELECTs sent
+ in parallel
"""
self._hosts = hosts
self._keyspace = keyspace
self._port = port
self._consistency_level = consistency_level
- self._set_cql_runner()
- self.journal_writer: JournalWriter = JournalWriter(journal_writer)
- self.objstorage: ObjStorage = ObjStorage(objstorage)
self._allow_overwrite = allow_overwrite
if directory_entries_insert_algo not in DIRECTORY_ENTRIES_INSERT_ALGOS:
@@ -144,11 +149,26 @@
f"{', '.join(DIRECTORY_ENTRIES_INSERT_ALGOS)}"
)
self._directory_entries_insert_algo = directory_entries_insert_algo
+ if select_missing_algo not in SELECT_MISSING_ALGOS:
+ raise ValueError(
+ f"Configuration error: select_missing_algo has unknown value: "
+ f"{self._select_missing_algo}, expected one of: "
+ f"{', '.join(SELECT_MISSING_ALGOS)}"
+ )
+ self._select_missing_algo = select_missing_algo
+
+ self._set_cql_runner()
+ self.journal_writer: JournalWriter = JournalWriter(journal_writer)
+ self.objstorage: ObjStorage = ObjStorage(objstorage)
def _set_cql_runner(self):
"""Used by tests when they need to reset the CqlRunner"""
self._cql_runner: CqlRunner = CqlRunner(
- self._hosts, self._keyspace, self._port, self._consistency_level
+ self._hosts,
+ self._keyspace,
+ self._port,
+ self._consistency_level,
+ select_missing_algo=self._select_missing_algo,
)
@timed
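
For illustration, the new knob can be passed like any other Cassandra storage setting; a minimal sketch assuming a local cluster, with placeholder host and keyspace values:

from swh.storage import get_storage

storage = get_storage(
    cls="cassandra",
    hosts=["127.0.0.1"],  # placeholder contact point
    keyspace="swh",
    journal_writer={"cls": "memory"},
    objstorage={"cls": "memory"},
    select_missing_algo="grouped-pk-concurrent",
)

# The *_missing entry points now route through the selected algorithm.
print(storage.content_missing_per_sha1([b"\x00" * 20]))
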
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -22,7 +22,7 @@
from swh.model.model import Directory, DirectoryEntry, Snapshot, SnapshotBranch
from swh.storage import get_storage
from swh.storage.cassandra import create_keyspace
-from swh.storage.cassandra.cql import BATCH_INSERT_MAX_SIZE
+from swh.storage.cassandra.cql import BATCH_INSERT_MAX_SIZE, SELECT_MISSING_ALGOS
from swh.storage.cassandra.model import ContentRow, ExtIDRow
from swh.storage.cassandra.schema import HASH_ALGORITHMS, TABLES
from swh.storage.cassandra.storage import DIRECTORY_ENTRIES_INSERT_ALGOS
@@ -216,8 +216,8 @@
# below
-@pytest.fixture
-def swh_storage_backend_config(cassandra_cluster, keyspace):
+@pytest.fixture(params=SELECT_MISSING_ALGOS)
+def swh_storage_backend_config(cassandra_cluster, keyspace, request):
(hosts, port) = cassandra_cluster
storage_config = dict(
@@ -227,6 +227,7 @@
keyspace=keyspace,
journal_writer={"cls": "memory"},
objstorage={"cls": "memory"},
+ select_missing_algo=request.param,
)
yield storage_config
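
As a toy illustration of the fixture change above (standalone, not part of the diff): a fixture parametrized with params= makes every dependent test run once per value, so the whole suite is exercised under each algorithm.

import pytest

ALGOS = ["concurrent", "grouped-naive", "grouped-pk-serial", "grouped-pk-concurrent"]

@pytest.fixture(params=ALGOS)
def select_missing_algo(request):
    return request.param  # takes each value of ALGOS in turn

def test_algo_is_known(select_missing_algo):
    # Collected four times, once per fixture parameter.
    assert select_missing_algo in ALGOS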
