Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import Counter | from collections import Counter | ||||
import dataclasses | import dataclasses | ||||
import datetime | import datetime | ||||
import functools | import functools | ||||
import itertools | |||||
import logging | import logging | ||||
import random | import random | ||||
from typing import ( | from typing import ( | ||||
Any, | Any, | ||||
Callable, | Callable, | ||||
Dict, | Dict, | ||||
Iterable, | Iterable, | ||||
Iterator, | Iterator, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Sequence, | |||||
Tuple, | Tuple, | ||||
Type, | Type, | ||||
TypeVar, | TypeVar, | ||||
Union, | Union, | ||||
) | ) | ||||
from cassandra import ConsistencyLevel, CoordinationFailure | from cassandra import ConsistencyLevel, CoordinationFailure | ||||
from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet | from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet | ||||
from cassandra.concurrent import execute_concurrent_with_args | |||||
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | ||||
from cassandra.query import BoundStatement, PreparedStatement, dict_factory | from cassandra.query import BoundStatement, PreparedStatement, dict_factory | ||||
from mypy_extensions import NamedArg | from mypy_extensions import NamedArg | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, | retry, | ||||
retry_if_exception_type, | retry_if_exception_type, | ||||
stop_after_attempt, | stop_after_attempt, | ||||
wait_random_exponential, | wait_random_exponential, | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | |||||
Usually this is a very low number (eg. SELECT ... FROM ... WHERE x=?), | Usually this is a very low number (eg. SELECT ... FROM ... WHERE x=?), | ||||
but some queries can request arbitrarily many (eg. SELECT ... FROM ... WHERE x IN ?). | but some queries can request arbitrarily many (eg. SELECT ... FROM ... WHERE x IN ?). | ||||
This can cause performance issues, as the node getting the query need to | This can cause performance issues, as the node getting the query need to | ||||
coordinate with other nodes to get the complete results. | coordinate with other nodes to get the complete results. | ||||
See <https://github.com/scylladb/scylla/pull/4797> for details and rationale. | See <https://github.com/scylladb/scylla/pull/4797> for details and rationale. | ||||
""" | """ | ||||
BATCH_INSERT_MAX_SIZE = 1000 | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def get_execution_profiles( | def get_execution_profiles( | ||||
consistency_level: str = "ONE", | consistency_level: str = "ONE", | ||||
) -> Dict[object, ExecutionProfile]: | ) -> Dict[object, ExecutionProfile]: | ||||
if consistency_level not in ConsistencyLevel.name_to_value: | if consistency_level not in ConsistencyLevel.name_to_value: | ||||
▲ Show 20 Lines • Show All 65 Lines • ▼ Show 20 Lines | ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: | ||||
return decorator | return decorator | ||||
TArg = TypeVar("TArg") | TArg = TypeVar("TArg") | ||||
TSelf = TypeVar("TSelf") | TSelf = TypeVar("TSelf") | ||||
def _insert_query(row_class): | |||||
columns = row_class.cols() | |||||
return ( | |||||
f"INSERT INTO {row_class.TABLE} ({', '.join(columns)}) " | |||||
f"VALUES ({', '.join('?' for _ in columns)})" | |||||
) | |||||
def _prepared_insert_statement(
    row_class: Type[BaseRow],
) -> Callable[
    [Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]],  # noqa
    Callable[[TSelf, TArg], TRet],
]:
    """Shorthand for using `_prepared_statement` for `INSERT INTO`
    statements."""
    # Delegate query construction to _insert_query, so prepared statements
    # and batch builders share a single source of truth for the CQL text.
    return _prepared_statement(_insert_query(row_class))
