Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show All 17 Lines | from typing import ( | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Tuple, | Tuple, | ||||
Type, | Type, | ||||
TypeVar, | TypeVar, | ||||
Union, | Union, | ||||
) | ) | ||||
from cassandra import CoordinationFailure | from cassandra import ConsistencyLevel, CoordinationFailure | ||||
from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet | from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet | ||||
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | ||||
from cassandra.query import BoundStatement, PreparedStatement, dict_factory | from cassandra.query import BoundStatement, PreparedStatement, dict_factory | ||||
from mypy_extensions import NamedArg | from mypy_extensions import NamedArg | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, | retry, | ||||
retry_if_exception_type, | retry_if_exception_type, | ||||
stop_after_attempt, | stop_after_attempt, | ||||
Show All 37 Lines | from .model import ( | ||||
SnapshotRow, | SnapshotRow, | ||||
content_index_table_name, | content_index_table_name, | ||||
) | ) | ||||
from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
_execution_profiles = { | def get_execution_profiles( | ||||
consistency_level: str = "ONE", | |||||
) -> Dict[object, ExecutionProfile]: | |||||
if consistency_level not in ConsistencyLevel.name_to_value: | |||||
raise ValueError( | |||||
"Configuration error: Unknown consistency level '%s'" % consistency_level | |||||
ardumont: that's not a logging instruction so you can use f-string here ;) | |||||
) | |||||
return { | |||||
EXEC_PROFILE_DEFAULT: ExecutionProfile( | EXEC_PROFILE_DEFAULT: ExecutionProfile( | ||||
load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()), | load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()), | ||||
row_factory=dict_factory, | row_factory=dict_factory, | ||||
), | consistency_level=ConsistencyLevel.name_to_value[consistency_level], | ||||
) | |||||
} | } | ||||
# Configuration for cassandra-driver's access to servers: | # Configuration for cassandra-driver's access to servers: | ||||
# * hit the right server directly when sending a query (TokenAwarePolicy), | # * hit the right server directly when sending a query (TokenAwarePolicy), | ||||
# * if there's more than one, then pick one at random that's in the same | # * if there's more than one, then pick one at random that's in the same | ||||
# datacenter as the client (DCAwareRoundRobinPolicy) | # datacenter as the client (DCAwareRoundRobinPolicy) | ||||
def create_keyspace( | def create_keyspace( | ||||
hosts: List[str], keyspace: str, port: int = 9042, *, durable_writes=True | hosts: List[str], keyspace: str, port: int = 9042, *, durable_writes=True | ||||
): | ): | ||||
cluster = Cluster(hosts, port=port, execution_profiles=_execution_profiles) | cluster = Cluster(hosts, port=port, execution_profiles=get_execution_profiles()) | ||||
ardumontUnsubmitted Not Done Inline ActionsShouldn't this be consistent with the execution profile the CqlRunner uses? No idea... After checking which part is using this, it seems to be mostly a test utility function so i'm confused... @vlorentz any hints? ardumont: Shouldn't this be consistent with the execution profile the CqlRunner uses?
No idea...
After… | |||||
ardumontUnsubmitted Not Done Inline Actions@vsellier showed me a use case [1] Still, it'd be best this function clearly explains it's administration related in its docstring (outside the scope of this diff) ardumont: @vsellier showed me a use case [1]
Still, it'd be best this function clearly explains it's… | |||||
vsellierAuthorUnsubmitted Done Inline ActionsIMO it's not necessary at it's only to bootstrap the database. vsellier: IMO it's not necessary at it's only to bootstrap the database.
It certainly needs some cares to… | |||||
vsellierAuthorUnsubmitted Done Inline Actionsmy previous comment was the reply to
vsellier: my previous comment was the reply to
>Shouldn't this be consistent with the execution profile… | |||||
ardumontUnsubmitted Not Done Inline Actionsyes, thanks, i understood it that way ;) ardumont: yes, thanks, i understood it that way ;) | |||||
session = cluster.connect() | session = cluster.connect() | ||||
extra_params = "" | extra_params = "" | ||||
if not durable_writes: | if not durable_writes: | ||||
extra_params = "AND durable_writes = false" | extra_params = "AND durable_writes = false" | ||||
session.execute( | session.execute( | ||||
"""CREATE KEYSPACE IF NOT EXISTS "%s" | """CREATE KEYSPACE IF NOT EXISTS "%s" | ||||
WITH REPLICATION = { | WITH REPLICATION = { | ||||
'class' : 'SimpleStrategy', | 'class' : 'SimpleStrategy', | ||||
▲ Show 20 Lines • Show All 114 Lines • ▼ Show 20 Lines | return next_value_int.to_bytes( | ||||
(next_value_int.bit_length() + 7) // 8, byteorder="big" | (next_value_int.bit_length() + 7) // 8, byteorder="big" | ||||
) | ) | ||||
class CqlRunner: | class CqlRunner: | ||||
"""Class managing prepared statements and building queries to be sent | """Class managing prepared statements and building queries to be sent | ||||
to Cassandra.""" | to Cassandra.""" | ||||
def __init__(self, hosts: List[str], keyspace: str, port: int): | def __init__( | ||||
self, hosts: List[str], keyspace: str, port: int, consistency_level: str | |||||
): | |||||
self._cluster = Cluster( | self._cluster = Cluster( | ||||
hosts, port=port, execution_profiles=_execution_profiles | hosts, | ||||
port=port, | |||||
execution_profiles=get_execution_profiles(consistency_level), | |||||
) | ) | ||||
self._session = self._cluster.connect(keyspace) | self._session = self._cluster.connect(keyspace) | ||||
self._cluster.register_user_type( | self._cluster.register_user_type( | ||||
keyspace, "microtimestamp_with_timezone", TimestampWithTimezone | keyspace, "microtimestamp_with_timezone", TimestampWithTimezone | ||||
) | ) | ||||
self._cluster.register_user_type(keyspace, "microtimestamp", Timestamp) | self._cluster.register_user_type(keyspace, "microtimestamp", Timestamp) | ||||
self._cluster.register_user_type(keyspace, "person", Person) | self._cluster.register_user_type(keyspace, "person", Person) | ||||
▲ Show 20 Lines • Show All 1,013 Lines • Show Last 20 Lines |
that's not a logging instruction so you can use f-string here ;)