Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
| Show First 20 Lines • Show All 283 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
| MAX_RETRIES = 3 | MAX_RETRIES = 3 | ||||
| @retry( | @retry( | ||||
| wait=wait_random_exponential(multiplier=1, max=10), | wait=wait_random_exponential(multiplier=1, max=10), | ||||
| stop=stop_after_attempt(MAX_RETRIES), | stop=stop_after_attempt(MAX_RETRIES), | ||||
| retry=retry_if_exception_type(CoordinationFailure), | retry=retry_if_exception_type(CoordinationFailure), | ||||
| ) | ) | ||||
| def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet: | def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet: | ||||
| return self._session.execute(statement, args, timeout=1000.0) | return self._session.execute(statement, args, timeout=1000.0) | ||||
olasd: Shouldn't this be turned into a generator for consistency? Or the other method kept eager? Is there a point where we do _execute_many_with_retries but don't want to keep all results?
Done Inline Actions
ResultSet is lazy. https://docs.datastax.com/en/developer/python-driver/3.24/api/cassandra/cluster/#cassandra.cluster.ResultSet
In _add_many. vlorentz: > Shouldn't this be turned into a generator for consistency? Or the other method kept eager? | |||||
| @retry( | @retry( | ||||
| wait=wait_random_exponential(multiplier=1, max=10), | wait=wait_random_exponential(multiplier=1, max=10), | ||||
| stop=stop_after_attempt(MAX_RETRIES), | stop=stop_after_attempt(MAX_RETRIES), | ||||
| retry=retry_if_exception_type(CoordinationFailure), | retry=retry_if_exception_type(CoordinationFailure), | ||||
| ) | ) | ||||
| def _execute_many_with_retries( | def _execute_many_with_retries( | ||||
| self, statement, args_list: List[Tuple] | self, statement, args_list: List[Tuple] | ||||
| ) -> ResultSet: | ) -> Iterable[BaseRow]: | ||||
| return execute_concurrent_with_args(self._session, statement, args_list) | for res in execute_concurrent_with_args(self._session, statement, args_list): | ||||
| yield from res.result_or_exc | |||||
| def _add_one(self, statement, obj: BaseRow) -> None: | def _add_one(self, statement, obj: BaseRow) -> None: | ||||
| self._execute_with_retries(statement, dataclasses.astuple(obj)) | self._execute_with_retries(statement, dataclasses.astuple(obj)) | ||||
| def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: | def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: | ||||
| tables = {obj.TABLE for obj in objs} | tables = {obj.TABLE for obj in objs} | ||||
| assert len(tables) == 1, f"Cannot insert to multiple tables: {tables}" | assert len(tables) == 1, f"Cannot insert to multiple tables: {tables}" | ||||
| (table,) = tables | rows = list(map(dataclasses.astuple, objs)) | ||||
| self._execute_many_with_retries(statement, list(map(dataclasses.astuple, objs))) | for _ in self._execute_many_with_retries(statement, rows): | ||||
| # Need to consume the generator to actually run the INSERTs | |||||
Not Done Inline Actions*ugh* generators. olasd: *ugh* generators. | |||||
Done Inline Actionsikr... as usual it took me some time to figure it out vlorentz: ikr... as usual it took me some time to figure it out | |||||
| pass | |||||
Not Done Inline ActionsUse a list comprehension? olasd: Use a list comprehension? | |||||
| _T = TypeVar("_T", bound=BaseRow) | _T = TypeVar("_T", bound=BaseRow) | ||||
| def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa | def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa | ||||
| """Takes a prepared statement of the form | """Takes a prepared statement of the form | ||||
| "SELECT * FROM <table> WHERE token(<keys>) > ? LIMIT 1" | "SELECT * FROM <table> WHERE token(<keys>) > ? LIMIT 1" | ||||
| and uses it to return a random row""" | and uses it to return a random row""" | ||||
| token = random.randint(TOKEN_BEGIN, TOKEN_END) | token = random.randint(TOKEN_BEGIN, TOKEN_END) | ||||
| ▲ Show 20 Lines • Show All 149 Lines • ▼ Show 20 Lines | def content_index_add_one(self, algo: str, content: Content, token: int) -> None: | ||||
| the main 'content' table.""" | the main 'content' table.""" | ||||
| query = f""" | query = f""" | ||||
| INSERT INTO {content_index_table_name(algo, skipped_content=False)} | INSERT INTO {content_index_table_name(algo, skipped_content=False)} | ||||
| ({algo}, target_token) | ({algo}, target_token) | ||||
| VALUES (%s, %s) | VALUES (%s, %s) | ||||
| """ | """ | ||||
| self._execute_with_retries(query, [content.get_hash(algo), token]) | self._execute_with_retries(query, [content.get_hash(algo), token]) | ||||
| def content_get_tokens_from_single_hash( | def content_get_tokens_from_single_algo( | ||||
| self, algo: str, hash_: bytes | self, algo: str, hashes: List[bytes] | ||||
| ) -> Iterable[int]: | ) -> Iterable[int]: | ||||
| assert algo in HASH_ALGORITHMS | assert algo in HASH_ALGORITHMS | ||||
| query = f""" | query = f""" | ||||
| SELECT target_token | SELECT target_token | ||||
| FROM {content_index_table_name(algo, skipped_content=False)} | FROM {content_index_table_name(algo, skipped_content=False)} | ||||
| WHERE {algo} = %s | WHERE {algo} = %s | ||||
| """ | """ | ||||
| return ( | return ( | ||||
| row["target_token"] for row in self._execute_with_retries(query, [hash_]) | row["target_token"] # type: ignore | ||||
| for row in self._execute_many_with_retries( | |||||
| query, [(hash_,) for hash_ in hashes] | |||||
| ) | |||||
| ) | ) | ||||
| ########################## | ########################## | ||||
| # 'skipped_content' table | # 'skipped_content' table | ||||
| ########################## | ########################## | ||||
| def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: | def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: | ||||
| """Returned currified by skipped_content_add_prepare, to be called | """Returned currified by skipped_content_add_prepare, to be called | ||||
| ▲ Show 20 Lines • Show All 897 Lines • Show Last 20 Lines | |||||
Shouldn't this be turned into a generator for consistency? Or the other method kept eager? Is there a point where we do _execute_many_with_retries but don't want to keep all results?