def _prepared_exists_statement( | def _prepared_exists_statement( | ||||
table_name: str, | table_name: str, | ||||
) -> Callable[ | ) -> Callable[ | ||||
[Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]], # noqa | [Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]], # noqa | ||||
Callable[[TSelf, TArg], TRet], | Callable[[TSelf, TArg], TRet], | ||||
]: | ]: | ||||
▲ Show 20 Lines • Show All 85 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
MAX_RETRIES = 3 | MAX_RETRIES = 3 | ||||
@retry( | @retry( | ||||
wait=wait_random_exponential(multiplier=1, max=10), | wait=wait_random_exponential(multiplier=1, max=10), | ||||
stop=stop_after_attempt(MAX_RETRIES), | stop=stop_after_attempt(MAX_RETRIES), | ||||
retry=retry_if_exception_type(CoordinationFailure), | retry=retry_if_exception_type(CoordinationFailure), | ||||
) | ) | ||||
def _execute_with_retries(self, statement, args) -> ResultSet: | def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet: | ||||
return self._session.execute(statement, args, timeout=1000.0) | return self._session.execute(statement, args, timeout=1000.0) | ||||
@retry( | |||||
wait=wait_random_exponential(multiplier=1, max=10), | |||||
stop=stop_after_attempt(MAX_RETRIES), | |||||
retry=retry_if_exception_type(CoordinationFailure), | |||||
) | |||||
def _execute_many_with_retries( | |||||
self, statement, args_list: List[Tuple] | |||||
) -> ResultSet: | |||||
return execute_concurrent_with_args(self._session, statement, args_list) | |||||
def _add_one(self, statement, obj: BaseRow) -> None: | def _add_one(self, statement, obj: BaseRow) -> None: | ||||
self._execute_with_retries(statement, dataclasses.astuple(obj)) | self._execute_with_retries(statement, dataclasses.astuple(obj)) | ||||
def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: | |||||
tables = {obj.TABLE for obj in objs} | |||||
assert len(tables) == 1, f"Cannot insert to multiple tables: {tables}" | |||||
(table,) = tables | |||||
self._execute_many_with_retries(statement, list(map(dataclasses.astuple, objs))) | |||||
_T = TypeVar("_T", bound=BaseRow) | _T = TypeVar("_T", bound=BaseRow) | ||||
def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa | def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa | ||||
"""Takes a prepared statement of the form | """Takes a prepared statement of the form | ||||
"SELECT * FROM <table> WHERE token(<keys>) > ? LIMIT 1" | "SELECT * FROM <table> WHERE token(<keys>) > ? LIMIT 1" | ||||
and uses it to return a random row""" | and uses it to return a random row""" | ||||
token = random.randint(TOKEN_BEGIN, TOKEN_END) | token = random.randint(TOKEN_BEGIN, TOKEN_END) | ||||
rows = self._execute_with_retries(statement, [token]) | rows = self._execute_with_retries(statement, [token]) | ||||
▲ Show 20 Lines • Show All 363 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
########################## | ########################## | ||||
# 'directory_entry' table | # 'directory_entry' table | ||||
########################## | ########################## | ||||
@_prepared_insert_statement(DirectoryEntryRow) | @_prepared_insert_statement(DirectoryEntryRow) | ||||
def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None: | def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None: | ||||
self._add_one(statement, entry) | self._add_one(statement, entry) | ||||
@_prepared_insert_statement(DirectoryEntryRow) | |||||
def directory_entry_add_concurrent( | |||||
self, entries: List[DirectoryEntryRow], *, statement | |||||
) -> None: | |||||
if len(entries) == 0: | |||||
# nothing to do | |||||
return | |||||
assert ( | |||||
len({entry.directory_id for entry in entries}) == 1 | |||||
), "directory_entry_add_many must be called with entries for a single dir" | |||||
self._add_many(statement, entries) | |||||
def directory_entry_add_batch(self, entries: List[DirectoryEntryRow],) -> None: | |||||
if len(entries) == 0: | |||||
# nothing to do | |||||
return | |||||
assert ( | |||||
len({entry.directory_id for entry in entries}) == 1 | |||||
), "directory_entry_add_many must be called with entries for a single dir" | |||||
# query to INSERT one row | |||||
insert_query = _insert_query(DirectoryEntryRow) + ";\n" | |||||
# In "steady state", we insert batches of the maximum allowed size. | |||||
# Then, the last one has however many entries remain. | |||||
last_batch_size = len(entries) % BATCH_INSERT_MAX_SIZE | |||||
if len(entries) >= BATCH_INSERT_MAX_SIZE: | |||||
# TODO: the main_statement's size is statically known, so we could avoid | |||||
# re-preparing it on every call | |||||
main_statement = self._session.prepare( | |||||
"BEGIN UNLOGGED BATCH\n" | |||||
+ insert_query * BATCH_INSERT_MAX_SIZE | |||||
+ "APPLY BATCH" | |||||
) | |||||
last_statement = self._session.prepare( | |||||
"BEGIN UNLOGGED BATCH\n" + insert_query * last_batch_size + "APPLY BATCH" | |||||
) | |||||
for entry_group in grouper(entries, BATCH_INSERT_MAX_SIZE): | |||||
entry_group = list(map(dataclasses.astuple, entry_group)) | |||||
if len(entry_group) == BATCH_INSERT_MAX_SIZE: | |||||
self._execute_with_retries( | |||||
main_statement, list(itertools.chain.from_iterable(entry_group)) | |||||
) | |||||
else: | |||||
assert len(entry_group) == last_batch_size | |||||
self._execute_with_retries( | |||||
last_statement, list(itertools.chain.from_iterable(entry_group)) | |||||
) | |||||
@_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?") | @_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?") | ||||
def directory_entry_get( | def directory_entry_get( | ||||
self, directory_ids, *, statement | self, directory_ids, *, statement | ||||
) -> Iterable[DirectoryEntryRow]: | ) -> Iterable[DirectoryEntryRow]: | ||||
return map( | return map( | ||||
DirectoryEntryRow.from_dict, | DirectoryEntryRow.from_dict, | ||||
self._execute_with_retries(statement, [directory_ids]), | self._execute_with_retries(statement, [directory_ids]), | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 650 Lines • Show Last 20 Lines |