Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show All 22 Lines | from typing import ( | ||||
Type, | Type, | ||||
TypeVar, | TypeVar, | ||||
Union, | Union, | ||||
) | ) | ||||
from cassandra import ConsistencyLevel, CoordinationFailure | from cassandra import ConsistencyLevel, CoordinationFailure | ||||
from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet | from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet | ||||
from cassandra.concurrent import execute_concurrent_with_args | from cassandra.concurrent import execute_concurrent_with_args | ||||
from cassandra.metadata import group_keys_by_replica | |||||
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | ||||
from cassandra.query import BoundStatement, PreparedStatement, dict_factory | from cassandra.query import BoundStatement, PreparedStatement, dict_factory | ||||
from mypy_extensions import NamedArg | from mypy_extensions import NamedArg | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, | retry, | ||||
retry_if_exception_type, | retry_if_exception_type, | ||||
stop_after_attempt, | stop_after_attempt, | ||||
wait_random_exponential, | wait_random_exponential, | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | |||||
but some queries can request arbitrarily many (eg. SELECT ... FROM ... WHERE x IN ?). | but some queries can request arbitrarily many (eg. SELECT ... FROM ... WHERE x IN ?). | ||||
This can cause performance issues, as the node getting the query need to | This can cause performance issues, as the node getting the query need to | ||||
coordinate with other nodes to get the complete results. | coordinate with other nodes to get the complete results. | ||||
See <https://github.com/scylladb/scylla/pull/4797> for details and rationale. | See <https://github.com/scylladb/scylla/pull/4797> for details and rationale. | ||||
""" | """ | ||||
BATCH_INSERT_MAX_SIZE = 1000

# Supported strategies for CqlRunner._missing(), ie. how "which of these ids
# are already present in the table?" queries are dispatched to the cluster.
# See _missing() for what each strategy does.
SELECT_MISSING_ALGOS = [
    "concurrent",
    "grouped-naive",
    "grouped-pk-serial",
    "grouped-pk-concurrent",
]
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def get_execution_profiles( | def get_execution_profiles( | ||||
consistency_level: str = "ONE", | consistency_level: str = "ONE", | ||||
) -> Dict[object, ExecutionProfile]: | ) -> Dict[object, ExecutionProfile]: | ||||
▲ Show 20 Lines • Show All 151 Lines • ▼ Show 20 Lines | def _next_bytes_value(value: bytes) -> bytes: | ||||
) | ) | ||||
class CqlRunner: | class CqlRunner: | ||||
"""Class managing prepared statements and building queries to be sent | """Class managing prepared statements and building queries to be sent | ||||
to Cassandra.""" | to Cassandra.""" | ||||
def __init__( | def __init__( | ||||
self, hosts: List[str], keyspace: str, port: int, consistency_level: str | self, | ||||
hosts: List[str], | |||||
keyspace: str, | |||||
port: int, | |||||
consistency_level: str, | |||||
select_missing_algo: str, | |||||
): | ): | ||||
self._keyspace = keyspace | |||||
self._cluster = Cluster( | self._cluster = Cluster( | ||||
hosts, | hosts, | ||||
port=port, | port=port, | ||||
execution_profiles=get_execution_profiles(consistency_level), | execution_profiles=get_execution_profiles(consistency_level), | ||||
) | ) | ||||
self._session = self._cluster.connect(keyspace) | self._session = self._cluster.connect(keyspace) | ||||
self._cluster.register_user_type( | self._cluster.register_user_type( | ||||
keyspace, "microtimestamp_with_timezone", TimestampWithTimezone | keyspace, "microtimestamp_with_timezone", TimestampWithTimezone | ||||
) | ) | ||||
self._cluster.register_user_type(keyspace, "microtimestamp", Timestamp) | self._cluster.register_user_type(keyspace, "microtimestamp", Timestamp) | ||||
self._cluster.register_user_type(keyspace, "person", Person) | self._cluster.register_user_type(keyspace, "person", Person) | ||||
# directly a PreparedStatement for methods decorated with | # directly a PreparedStatement for methods decorated with | ||||
# @_prepared_statements (and its wrappers, _prepared_insert_statement, | # @_prepared_statements (and its wrappers, _prepared_insert_statement, | ||||
# _prepared_exists_statement, and _prepared_select_statement); | # _prepared_exists_statement, and _prepared_select_statement); | ||||
# and a dict of PreparedStatements with @_prepared_select_statements | # and a dict of PreparedStatements with @_prepared_select_statements | ||||
self._prepared_statements: Dict[ | self._prepared_statements: Dict[ | ||||
str, Union[PreparedStatement, Dict[Any, PreparedStatement]] | str, Union[PreparedStatement, Dict[Any, PreparedStatement]] | ||||
] = {} | ] = {} | ||||
self._select_missing_algo = select_missing_algo | |||||
########################## | ########################## | ||||
# Common utility functions | # Common utility functions | ||||
########################## | ########################## | ||||
    # Maximum number of attempts per query before giving up on
    # CoordinationFailure (see the @retry decorators below).
    MAX_RETRIES = 3

    @retry(
        wait=wait_random_exponential(multiplier=1, max=10),
        stop=stop_after_attempt(MAX_RETRIES),
        retry=retry_if_exception_type(CoordinationFailure),
    )
    def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet:
        """Execute a single statement, retrying up to MAX_RETRIES times with
        random exponential backoff when the coordinator node fails."""
        # NOTE(review): 1000s timeout is deliberate slack for very large
        # queries; the driver default would be far too short here.
        return self._session.execute(statement, args, timeout=1000.0)
