Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show First 20 Lines • Show All 192 Lines • ▼ Show 20 Lines | |||||
def _prepared_exists_statement( | def _prepared_exists_statement( | ||||
table_name: str, | table_name: str, | ||||
) -> Callable[ | ) -> Callable[ | ||||
[Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]], # noqa | [Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]], # noqa | ||||
Callable[[TSelf, TArg], TRet], | Callable[[TSelf, TArg], TRet], | ||||
]: | ]: | ||||
"""Shorthand for using `_prepared_statement` for queries that only | """Shorthand for using `_prepared_statement` for queries that only | ||||
check which ids in a list exist in the table.""" | check which ids in a list exist in the table.""" | ||||
return _prepared_statement(f"SELECT id FROM {table_name} WHERE id IN ?") | return _prepared_statement(f"SELECT id FROM {table_name} WHERE id = ?") | ||||
def _prepared_select_statement( | def _prepared_select_statement( | ||||
row_class: Type[BaseRow], clauses: str = "", cols: Optional[List[str]] = None, | row_class: Type[BaseRow], clauses: str = "", cols: Optional[List[str]] = None, | ||||
) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: | ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: | ||||
if cols is None: | if cols is None: | ||||
cols = row_class.cols() | cols = row_class.cols() | ||||
▲ Show 20 Lines • Show All 83 Lines • ▼ Show 20 Lines | def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet: | ||||
return self._session.execute(statement, args, timeout=1000.0) | return self._session.execute(statement, args, timeout=1000.0) | ||||
@retry( | @retry( | ||||
wait=wait_random_exponential(multiplier=1, max=10), | wait=wait_random_exponential(multiplier=1, max=10), | ||||
stop=stop_after_attempt(MAX_RETRIES), | stop=stop_after_attempt(MAX_RETRIES), | ||||
retry=retry_if_exception_type(CoordinationFailure), | retry=retry_if_exception_type(CoordinationFailure), | ||||
) | ) | ||||
def _execute_many_with_retries( | def _execute_many_with_retries( | ||||
self, statement, args_list: List[Tuple] | self, statement, args_list: Sequence[Tuple] | ||||
) -> Iterable[Dict[str, Any]]: | ) -> Iterable[Dict[str, Any]]: | ||||
for res in execute_concurrent_with_args(self._session, statement, args_list): | for res in execute_concurrent_with_args(self._session, statement, args_list): | ||||
yield from res.result_or_exc | yield from res.result_or_exc | ||||
def _add_one(self, statement, obj: BaseRow) -> None: | def _add_one(self, statement, obj: BaseRow) -> None: | ||||
self._execute_with_retries(statement, dataclasses.astuple(obj)) | self._execute_with_retries(statement, dataclasses.astuple(obj)) | ||||
def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: | def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: | ||||
Show All 16 Lines | def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa | ||||
# There are no row with a greater token; wrap around to get | # There are no row with a greater token; wrap around to get | ||||
# the row with the smallest token | # the row with the smallest token | ||||
rows = self._execute_with_retries(statement, [TOKEN_BEGIN]) | rows = self._execute_with_retries(statement, [TOKEN_BEGIN]) | ||||
if rows: | if rows: | ||||
return row_class.from_dict(rows.one()) # type: ignore | return row_class.from_dict(rows.one()) # type: ignore | ||||
else: | else: | ||||
return None | return None | ||||
def _missing(self, statement, ids): | def _missing(self, statement: PreparedStatement, ids): | ||||
found_ids = set() | found_ids = set() | ||||
for id_group in grouper(ids, PARTITION_KEY_RESTRICTION_MAX_SIZE): | |||||
rows = self._execute_with_retries(statement, [list(id_group)]) | if not ids: | ||||
found_ids.update(row["id"] for row in rows) | return [] | ||||
for row in self._execute_many_with_retries(statement, [(id_,) for id_ in ids]): | |||||
found_ids.add(row["id"]) | |||||
return [id_ for id_ in ids if id_ not in found_ids] | return [id_ for id_ in ids if id_ not in found_ids] | ||||
########################## | ########################## | ||||
# 'content' table | # 'content' table | ||||
########################## | ########################## | ||||
def _content_add_finalize(self, statement: BoundStatement) -> None: | def _content_add_finalize(self, statement: BoundStatement) -> None: | ||||
"""Returned currified by content_add_prepare, to be called when the | """Returned currified by content_add_prepare, to be called when the | ||||
▲ Show 20 Lines • Show All 1,101 Lines • Show Last 20 Lines |