Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show First 20 Lines • Show All 277 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
@retry( | @retry( | ||||
wait=wait_random_exponential(multiplier=1, max=10), | wait=wait_random_exponential(multiplier=1, max=10), | ||||
stop=stop_after_attempt(MAX_RETRIES), | stop=stop_after_attempt(MAX_RETRIES), | ||||
retry=retry_if_exception_type(CoordinationFailure), | retry=retry_if_exception_type(CoordinationFailure), | ||||
) | ) | ||||
def _execute_with_retries(self, statement, args) -> ResultSet: | def _execute_with_retries(self, statement, args) -> ResultSet: | ||||
return self._session.execute(statement, args, timeout=1000.0) | return self._session.execute(statement, args, timeout=1000.0) | ||||
@_prepared_statement( | |||||
"UPDATE object_count SET count = count + ? " | |||||
"WHERE partition_key = 0 AND object_type = ?" | |||||
) | |||||
def _increment_counter( | |||||
self, object_type: str, nb: int, *, statement: PreparedStatement | |||||
) -> None: | |||||
self._execute_with_retries(statement, [nb, object_type]) | |||||
def _add_one(self, statement, obj: BaseRow) -> None: | def _add_one(self, statement, obj: BaseRow) -> None: | ||||
self._increment_counter(obj.TABLE, 1) | |||||
self._execute_with_retries(statement, dataclasses.astuple(obj)) | self._execute_with_retries(statement, dataclasses.astuple(obj)) | ||||
_T = TypeVar("_T", bound=BaseRow) | _T = TypeVar("_T", bound=BaseRow) | ||||
def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa | def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa | ||||
"""Takes a prepared statement of the form | """Takes a prepared statement of the form | ||||
"SELECT * FROM <table> WHERE token(<keys>) > ? LIMIT 1" | "SELECT * FROM <table> WHERE token(<keys>) > ? LIMIT 1" | ||||
and uses it to return a random row""" | and uses it to return a random row""" | ||||
Show All 18 Lines | class CqlRunner: | ||||
########################## | ########################## | ||||
# 'content' table | # 'content' table | ||||
########################## | ########################## | ||||
def _content_add_finalize(self, statement: BoundStatement) -> None: | def _content_add_finalize(self, statement: BoundStatement) -> None: | ||||
"""Returned currified by content_add_prepare, to be called when the | """Returned currified by content_add_prepare, to be called when the | ||||
content row should be added to the primary table.""" | content row should be added to the primary table.""" | ||||
self._execute_with_retries(statement, None) | self._execute_with_retries(statement, None) | ||||
self._increment_counter("content", 1) | |||||
@_prepared_insert_statement(ContentRow) | @_prepared_insert_statement(ContentRow) | ||||
def content_add_prepare( | def content_add_prepare( | ||||
self, content: ContentRow, *, statement | self, content: ContentRow, *, statement | ||||
) -> Tuple[int, Callable[[], None]]: | ) -> Tuple[int, Callable[[], None]]: | ||||
"""Prepares insertion of a Content to the main 'content' table. | """Prepares insertion of a Content to the main 'content' table. | ||||
Returns a token (to be used in secondary tables), and a function to be | Returns a token (to be used in secondary tables), and a function to be | ||||
called to perform the insertion in the main table.""" | called to perform the insertion in the main table.""" | ||||
▲ Show 20 Lines • Show All 137 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
########################## | ########################## | ||||
# 'skipped_content' table | # 'skipped_content' table | ||||
########################## | ########################## | ||||
def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: | def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: | ||||
"""Returned currified by skipped_content_add_prepare, to be called | """Returned currified by skipped_content_add_prepare, to be called | ||||
when the content row should be added to the primary table.""" | when the content row should be added to the primary table.""" | ||||
self._execute_with_retries(statement, None) | self._execute_with_retries(statement, None) | ||||
self._increment_counter("skipped_content", 1) | |||||
@_prepared_insert_statement(SkippedContentRow) | @_prepared_insert_statement(SkippedContentRow) | ||||
def skipped_content_add_prepare( | def skipped_content_add_prepare( | ||||
self, content, *, statement | self, content, *, statement | ||||
) -> Tuple[int, Callable[[], None]]: | ) -> Tuple[int, Callable[[], None]]: | ||||
"""Prepares insertion of a Content to the main 'skipped_content' table. | """Prepares insertion of a Content to the main 'skipped_content' table. | ||||
Returns a token (to be used in secondary tables), and a function to be | Returns a token (to be used in secondary tables), and a function to be | ||||
called to perform the insertion in the main table.""" | called to perform the insertion in the main table.""" | ||||
▲ Show 20 Lines • Show All 720 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
########################## | ########################## | ||||
# 'extid' table | # 'extid' table | ||||
########################## | ########################## | ||||
def _extid_add_finalize(self, statement: BoundStatement) -> None: | def _extid_add_finalize(self, statement: BoundStatement) -> None: | ||||
"""Returned currified by extid_add_prepare, to be called when the | """Returned currified by extid_add_prepare, to be called when the | ||||
extid row should be added to the primary table.""" | extid row should be added to the primary table.""" | ||||
self._execute_with_retries(statement, None) | self._execute_with_retries(statement, None) | ||||
self._increment_counter("extid", 1) | |||||
@_prepared_insert_statement(ExtIDRow) | @_prepared_insert_statement(ExtIDRow) | ||||
def extid_add_prepare( | def extid_add_prepare( | ||||
self, extid: ExtIDRow, *, statement | self, extid: ExtIDRow, *, statement | ||||
) -> Tuple[int, Callable[[], None]]: | ) -> Tuple[int, Callable[[], None]]: | ||||
statement = statement.bind(dataclasses.astuple(extid)) | statement = statement.bind(dataclasses.astuple(extid)) | ||||
token_class = self._cluster.metadata.token_map.token_class | token_class = self._cluster.metadata.token_map.token_class | ||||
token = token_class.from_key(statement.routing_key).value | token = token_class.from_key(statement.routing_key).value | ||||
▲ Show 20 Lines • Show All 92 Lines • ▼ Show 20 Lines | ) -> Iterable[int]: | ||||
row["target_token"] | row["target_token"] | ||||
for row in self._execute_with_retries(statement, [target_type, target]) | for row in self._execute_with_retries(statement, [target_type, target]) | ||||
) | ) | ||||
########################## | ########################## | ||||
# Miscellaneous | # Miscellaneous | ||||
########################## | ########################## | ||||
def stat_counters(self) -> Iterable[ObjectCountRow]: | |||||
raise NotImplementedError( | |||||
"stat_counters is not implemented by the Cassandra backend" | |||||
) | |||||
@_prepared_statement("SELECT uuid() FROM revision LIMIT 1;") | @_prepared_statement("SELECT uuid() FROM revision LIMIT 1;") | ||||
def check_read(self, *, statement): | def check_read(self, *, statement): | ||||
self._execute_with_retries(statement, []) | self._execute_with_retries(statement, []) | ||||
@_prepared_select_statement(ObjectCountRow, "WHERE partition_key=0") | |||||
def stat_counters(self, *, statement) -> Iterable[ObjectCountRow]: | |||||
return map(ObjectCountRow.from_dict, self._execute_with_retries(statement, [])) |