@retry( | @retry( | ||||
wait=wait_random_exponential(multiplier=1, max=10), | wait=wait_random_exponential(multiplier=1, max=10), | ||||
stop=stop_after_attempt(MAX_RETRIES), | stop=stop_after_attempt(MAX_RETRIES), | ||||
retry=retry_if_exception_type(CoordinationFailure), | retry=retry_if_exception_type(CoordinationFailure), | ||||
) | ) | ||||
def _execute_many_with_retries( | def _execute_many_with_retries( | ||||
self, statement, args_list: List[Tuple] | self, statement, args_list: Sequence[Tuple] | ||||
) -> Iterable[Dict[str, Any]]: | ) -> Iterable[Dict[str, Any]]: | ||||
for res in execute_concurrent_with_args(self._session, statement, args_list): | for res in execute_concurrent_with_args(self._session, statement, args_list): | ||||
yield from res.result_or_exc | yield from res.result_or_exc | ||||
def _add_one(self, statement, obj: BaseRow) -> None: | def _add_one(self, statement, obj: BaseRow) -> None: | ||||
self._execute_with_retries(statement, dataclasses.astuple(obj)) | self._execute_with_retries(statement, dataclasses.astuple(obj)) | ||||
def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: | def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: | ||||
Show All 16 Lines | def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa | ||||
# There are no row with a greater token; wrap around to get | # There are no row with a greater token; wrap around to get | ||||
# the row with the smallest token | # the row with the smallest token | ||||
rows = self._execute_with_retries(statement, [TOKEN_BEGIN]) | rows = self._execute_with_retries(statement, [TOKEN_BEGIN]) | ||||
if rows: | if rows: | ||||
return row_class.from_dict(rows.one()) # type: ignore | return row_class.from_dict(rows.one()) # type: ignore | ||||
else: | else: | ||||
return None | return None | ||||
def _missing(self, statement, ids): | def _missing(self, statement: PreparedStatement, ids): | ||||
found_ids = set() | found_ids = set() | ||||
if not ids: | |||||
return [] | |||||
if self._select_missing_algo == "concurrent": | |||||
# One statement per id | |||||
for row in self._execute_many_with_retries( | |||||
statement, [([id_],) for id_ in ids] | |||||
): | |||||
found_ids.add(row["id"]) | |||||
elif self._select_missing_algo == "grouped-naive": | |||||
# Grouped in the order they were given | |||||
for id_group in grouper(ids, PARTITION_KEY_RESTRICTION_MAX_SIZE): | for id_group in grouper(ids, PARTITION_KEY_RESTRICTION_MAX_SIZE): | ||||
rows = self._execute_with_retries(statement, [list(id_group)]) | rows = self._execute_with_retries(statement, [list(id_group)]) | ||||
found_ids.update(row["id"] for row in rows) | found_ids.update(row["id"] for row in rows) | ||||
else: | |||||
# Grouped smartly, so each query only fetches data from a single server | |||||
(first_col, *_) = statement.column_metadata | |||||
table = first_col.table_name | |||||
groups = group_keys_by_replica(self._session, self._keyspace, table, ids) | |||||
subgroups = [ | |||||
(list(subgroup),) | |||||
for (host, group) in groups.items() | |||||
for subgroup in grouper(group, PARTITION_KEY_RESTRICTION_MAX_SIZE) | |||||
] | |||||
if self._select_missing_algo == "grouped-pk-serial": | |||||
# Send queries for each subgroup, one-by-one | |||||
for subgroup in subgroups: | |||||
rows = self._execute_with_retries(statement, subgroup) | |||||
found_ids.update(row["id"] for row in rows) | |||||
elif self._select_missing_algo == "grouped-pk-concurrent": | |||||
# Same as above, but we send all queries in parallel | |||||
for row in self._execute_many_with_retries(statement, subgroups): | |||||
found_ids.add(row["id"]) | |||||
else: | |||||
assert False, self._select_missing_algo | |||||
return [id_ for id_ in ids if id_ not in found_ids] | return [id_ for id_ in ids if id_ not in found_ids] | ||||
########################## | ########################## | ||||
# 'content' table | # 'content' table | ||||
########################## | ########################## | ||||
def _content_add_finalize(self, statement: BoundStatement) -> None: | def _content_add_finalize(self, statement: BoundStatement) -> None: | ||||
"""Returned currified by content_add_prepare, to be called when the | """Returned currified by content_add_prepare, to be called when the | ||||
▲ Show 20 Lines • Show All 1,093 Lines • Show Last 20 Lines |