Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show All 13 Lines | from typing import ( | ||||
Dict, | Dict, | ||||
Iterable, | Iterable, | ||||
Iterator, | Iterator, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Tuple, | Tuple, | ||||
Type, | Type, | ||||
TypeVar, | TypeVar, | ||||
Union, | |||||
) | ) | ||||
from cassandra import CoordinationFailure | from cassandra import CoordinationFailure | ||||
from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet | from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet | ||||
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | ||||
from cassandra.query import PreparedStatement, BoundStatement, dict_factory | from cassandra.query import PreparedStatement, BoundStatement, dict_factory | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, | retry, | ||||
▲ Show 20 Lines • Show All 138 Lines • ▼ Show 20 Lines | ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: | ||||
if cols is None: | if cols is None: | ||||
cols = row_class.cols() | cols = row_class.cols() | ||||
return _prepared_statement( | return _prepared_statement( | ||||
f"SELECT {', '.join(cols)} FROM {row_class.TABLE} {clauses}" | f"SELECT {', '.join(cols)} FROM {row_class.TABLE} {clauses}" | ||||
) | ) | ||||
def _prepared_select_statements( | |||||
row_class: Type[BaseRow], queries: Dict[Any, str], | |||||
) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: | |||||
"""Like _prepared_statement, but supports multiple statements, passed a dict, | |||||
and passes a dict of prepared statements to the decorated method""" | |||||
cols = row_class.cols() | |||||
statement_start = f"SELECT {', '.join(cols)} FROM {row_class.TABLE} " | |||||
def decorator(f): | |||||
@functools.wraps(f) | |||||
def newf(self, *args, **kwargs) -> TRet: | |||||
if f.__name__ not in self._prepared_statements: | |||||
self._prepared_statements[f.__name__] = { | |||||
key: self._session.prepare(statement_start + query) | |||||
for (key, query) in queries.items() | |||||
} | |||||
return f( | |||||
self, *args, **kwargs, statements=self._prepared_statements[f.__name__] | |||||
) | |||||
return newf | |||||
return decorator | |||||
class CqlRunner: | class CqlRunner: | ||||
"""Class managing prepared statements and building queries to be sent | """Class managing prepared statements and building queries to be sent | ||||
to Cassandra.""" | to Cassandra.""" | ||||
def __init__(self, hosts: List[str], keyspace: str, port: int): | def __init__(self, hosts: List[str], keyspace: str, port: int): | ||||
self._cluster = Cluster( | self._cluster = Cluster( | ||||
hosts, port=port, execution_profiles=_execution_profiles | hosts, port=port, execution_profiles=_execution_profiles | ||||
) | ) | ||||
self._session = self._cluster.connect(keyspace) | self._session = self._cluster.connect(keyspace) | ||||
self._cluster.register_user_type( | self._cluster.register_user_type( | ||||
keyspace, "microtimestamp_with_timezone", TimestampWithTimezone | keyspace, "microtimestamp_with_timezone", TimestampWithTimezone | ||||
) | ) | ||||
self._cluster.register_user_type(keyspace, "microtimestamp", Timestamp) | self._cluster.register_user_type(keyspace, "microtimestamp", Timestamp) | ||||
self._cluster.register_user_type(keyspace, "person", Person) | self._cluster.register_user_type(keyspace, "person", Person) | ||||
self._prepared_statements: Dict[str, PreparedStatement] = {} | # directly a PreparedStatement for methods decorated with | ||||
# @_prepared_statements (and its wrappers, _prepared_insert_statement, | |||||
# _prepared_exists_statement, and _prepared_select_statement); | |||||
# and a dict of PreparedStatements with @_prepared_select_statements | |||||
self._prepared_statements: Dict[ | |||||
str, Union[PreparedStatement, Dict[Any, PreparedStatement]] | |||||
] = {} | |||||
########################## | ########################## | ||||
# Common utility functions | # Common utility functions | ||||
########################## | ########################## | ||||
MAX_RETRIES = 3 | MAX_RETRIES = 3 | ||||
@retry( | @retry( | ||||
▲ Show 20 Lines • Show All 453 Lines • ▼ Show 20 Lines | def origin_generate_unique_visit_id(self, origin_url: str, *, statement) -> int: | ||||
# TODO: abort after too many attempts | # TODO: abort after too many attempts | ||||
return next_id | return next_id | ||||
########################## | ########################## | ||||
# 'origin_visit' table | # 'origin_visit' table | ||||
########################## | ########################## | ||||
@_prepared_select_statement( | @_prepared_select_statements( | ||||
OriginVisitRow, "WHERE origin = ? AND visit > ? ORDER BY visit ASC LIMIT ?" | OriginVisitRow, | ||||
) | { | ||||
def _origin_visit_get_pagination_asc( | (True, ListOrder.ASC): ( | ||||
self, origin_url: str, last_visit: int, limit: int, *, statement | "WHERE origin = ? AND visit > ? ORDER BY visit ASC LIMIT ?" | ||||
) -> ResultSet: | ), | ||||
return self._execute_with_retries(statement, [origin_url, last_visit, limit]) | (True, ListOrder.DESC): ( | ||||
"WHERE origin = ? AND visit < ? ORDER BY visit DESC LIMIT ?" | |||||
@_prepared_select_statement( | ), | ||||
OriginVisitRow, "WHERE origin = ? AND visit < ? ORDER BY visit DESC LIMIT ?" | (False, ListOrder.ASC): "WHERE origin = ? ORDER BY visit ASC LIMIT ?", | ||||
) | (False, ListOrder.DESC): "WHERE origin = ? ORDER BY visit DESC LIMIT ?", | ||||
def _origin_visit_get_pagination_desc( | }, | ||||
self, origin_url: str, last_visit: int, limit: int, *, statement | |||||
) -> ResultSet: | |||||
return self._execute_with_retries(statement, [origin_url, last_visit, limit]) | |||||
@_prepared_select_statement( | |||||
OriginVisitRow, "WHERE origin = ? ORDER BY visit ASC LIMIT ?" | |||||
) | |||||
def _origin_visit_get_no_pagination_asc( | |||||
self, origin_url: str, limit: int, *, statement | |||||
) -> ResultSet: | |||||
return self._execute_with_retries(statement, [origin_url, limit]) | |||||
@_prepared_select_statement( | |||||
OriginVisitRow, "WHERE origin = ? ORDER BY visit DESC LIMIT ?" | |||||
) | ) | ||||
def _origin_visit_get_no_pagination_desc( | |||||
self, origin_url: str, limit: int, *, statement | |||||
) -> ResultSet: | |||||
return self._execute_with_retries(statement, [origin_url, limit]) | |||||
def origin_visit_get( | def origin_visit_get( | ||||
self, origin_url: str, last_visit: Optional[int], limit: int, order: ListOrder, | self, | ||||
origin_url: str, | |||||
last_visit: Optional[int], | |||||
limit: int, | |||||
order: ListOrder, | |||||
*, | |||||
statements, | |||||
) -> Iterable[OriginVisitRow]: | ) -> Iterable[OriginVisitRow]: | ||||
args: List[Any] = [origin_url] | args: List[Any] = [origin_url] | ||||
if last_visit is not None: | if last_visit is not None: | ||||
page_name = "pagination" | |||||
args.append(last_visit) | args.append(last_visit) | ||||
else: | |||||
page_name = "no_pagination" | |||||
args.append(limit) | args.append(limit) | ||||
method_name = f"_origin_visit_get_{page_name}_{order.value}" | statement = statements[(last_visit is not None, order)] | ||||
origin_visit_get_method = getattr(self, method_name) | return map( | ||||
return map(OriginVisitRow.from_dict, origin_visit_get_method(*args)) | OriginVisitRow.from_dict, self._execute_with_retries(statement, args) | ||||
) | |||||
@_prepared_insert_statement(OriginVisitRow) | @_prepared_insert_statement(OriginVisitRow) | ||||
def origin_visit_add_one(self, visit: OriginVisitRow, *, statement) -> None: | def origin_visit_add_one(self, visit: OriginVisitRow, *, statement) -> None: | ||||
self._add_one(statement, visit) | self._add_one(statement, visit) | ||||
@_prepared_select_statement(OriginVisitRow, "WHERE origin = ? AND visit = ?") | @_prepared_select_statement(OriginVisitRow, "WHERE origin = ? AND visit = ?") | ||||
def origin_visit_get_one( | def origin_visit_get_one( | ||||
self, origin_url: str, visit_id: int, *, statement | self, origin_url: str, visit_id: int, *, statement | ||||
Show All 35 Lines | def origin_visit_iter(self, start_token: int) -> Iterator[OriginVisitRow]: | ||||
and wraps around the token space.""" | and wraps around the token space.""" | ||||
yield from self._origin_visit_iter_from(start_token) | yield from self._origin_visit_iter_from(start_token) | ||||
yield from self._origin_visit_iter_to(start_token) | yield from self._origin_visit_iter_to(start_token) | ||||
########################## | ########################## | ||||
# 'origin_visit_status' table | # 'origin_visit_status' table | ||||
########################## | ########################## | ||||
@_prepared_select_statement( | @_prepared_select_statements( | ||||
OriginVisitStatusRow, | |||||
"WHERE origin = ? AND visit = ? AND date >= ? ORDER BY date ASC LIMIT ?", | |||||
) | |||||
def _origin_visit_status_get_with_date_asc_limit( | |||||
self, | |||||
origin: str, | |||||
visit: int, | |||||
date_from: datetime.datetime, | |||||
limit: int, | |||||
*, | |||||
statement, | |||||
) -> ResultSet: | |||||
return self._execute_with_retries(statement, [origin, visit, date_from, limit]) | |||||
@_prepared_select_statement( | |||||
OriginVisitStatusRow, | OriginVisitStatusRow, | ||||
"WHERE origin = ? AND visit = ? AND date <= ? ORDER BY visit DESC LIMIT ?", | { | ||||
) | (True, ListOrder.ASC): ( | ||||
def _origin_visit_status_get_with_date_desc_limit( | "WHERE origin = ? AND visit = ? AND date >= ? " | ||||
self, | "ORDER BY visit ASC LIMIT ?" | ||||
origin: str, | ), | ||||
visit: int, | (True, ListOrder.DESC): ( | ||||
date_from: datetime.datetime, | "WHERE origin = ? AND visit = ? AND date <= ? " | ||||
limit: int, | "ORDER BY visit DESC LIMIT ?" | ||||
*, | ), | ||||
statement, | (False, ListOrder.ASC): ( | ||||
) -> ResultSet: | "WHERE origin = ? AND visit = ? ORDER BY visit ASC LIMIT ?" | ||||
return self._execute_with_retries(statement, [origin, visit, date_from, limit]) | ), | ||||
(False, ListOrder.DESC): ( | |||||
@_prepared_select_statement( | "WHERE origin = ? AND visit = ? ORDER BY visit DESC LIMIT ?" | ||||
OriginVisitStatusRow, | ), | ||||
"WHERE origin = ? AND visit = ? ORDER BY visit ASC LIMIT ?", | }, | ||||
) | |||||
def _origin_visit_status_get_with_no_date_asc_limit( | |||||
self, origin: str, visit: int, limit: int, *, statement | |||||
) -> ResultSet: | |||||
return self._execute_with_retries(statement, [origin, visit, limit]) | |||||
@_prepared_select_statement( | |||||
OriginVisitStatusRow, | |||||
"WHERE origin = ? AND visit = ? ORDER BY visit DESC LIMIT ?", | |||||
) | ) | ||||
def _origin_visit_status_get_with_no_date_desc_limit( | |||||
self, origin: str, visit: int, limit: int, *, statement | |||||
) -> ResultSet: | |||||
return self._execute_with_retries(statement, [origin, visit, limit]) | |||||
def origin_visit_status_get_range( | def origin_visit_status_get_range( | ||||
self, | self, | ||||
origin: str, | origin: str, | ||||
visit: int, | visit: int, | ||||
date_from: Optional[datetime.datetime], | date_from: Optional[datetime.datetime], | ||||
limit: int, | limit: int, | ||||
order: ListOrder, | order: ListOrder, | ||||
*, | |||||
statements, | |||||
) -> Iterable[OriginVisitStatusRow]: | ) -> Iterable[OriginVisitStatusRow]: | ||||
args: List[Any] = [origin, visit] | args: List[Any] = [origin, visit] | ||||
if date_from is not None: | if date_from is not None: | ||||
date_name = "date" | |||||
args.append(date_from) | args.append(date_from) | ||||
else: | |||||
date_name = "no_date" | |||||
args.append(limit) | args.append(limit) | ||||
method_name = f"_origin_visit_status_get_with_{date_name}_{order.value}_limit" | statement = statements[(date_from is not None, order)] | ||||
origin_visit_status_get_method = getattr(self, method_name) | |||||
return map( | return map( | ||||
OriginVisitStatusRow.from_dict, origin_visit_status_get_method(*args) | OriginVisitStatusRow.from_dict, self._execute_with_retries(statement, args) | ||||
) | ) | ||||
@_prepared_insert_statement(OriginVisitStatusRow) | @_prepared_insert_statement(OriginVisitStatusRow) | ||||
def origin_visit_status_add_one( | def origin_visit_status_add_one( | ||||
self, visit_update: OriginVisitStatusRow, *, statement | self, visit_update: OriginVisitStatusRow, *, statement | ||||
) -> None: | ) -> None: | ||||
self._add_one(statement, visit_update) | self._add_one(statement, visit_update) | ||||
▲ Show 20 Lines • Show All 139 Lines • Show Last 20 Lines |