diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -23,7 +23,7 @@
     Union,
 )
 
-from cassandra import CoordinationFailure
+from cassandra import ConsistencyLevel, CoordinationFailure
 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet
 from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
 from cassandra.query import BoundStatement, PreparedStatement, dict_factory
@@ -77,12 +77,35 @@
 logger = logging.getLogger(__name__)
 
 
-_execution_profiles = {
-    EXEC_PROFILE_DEFAULT: ExecutionProfile(
-        load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()),
-        row_factory=dict_factory,
-    ),
-}
+def get_execution_profiles(
+    consistency_level: str = "ONE",
+) -> Dict[object, ExecutionProfile]:
+    """Build the execution profiles handed to the cassandra-driver ``Cluster``.
+
+    Args:
+        consistency_level: name of a ``ConsistencyLevel`` constant (for
+            example ``"ONE"`` or ``"THREE"``) set on the default profile.
+
+    Returns:
+        A dict whose only key is ``EXEC_PROFILE_DEFAULT``.
+
+    Raises:
+        ValueError: if ``consistency_level`` is not a known level name.
+    """
+    if consistency_level not in ConsistencyLevel.name_to_value:
+        raise ValueError(
+            "Configuration error: Unknown consistency level '%s'" % consistency_level
+        )
+
+    return {
+        EXEC_PROFILE_DEFAULT: ExecutionProfile(
+            load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()),
+            row_factory=dict_factory,
+            consistency_level=ConsistencyLevel.name_to_value[consistency_level],
+        )
+    }
+
+
 # Configuration for cassandra-driver's access to servers:
 # * hit the right server directly when sending a query (TokenAwarePolicy),
 # * if there's more than one, then pick one at random that's in the same
@@ -92,7 +115,7 @@
 def create_keyspace(
     hosts: List[str], keyspace: str, port: int = 9042, *, durable_writes=True
 ):
-    cluster = Cluster(hosts, port=port, execution_profiles=_execution_profiles)
+    cluster = Cluster(hosts, port=port, execution_profiles=get_execution_profiles())
     session = cluster.connect()
     extra_params = ""
     if not durable_writes:
@@ -223,9 +246,13 @@
     """Class managing prepared statements and building queries to be sent
     to Cassandra."""
 
-    def __init__(self, hosts: List[str], keyspace: str, port: int):
+    def __init__(
+        self, hosts: List[str], keyspace: str, port: int, consistency_level: str = "ONE"
+    ):
         self._cluster = Cluster(
-            hosts, port=port, execution_profiles=_execution_profiles
+            hosts,
+            port=port,
+            execution_profiles=get_execution_profiles(consistency_level),
         )
         self._session = self._cluster.connect(keyspace)
         self._cluster.register_user_type(
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -99,6 +99,7 @@
         port=9042,
         journal_writer=None,
         allow_overwrite=False,
+        consistency_level="ONE",
     ):
         """
         A backend of swh-storage backed by Cassandra
@@ -118,10 +119,12 @@
                 or when the database is known to be mostly empty.
                 Note that a ``False`` value does not guarantee there won't be
                 any overwrite.
+            consistency_level: The default read/write consistency to use
         """
         self._hosts = hosts
         self._keyspace = keyspace
         self._port = port
+        self._consistency_level = consistency_level
         self._set_cql_runner()
         self.journal_writer: JournalWriter = JournalWriter(journal_writer)
         self.objstorage: ObjStorage = ObjStorage(objstorage)
@@ -129,7 +132,9 @@
 
     def _set_cql_runner(self):
         """Used by tests when they need to reset the CqlRunner"""
-        self._cql_runner: CqlRunner = CqlRunner(self._hosts, self._keyspace, self._port)
+        self._cql_runner: CqlRunner = CqlRunner(
+            self._hosts, self._keyspace, self._port, self._consistency_level
+        )
 
     def check_config(self, *, check_write: bool) -> bool:
         self._cql_runner.check_read()
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -14,6 +14,7 @@
 from typing import Any, Dict
 
 import attr
+from cassandra.cluster import NoHostAvailable
 import pytest
 
 from swh.core.api.classes import stream_results
@@ -237,6 +238,34 @@
 
 @pytest.mark.cassandra
 class TestCassandraStorage(_TestStorage):
+    def test_config_wrong_consistency_should_raise(self):
+        """An unknown consistency level name is rejected with ValueError."""
+        storage_config = dict(
+            cls="cassandra",
+            hosts=["first"],
+            port=9999,
+            keyspace="any",
+            consistency_level="fake",
+            journal_writer={"cls": "memory"},
+            objstorage={"cls": "memory"},
+        )
+
+        with pytest.raises(ValueError):
+            get_storage(**storage_config)
+
+    def test_config_consistency_used(self, swh_storage_backend_config):
+        """A non-default consistency level is passed on to the driver."""
+        config_with_consistency = dict(
+            swh_storage_backend_config, **{"consistency_level": "THREE"}
+        )
+
+        storage = get_storage(**config_with_consistency)
+
+        # NOTE(review): presumably the test cluster has fewer than three
+        # replicas, so a query at level THREE cannot be served -- confirm.
+        with pytest.raises(NoHostAvailable):
+            storage.content_get_random()
+
     def test_content_add_murmur3_collision(self, swh_storage, mocker, sample_data):
         """The Murmur3 token is used as link from index tables to the main
         table; and non-matching contents with colliding murmur3-hash