Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show First 20 Lines • Show All 293 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
@retry( | @retry( | ||||
wait=wait_random_exponential(multiplier=1, max=10), | wait=wait_random_exponential(multiplier=1, max=10), | ||||
stop=stop_after_attempt(MAX_RETRIES), | stop=stop_after_attempt(MAX_RETRIES), | ||||
retry=retry_if_exception_type(CoordinationFailure), | retry=retry_if_exception_type(CoordinationFailure), | ||||
) | ) | ||||
def _execute_many_with_retries( | def _execute_many_with_retries( | ||||
self, statement, args_list: List[Tuple] | self, statement, args_list: List[Tuple] | ||||
) -> Iterable[BaseRow]: | ) -> Iterable[Dict[str, Any]]: | ||||
for res in execute_concurrent_with_args(self._session, statement, args_list): | for res in execute_concurrent_with_args(self._session, statement, args_list): | ||||
yield from res.result_or_exc | yield from res.result_or_exc | ||||
def _add_one(self, statement, obj: BaseRow) -> None: | def _add_one(self, statement, obj: BaseRow) -> None: | ||||
self._execute_with_retries(statement, dataclasses.astuple(obj)) | self._execute_with_retries(statement, dataclasses.astuple(obj)) | ||||
def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: | def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: | ||||
tables = {obj.TABLE for obj in objs} | tables = {obj.TABLE for obj in objs} | ||||
▲ Show 20 Lines • Show All 108 Lines • ▼ Show 20 Lines | def _content_get_hashes_from_sha256( | ||||
self, ids: List[bytes], *, statement | self, ids: List[bytes], *, statement | ||||
) -> Iterator[Tuple[bytes, bytes, bytes, bytes]]: | ) -> Iterator[Tuple[bytes, bytes, bytes, bytes]]: | ||||
for row in self._execute_with_retries(statement, [ids]): | for row in self._execute_with_retries(statement, [ids]): | ||||
yield tuple(row[algo] for algo in HASH_ALGORITHMS) # type: ignore | yield tuple(row[algo] for algo in HASH_ALGORITHMS) # type: ignore | ||||
@_prepared_select_statement( | @_prepared_select_statement( | ||||
ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?" | ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?" | ||||
) | ) | ||||
def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]: | def content_get_from_tokens(self, tokens, *, statement) -> Iterable[ContentRow]: | ||||
return map(ContentRow.from_dict, self._execute_with_retries(statement, [token])) | return map( | ||||
ContentRow.from_dict, | |||||
self._execute_many_with_retries(statement, [(token,) for token in tokens]), | |||||
) | |||||
@_prepared_select_statement( | @_prepared_select_statement( | ||||
ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) > ? LIMIT 1" | ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) > ? LIMIT 1" | ||||
) | ) | ||||
def content_get_random(self, *, statement) -> Optional[ContentRow]: | def content_get_random(self, *, statement) -> Optional[ContentRow]: | ||||
return self._get_random_row(ContentRow, statement) | return self._get_random_row(ContentRow, statement) | ||||
@_prepared_statement( | @_prepared_statement( | ||||
▲ Show 20 Lines • Show All 1,012 Lines • Show Last 20 Lines |