D6423.diff
D6423: cassandra: Add alternative algorithms to list missing objects
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -28,6 +28,7 @@
from cassandra import ConsistencyLevel, CoordinationFailure
from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet
from cassandra.concurrent import execute_concurrent_with_args
+from cassandra.metadata import group_keys_by_replica
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
from cassandra.query import BoundStatement, PreparedStatement, dict_factory
from mypy_extensions import NamedArg
@@ -89,6 +90,12 @@
"""
BATCH_INSERT_MAX_SIZE = 1000
+SELECT_MISSING_ALGOS = [
+ "concurrent",
+ "grouped-naive",
+ "grouped-pk-serial",
+ "grouped-pk-concurrent",
+]
logger = logging.getLogger(__name__)
@@ -256,8 +263,14 @@
to Cassandra."""
def __init__(
- self, hosts: List[str], keyspace: str, port: int, consistency_level: str
+ self,
+ hosts: List[str],
+ keyspace: str,
+ port: int,
+ consistency_level: str,
+ select_missing_algo: str,
):
+ self._keyspace = keyspace
self._cluster = Cluster(
hosts,
port=port,
@@ -277,6 +290,7 @@
self._prepared_statements: Dict[
str, Union[PreparedStatement, Dict[Any, PreparedStatement]]
] = {}
+ self._select_missing_algo = select_missing_algo
##########################
# Common utility functions
@@ -298,7 +312,7 @@
retry=retry_if_exception_type(CoordinationFailure),
)
def _execute_many_with_retries(
- self, statement, args_list: List[Tuple]
+ self, statement, args_list: Sequence[Tuple]
) -> Iterable[Dict[str, Any]]:
for res in execute_concurrent_with_args(self._session, statement, args_list):
yield from res.result_or_exc
@@ -331,11 +345,46 @@
else:
return None
- def _missing(self, statement, ids):
+ def _missing(self, statement: PreparedStatement, ids):
found_ids = set()
- for id_group in grouper(ids, PARTITION_KEY_RESTRICTION_MAX_SIZE):
- rows = self._execute_with_retries(statement, [list(id_group)])
- found_ids.update(row["id"] for row in rows)
+
+ if not ids:
+ return []
+
+ if self._select_missing_algo == "concurrent":
+ # One statement per id
+ for row in self._execute_many_with_retries(
+ statement, [([id_],) for id_ in ids]
+ ):
+ found_ids.add(row["id"])
+ elif self._select_missing_algo == "grouped-naive":
+ # Grouped in the order they were given
+ for id_group in grouper(ids, PARTITION_KEY_RESTRICTION_MAX_SIZE):
+ rows = self._execute_with_retries(statement, [list(id_group)])
+ found_ids.update(row["id"] for row in rows)
+ else:
+ # Grouped smartly, so each query only fetches data from a single server
+ (first_col, *_) = statement.column_metadata
+ table = first_col.table_name
+
+ groups = group_keys_by_replica(self._session, self._keyspace, table, ids)
+ subgroups = [
+ (list(subgroup),)
+ for (host, group) in groups.items()
+ for subgroup in grouper(group, PARTITION_KEY_RESTRICTION_MAX_SIZE)
+ ]
+
+ if self._select_missing_algo == "grouped-pk-serial":
+ # Send queries for each subgroup, one-by-one
+ for subgroup in subgroups:
+ rows = self._execute_with_retries(statement, subgroup)
+ found_ids.update(row["id"] for row in rows)
+ elif self._select_missing_algo == "grouped-pk-concurrent":
+ # Same as above, but we send all queries in parallel
+ for row in self._execute_many_with_retries(statement, subgroups):
+ found_ids.add(row["id"])
+ else:
+ assert False, self._select_missing_algo
return [id_ for id_ in ids if id_ not in found_ids]
##########################
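The grouped-pk-* algorithms above rely on the driver's group_keys_by_replica() so that each IN query is coordinated by a node that actually owns the keys. The standalone sketch below (not part of the patch) illustrates the same idea under assumed names: a "content" table whose partition key is the single "id" column, a placeholder keyspace and contact point, and a helper name of my own; the exact key format passed to group_keys_by_replica is an assumption.

# Illustrative sketch only; table name, keyspace, contact point and the
# helper name are placeholders, not taken from the patch.
from cassandra.cluster import Cluster
from cassandra.metadata import group_keys_by_replica

MAX_IN_SIZE = 100  # cap on the number of values per IN clause

def missing_ids_grouped_by_replica(session, keyspace, ids):
    statement = session.prepare(
        f"SELECT id FROM {keyspace}.content WHERE id IN ?"
    )
    found = set()
    # Assumption: one tuple of partition-key components per key; here the
    # partition key is just the "id" column. Keys whose replica is unknown
    # end up under a sentinel key and are still queried below.
    groups = group_keys_by_replica(
        session, keyspace, "content", [(id_,) for id_ in ids]
    )
    for host, keys in groups.items():
        # Split each per-replica group so the IN clause stays small.
        for i in range(0, len(keys), MAX_IN_SIZE):
            chunk = [key[0] for key in keys[i : i + MAX_IN_SIZE]]
            for row in session.execute(statement, (chunk,)):
                found.add(row.id)
    return [id_ for id_ in ids if id_ not in found]

if __name__ == "__main__":
    cluster = Cluster(["127.0.0.1"])  # placeholder contact point
    session = cluster.connect()
    print(missing_ids_grouped_by_replica(session, "swh", [b"\x00" * 20]))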
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -66,7 +66,7 @@
from ..exc import HashCollision, StorageArgumentException
from ..utils import remove_keys
from .common import TOKEN_BEGIN, TOKEN_END, hash_url
-from .cql import CqlRunner
+from .cql import SELECT_MISSING_ALGOS, CqlRunner
from .model import (
ContentRow,
DirectoryEntryRow,
@@ -104,6 +104,7 @@
allow_overwrite=False,
consistency_level="ONE",
directory_entries_insert_algo="one-by-one",
+ select_missing_algo="grouped-naive",
):
"""
A backend of swh-storage backed by Cassandra
@@ -128,14 +129,18 @@
* one-by-one: naive, one INSERT per directory entry, serialized
* concurrent: one INSERT per directory entry, concurrent
* batch: using UNLOGGED BATCH to insert many entries in a few statements
+ select_missing_algo: Must be one of:
+ * concurrent: one SELECT per key, concurrent
+            * grouped-naive: split keys into fixed-size groups in the order given, one SELECT per group, serially
+ * grouped-pk-serial: group keys per server, run SELECT on each server,
+ server-by-server
+ * grouped-pk-concurrent: same as before, but send all server queries
+ in parallel
"""
self._hosts = hosts
self._keyspace = keyspace
self._port = port
self._consistency_level = consistency_level
- self._set_cql_runner()
- self.journal_writer: JournalWriter = JournalWriter(journal_writer)
- self.objstorage: ObjStorage = ObjStorage(objstorage)
self._allow_overwrite = allow_overwrite
if directory_entries_insert_algo not in DIRECTORY_ENTRIES_INSERT_ALGOS:
@@ -144,11 +149,26 @@
f"{', '.join(DIRECTORY_ENTRIES_INSERT_ALGOS)}"
)
self._directory_entries_insert_algo = directory_entries_insert_algo
+ if select_missing_algo not in SELECT_MISSING_ALGOS:
+ raise ValueError(
+ f"Configuration error: select_missing_algo has unknown value: "
+ f"{self._select_missing_algo}, expected one of: "
+ f"{', '.join(SELECT_MISSING_ALGOS)}"
+ )
+ self._select_missing_algo = select_missing_algo
+
+ self._set_cql_runner()
+ self.journal_writer: JournalWriter = JournalWriter(journal_writer)
+ self.objstorage: ObjStorage = ObjStorage(objstorage)
def _set_cql_runner(self):
"""Used by tests when they need to reset the CqlRunner"""
self._cql_runner: CqlRunner = CqlRunner(
- self._hosts, self._keyspace, self._port, self._consistency_level
+ self._hosts,
+ self._keyspace,
+ self._port,
+ self._consistency_level,
+ select_missing_algo=self._select_missing_algo,
)
@timed
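For context (not part of the patch), here is a hedged sketch of how the new select_missing_algo option could be passed when instantiating the storage through swh.storage.get_storage; the host, keyspace and chosen algorithm below are placeholders, and an unknown value is rejected with ValueError by the constructor shown above.

# Illustrative configuration sketch; values are placeholders.
from swh.storage import get_storage

storage = get_storage(
    cls="cassandra",
    hosts=["cassandra-seed.internal"],        # placeholder contact point
    keyspace="swh",                           # placeholder keyspace
    consistency_level="ONE",
    directory_entries_insert_algo="batch",
    select_missing_algo="grouped-pk-concurrent",
    objstorage={"cls": "memory"},
    journal_writer={"cls": "memory"},
)

# *_missing() methods now use the selected algorithm internally.
missing = list(storage.content_missing_per_sha1([b"\x00" * 20]))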
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -22,7 +22,7 @@
from swh.model.model import Directory, DirectoryEntry, Snapshot, SnapshotBranch
from swh.storage import get_storage
from swh.storage.cassandra import create_keyspace
-from swh.storage.cassandra.cql import BATCH_INSERT_MAX_SIZE
+from swh.storage.cassandra.cql import BATCH_INSERT_MAX_SIZE, SELECT_MISSING_ALGOS
from swh.storage.cassandra.model import ContentRow, ExtIDRow
from swh.storage.cassandra.schema import HASH_ALGORITHMS, TABLES
from swh.storage.cassandra.storage import DIRECTORY_ENTRIES_INSERT_ALGOS
@@ -216,8 +216,8 @@
# below
-@pytest.fixture
-def swh_storage_backend_config(cassandra_cluster, keyspace):
+@pytest.fixture(params=SELECT_MISSING_ALGOS)
+def swh_storage_backend_config(cassandra_cluster, keyspace, request):
(hosts, port) = cassandra_cluster
storage_config = dict(
@@ -227,6 +227,7 @@
keyspace=keyspace,
journal_writer={"cls": "memory"},
objstorage={"cls": "memory"},
+ select_missing_algo=request.param,
)
yield storage_config
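The fixture parametrization above makes every Cassandra storage test run once per algorithm in SELECT_MISSING_ALGOS. A minimal, self-contained sketch of the same pytest pattern, with illustrative fixture and test names of my own:

# Standalone pytest sketch of fixture parametrization; names are illustrative.
import pytest

SELECT_MISSING_ALGOS = [
    "concurrent",
    "grouped-naive",
    "grouped-pk-serial",
    "grouped-pk-concurrent",
]

@pytest.fixture(params=SELECT_MISSING_ALGOS)
def backend_config(request):
    # request.param takes each value of SELECT_MISSING_ALGOS in turn
    return {"cls": "cassandra", "select_missing_algo": request.param}

def test_config_has_known_algo(backend_config):
    assert backend_config["select_missing_algo"] in SELECT_MISSING_ALGOS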