Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show All 29 Lines | |||||
from mypy_extensions import NamedArg | from mypy_extensions import NamedArg | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, | retry, | ||||
retry_if_exception_type, | retry_if_exception_type, | ||||
stop_after_attempt, | stop_after_attempt, | ||||
wait_random_exponential, | wait_random_exponential, | ||||
) | ) | ||||
from swh.core.utils import grouper | |||||
from swh.model.identifiers import CoreSWHID | from swh.model.identifiers import CoreSWHID | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
Person, | Person, | ||||
Sha1Git, | Sha1Git, | ||||
SkippedContent, | SkippedContent, | ||||
Timestamp, | Timestamp, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
Show All 23 Lines | from .model import ( | ||||
RevisionRow, | RevisionRow, | ||||
SkippedContentRow, | SkippedContentRow, | ||||
SnapshotBranchRow, | SnapshotBranchRow, | ||||
SnapshotRow, | SnapshotRow, | ||||
content_index_table_name, | content_index_table_name, | ||||
) | ) | ||||
from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | ||||
PARTITION_KEY_RESTRICTION_MAX_SIZE = 100 | |||||
"""Maximum number of restrictions in a single query. | |||||
Usually this is a very low number (eg. SELECT ... FROM ... WHERE x=?), | |||||
but some queries can request arbitrarily many (eg. SELECT ... FROM ... WHERE x IN ?). | |||||
This can cause performance issues, as the node getting the query need to | |||||
coordinate with other nodes to get the complete results. | |||||
See <https://github.com/scylladb/scylla/pull/4797> for details and rationale. | |||||
""" | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def get_execution_profiles( | def get_execution_profiles( | ||||
consistency_level: str = "ONE", | consistency_level: str = "ONE", | ||||
) -> Dict[object, ExecutionProfile]: | ) -> Dict[object, ExecutionProfile]: | ||||
if consistency_level not in ConsistencyLevel.name_to_value: | if consistency_level not in ConsistencyLevel.name_to_value: | ||||
raise ValueError( | raise ValueError( | ||||
▲ Show 20 Lines • Show All 212 Lines • ▼ Show 20 Lines | def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa | ||||
# the row with the smallest token | # the row with the smallest token | ||||
rows = self._execute_with_retries(statement, [TOKEN_BEGIN]) | rows = self._execute_with_retries(statement, [TOKEN_BEGIN]) | ||||
if rows: | if rows: | ||||
return row_class.from_dict(rows.one()) # type: ignore | return row_class.from_dict(rows.one()) # type: ignore | ||||
else: | else: | ||||
return None | return None | ||||
def _missing(self, statement, ids): | def _missing(self, statement, ids): | ||||
rows = self._execute_with_retries(statement, [ids]) | found_ids = set() | ||||
found_ids = {row["id"] for row in rows} | for id_group in grouper(ids, PARTITION_KEY_RESTRICTION_MAX_SIZE): | ||||
rows = self._execute_with_retries(statement, [list(id_group)]) | |||||
found_ids.update(row["id"] for row in rows) | |||||
return [id_ for id_ in ids if id_ not in found_ids] | return [id_ for id_ in ids if id_ not in found_ids] | ||||
########################## | ########################## | ||||
# 'content' table | # 'content' table | ||||
########################## | ########################## | ||||
def _content_add_finalize(self, statement: BoundStatement) -> None: | def _content_add_finalize(self, statement: BoundStatement) -> None: | ||||
"""Returned currified by content_add_prepare, to be called when the | """Returned currified by content_add_prepare, to be called when the | ||||
▲ Show 20 Lines • Show All 964 Lines • Show Last 20 Lines |