Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show All 11 Lines | from typing import ( | ||||
Callable, | Callable, | ||||
Dict, | Dict, | ||||
Iterable, | Iterable, | ||||
Iterator, | Iterator, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Tuple, | Tuple, | ||||
TypeVar, | TypeVar, | ||||
Union, | |||||
) | ) | ||||
from cassandra import CoordinationFailure | from cassandra import CoordinationFailure | ||||
from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet | from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet | ||||
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | ||||
from cassandra.query import PreparedStatement, BoundStatement | from cassandra.query import PreparedStatement, BoundStatement | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, | retry, | ||||
Show All 11 Lines | from swh.model.model import ( | ||||
SkippedContent, | SkippedContent, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
Origin, | Origin, | ||||
) | ) | ||||
from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url | from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url | ||||
from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | ||||
from .. import extrinsic_metadata | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
_execution_profiles = { | _execution_profiles = { | ||||
EXEC_PROFILE_DEFAULT: ExecutionProfile( | EXEC_PROFILE_DEFAULT: ExecutionProfile( | ||||
load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()) | load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()) | ||||
▲ Show 20 Lines • Show All 732 Lines • ▼ Show 20 Lines | def metadata_fetcher_add(self, name, version, metadata, *, statement): | ||||
return self._execute_with_retries(statement, [name, version, metadata]) | return self._execute_with_retries(statement, [name, version, metadata]) | ||||
@_prepared_statement(
    "SELECT * from metadata_fetcher WHERE name = ? AND version = ?"
)
def metadata_fetcher_get(self, name, version, *, statement) -> Optional[Row]:
    """Look up a single ``metadata_fetcher`` row by name and version.

    Args:
        name: value bound to the ``name`` placeholder.
        version: value bound to the ``version`` placeholder.
        statement: the prepared statement, supplied by the
            ``_prepared_statement`` decorator.

    Returns:
        The first row produced by the query, or ``None`` when the query
        produced no row.
    """
    matching_rows = self._execute_with_retries(statement, [name, version])
    # Only the first row is of interest; stop iterating as soon as we have it.
    for first_row in matching_rows:
        return first_row
    return None
########################## | ######################### | ||||
# 'origin_metadata' table | # 'object_metadata' table | ||||
########################## | ######################### | ||||
# Columns of the 'object_metadata' table, in the order they are bound by the
# INSERT statement below.
_object_metadata_keys = [
    # Target object and provenance of the metadata.
    "id",
    "authority_type",
    "authority_url",
    "discovery_date",
    "fetcher_name",
    "fetcher_version",
    # Payload.
    "format",
    "metadata",
    # Context columns.
    "origin",
    "visit",
    "snapshot",
    "release",
    "revision",
    "path",
    "directory",
]
@_prepared_statement(
    f"INSERT INTO object_metadata ({', '.join(_object_metadata_keys)}) "
    f"VALUES ({', '.join('?' for _ in _object_metadata_keys)})"
)
def object_metadata_add(
    self,
    object_type: str,
    id: str,
    authority_type,
    authority_url,
    discovery_date,
    fetcher_name,
    fetcher_version,
    format,
    metadata,
    context: Dict[str, Union[str, bytes, int]],
    *,
    statement,
):
    """Insert one row into the ``object_metadata`` table.

    Args:
        object_type: key used to look up, in
            ``extrinsic_metadata.CONTEXT_KEYS``, which context keys are
            bound for this row. It is not itself stored in a column.
        id: value for the ``id`` column.
        authority_type: value for the ``authority_type`` column.
        authority_url: value for the ``authority_url`` column.
        discovery_date: value for the ``discovery_date`` column.
        fetcher_name: value for the ``fetcher_name`` column.
        fetcher_version: value for the ``fetcher_version`` column.
        format: value for the ``format`` column.
        metadata: value for the ``metadata`` column.
        context: context values by key; a key absent from this mapping is
            bound as ``None``.
        statement: the prepared statement, supplied by the
            ``_prepared_statement`` decorator.

    Returns:
        Whatever ``_execute_with_retries`` returns for the insert.

    Raises:
        KeyError: if ``object_type`` is not a key of
            ``extrinsic_metadata.CONTEXT_KEYS``.
    """
    # Fixed columns first, in the same order as _object_metadata_keys.
    bound_values = [
        id,
        authority_type,
        authority_url,
        discovery_date,
        fetcher_name,
        fetcher_version,
        format,
        metadata,
    ]

    # NOTE(review): the statement has one placeholder per entry of
    # _object_metadata_keys, but only the context keys registered for this
    # object type are bound here. Confirm that (a) the driver accepts fewer
    # bound values than placeholders and (b) CONTEXT_KEYS[object_type]
    # iterates in the same order as the trailing context columns.
    context_keys = extrinsic_metadata.CONTEXT_KEYS[object_type]
    bound_values += [context.get(key) for key in context_keys]

    return self._execute_with_retries(statement, bound_values)
@_prepared_statement(
    "SELECT * from object_metadata "
    "WHERE id=? AND authority_url=? AND discovery_date>? AND authority_type=?"
)
def object_metadata_get_after_date(
    self, id: str, authority_type, authority_url, after, *, statement
):
    """Select ``object_metadata`` rows discovered strictly after a date.

    Args:
        id: value matched against the ``id`` column.
        authority_type: value matched against ``authority_type``.
        authority_url: value matched against ``authority_url``.
        after: exclusive lower bound on ``discovery_date``.
        statement: the prepared statement, supplied by the
            ``_prepared_statement`` decorator.

    Returns:
        Whatever ``_execute_with_retries`` returns for the query.
    """
    # The bind order follows the placeholders in the query text, which is
    # not the order of this method's parameters.
    bound_values = [id, authority_url, after, authority_type]
    return self._execute_with_retries(statement, bound_values)
@_prepared_statement(
    "SELECT * from object_metadata "
    "WHERE id=? AND authority_type=? AND authority_url=? "
    "AND (discovery_date, fetcher_name, fetcher_version) > (?, ?, ?)"
)
def object_metadata_get_after_date_and_fetcher(
    self,
    id,
    authority_type,
    authority_url,
    after_date,
    after_fetcher_name,
    after_fetcher_version,
    *,
    statement,
):
    """Select ``object_metadata`` rows after a (date, fetcher) position.

    The query compares the tuple ``(discovery_date, fetcher_name,
    fetcher_version)`` against the given lower bound with ``>``.

    Args:
        id: value matched against the ``id`` column.
        authority_type: value matched against ``authority_type``.
        authority_url: value matched against ``authority_url``.
        after_date: first component of the exclusive lower bound.
        after_fetcher_name: second component of the exclusive lower bound.
        after_fetcher_version: third component of the exclusive lower bound.
        statement: the prepared statement, supplied by the
            ``_prepared_statement`` decorator.

    Returns:
        Whatever ``_execute_with_retries`` returns for the query.
    """
    equality_values = (id, authority_type, authority_url)
    lower_bound = (after_date, after_fetcher_name, after_fetcher_version)
    return self._execute_with_retries(
        statement, [*equality_values, *lower_bound]
    )
@_prepared_statement(
    "SELECT * from object_metadata "
    "WHERE id=? AND authority_url=? AND authority_type=?"
)
def object_metadata_get(
    self, id, authority_type, authority_url, *, statement
) -> Iterable[Row]:
    """Select the ``object_metadata`` rows of one object from one authority.

    Args:
        id: value matched against the ``id`` column.
        authority_type: value matched against ``authority_type``.
        authority_url: value matched against ``authority_url``.
        statement: the prepared statement, supplied by the
            ``_prepared_statement`` decorator.

    Returns:
        The rows produced by the query, as returned by
        ``_execute_with_retries``.
    """
    # The query lists authority_url before authority_type, so the bound
    # values are swapped relative to the parameter order.
    bound_values = [id, authority_url, authority_type]
    return self._execute_with_retries(statement, bound_values)
########################## | ########################## | ||||
# Miscellaneous | # Miscellaneous | ||||
########################## | ########################## | ||||
@_prepared_statement("SELECT uuid() FROM revision LIMIT 1;")
def check_read(self, *, statement):
    """Run a trivial one-row SELECT on the ``revision`` table.

    The query result is discarded and nothing is returned; any exception
    raised while executing the statement propagates to the caller.
    Presumably used as a read-access probe -- confirm against callers.

    Args:
        statement: the prepared statement, supplied by the
            ``_prepared_statement`` decorator.
    """
    no_bound_values = []  # the query has no placeholders
    self._execute_with_retries(statement, no_bound_values)
@_prepared_statement(
    "SELECT object_type, count FROM object_count WHERE partition_key=0"
)
def stat_counters(self, *, statement) -> ResultSet:
    """Read the ``(object_type, count)`` pairs stored in ``object_count``.

    Only the rows with ``partition_key=0`` are selected.

    Args:
        statement: the prepared statement, supplied by the
            ``_prepared_statement`` decorator.

    Returns:
        The result set of the query.
    """
    no_bound_values = []  # the query has no placeholders
    counters = self._execute_with_retries(statement, no_bound_values)
    return counters