swh/storage/cassandra/cql.py
(… 283 lines of context skipped …)

class CqlRunner:
    MAX_RETRIES = 3

    @retry(
        wait=wait_random_exponential(multiplier=1, max=10),
        stop=stop_after_attempt(MAX_RETRIES),
        retry=retry_if_exception_type(CoordinationFailure),
    )
    def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet:
        return self._session.execute(statement, args, timeout=1000.0)
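The decorator stack above is tenacity's retry API: a jittered, exponentially growing sleep capped at 10 seconds, at most MAX_RETRIES attempts in total, retrying only on the driver's CoordinationFailure. A minimal standalone sketch of the same policy (editor's illustration, not part of the diff; the CoordinationFailure stand-in is assumed):

    from tenacity import (
        retry,
        retry_if_exception_type,
        stop_after_attempt,
        wait_random_exponential,
    )

    class CoordinationFailure(Exception):
        """Stand-in for the Cassandra driver's CoordinationFailure."""

    MAX_RETRIES = 3

    @retry(
        # Sleep a random time in an exponentially widening window, capped at 10s.
        wait=wait_random_exponential(multiplier=1, max=10),
        # Stop after 3 attempts in total (the initial call plus 2 retries).
        stop=stop_after_attempt(MAX_RETRIES),
        # Only CoordinationFailure triggers a retry; other errors propagate.
        retry=retry_if_exception_type(CoordinationFailure),
    )
    def flaky_query() -> None:
        ...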
olasd: Shouldn't this be turned into a generator for consistency? Or the other method kept eager? Is there a point where we do _execute_many_with_retries but don't want to keep all results?

vlorentz (Done):
> Shouldn't this be turned into a generator for consistency? Or the other method kept eager?
ResultSet is lazy: https://docs.datastax.com/en/developer/python-driver/3.24/api/cassandra/cluster/#cassandra.cluster.ResultSet
> Is there a point where we do _execute_many_with_retries but don't want to keep all results?
In _add_many.
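The eager/lazy distinction behind both answers is easy to demonstrate: a plain function does its work when called, while a generator function runs nothing until iterated, which is exactly why the new _execute_many_with_retries below needs a consumer (pure-Python illustration, not from the diff):

    def eager():
        print("query ran")
        return [1, 2, 3]

    def lazy():
        print("query ran")
        yield from [1, 2, 3]

    eager()       # prints "query ran" immediately
    gen = lazy()  # prints nothing: the body has not started
    list(gen)     # prints "query ran" only now, on consumption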
     @retry(
         wait=wait_random_exponential(multiplier=1, max=10),
         stop=stop_after_attempt(MAX_RETRIES),
         retry=retry_if_exception_type(CoordinationFailure),
     )
     def _execute_many_with_retries(
         self, statement, args_list: List[Tuple]
-    ) -> ResultSet:
-        return execute_concurrent_with_args(self._session, statement, args_list)
+    ) -> Iterable[BaseRow]:
+        for res in execute_concurrent_with_args(self._session, statement, args_list):
+            yield from res.result_or_exc

     def _add_one(self, statement, obj: BaseRow) -> None:
         self._execute_with_retries(statement, dataclasses.astuple(obj))

     def _add_many(self, statement, objs: Sequence[BaseRow]) -> None:
         tables = {obj.TABLE for obj in objs}
         assert len(tables) == 1, f"Cannot insert to multiple tables: {tables}"
-        (table,) = tables
-        self._execute_many_with_retries(statement, list(map(dataclasses.astuple, objs)))
+        rows = list(map(dataclasses.astuple, objs))
+        for _ in self._execute_many_with_retries(statement, rows):
+            # Need to consume the generator to actually run the INSERTs
+            pass

olasd (Not Done): *ugh* generators.

vlorentz (Done): ikr... as usual it took me some time to figure it out

olasd (Not Done): Use a list comprehension?
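On olasd's suggestion: several idioms can drain a generator; they differ mainly in whether the yielded rows get buffered (editor's comparison, not part of the diff):

    import collections

    def drain_loop(gen):
        # The idiom used in the diff: explicit, buffers nothing.
        for _ in gen:
            pass

    def drain_list(gen):
        # The list-comprehension route olasd suggests: shorter, but it
        # materializes every yielded row in memory before discarding it.
        [_ for _ in gen]

    def drain_deque(gen):
        # A zero-length deque: drains at C speed without storing anything.
        collections.deque(gen, maxlen=0)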
    _T = TypeVar("_T", bound=BaseRow)

    def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]:  # noqa
        """Takes a prepared statement of the form
        "SELECT * FROM <table> WHERE token(<keys>) > ? LIMIT 1"
        and uses it to return a random row"""
        token = random.randint(TOKEN_BEGIN, TOKEN_END)
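The statement pattern in the docstring is the usual Cassandra random-sampling trick: partitions are ordered by token, so drawing a uniform random token and taking the first row past it yields an (approximately) random row. A sketch of the idea, assuming a driver session and the Murmur3 token bounds (editor's illustration):

    import random

    TOKEN_BEGIN = -(2 ** 63)  # smallest Murmur3 partitioner token
    TOKEN_END = 2 ** 63 - 1   # largest Murmur3 partitioner token

    def get_random_row(session, statement):
        # Jump to a uniformly random position in token order, take the next row.
        # Sampling is only approximate: rows preceded by larger token gaps are
        # more likely to be picked, and a draw past the last token returns None.
        token = random.randint(TOKEN_BEGIN, TOKEN_END)
        rows = list(session.execute(statement, [token]))
        return rows[0] if rows else None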
(… 149 lines skipped …)

    def content_index_add_one(self, algo: str, content: Content, token: int) -> None:
        the main 'content' table."""
        query = f"""
            INSERT INTO {content_index_table_name(algo, skipped_content=False)}
            ({algo}, target_token)
            VALUES (%s, %s)
        """
        self._execute_with_retries(query, [content.get_hash(algo), token])
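For orientation: each index table maps one hash of a content object to the token of its row in the main 'content' table, and a caller records that token under every supported algorithm. A hypothetical call site (the token-producing helper is assumed, not shown in this diff):

    # Hypothetical call site (helper name assumed): after the row is inserted
    # into the main 'content' table, record its token under each hash so a
    # lookup by any single algorithm can locate it.
    token = insert_content_row(content)  # assumed helper returning the row's token
    for algo in HASH_ALGORITHMS:
        cql_runner.content_index_add_one(algo, content, token)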
-    def content_get_tokens_from_single_hash(
-        self, algo: str, hash_: bytes
+    def content_get_tokens_from_single_algo(
+        self, algo: str, hashes: List[bytes]
     ) -> Iterable[int]:
         assert algo in HASH_ALGORITHMS
         query = f"""
             SELECT target_token
             FROM {content_index_table_name(algo, skipped_content=False)}
             WHERE {algo} = %s
         """
         return (
-            row["target_token"] for row in self._execute_with_retries(query, [hash_])
+            row["target_token"]  # type: ignore
+            for row in self._execute_many_with_retries(
+                query, [(hash_,) for hash_ in hashes]
+            )
         )
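The rename matches the new signature: still a single algorithm, but a batch of hashes resolved concurrently. A hypothetical call site (hash values assumed):

    # Hypothetical call site: resolve many sha256 hashes to 'content' tokens
    # in one concurrent round of queries instead of N sequential SELECTs.
    hashes = [sha256_a, sha256_b, sha256_c]  # assumed bytes values
    tokens = list(cql_runner.content_get_tokens_from_single_algo("sha256", hashes))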
    ##########################
    # 'skipped_content' table
    ##########################

    def _skipped_content_add_finalize(self, statement: BoundStatement) -> None:
        """Returned currified by skipped_content_add_prepare, to be called

(… 897 lines skipped …)