Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show All 12 Lines | from typing import ( | ||||
Callable, | Callable, | ||||
Dict, | Dict, | ||||
Iterable, | Iterable, | ||||
Iterator, | Iterator, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Tuple, | Tuple, | ||||
TypeVar, | TypeVar, | ||||
Union, | |||||
) | ) | ||||
from cassandra import CoordinationFailure | from cassandra import CoordinationFailure | ||||
from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet | from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet | ||||
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | ||||
from cassandra.query import PreparedStatement, BoundStatement | from cassandra.query import PreparedStatement, BoundStatement | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, | retry, | ||||
Show All 11 Lines | from swh.model.model import ( | ||||
SkippedContent, | SkippedContent, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
Origin, | Origin, | ||||
) | ) | ||||
from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url | from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url | ||||
from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | ||||
from .. import extrinsic_metadata | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
_execution_profiles = { | _execution_profiles = { | ||||
EXEC_PROFILE_DEFAULT: ExecutionProfile( | EXEC_PROFILE_DEFAULT: ExecutionProfile( | ||||
load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()) | load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()) | ||||
▲ Show 20 Lines • Show All 836 Lines • ▼ Show 20 Lines | _object_metadata_keys = [ | ||||
"directory", | "directory", | ||||
] | ] | ||||
@_prepared_statement( | @_prepared_statement( | ||||
f"INSERT INTO object_metadata ({', '.join(_object_metadata_keys)}) " | f"INSERT INTO object_metadata ({', '.join(_object_metadata_keys)}) " | ||||
f"VALUES ({', '.join('?' for _ in _object_metadata_keys)})" | f"VALUES ({', '.join('?' for _ in _object_metadata_keys)})" | ||||
) | ) | ||||
def object_metadata_add( | def object_metadata_add( | ||||
self, | self, statement, **kwargs, | ||||
object_type: str, | |||||
id: str, | |||||
authority_type, | |||||
authority_url, | |||||
discovery_date, | |||||
fetcher_name, | |||||
fetcher_version, | |||||
format, | |||||
metadata, | |||||
context: Dict[str, Union[str, bytes, int]], | |||||
*, | |||||
statement, | |||||
): | ): | ||||
params = [ | assert set(kwargs) == set( | ||||
object_type, | self._object_metadata_keys | ||||
id, | ), f"Bad kwargs: {set(kwargs)}" | ||||
authority_type, | params = [kwargs[key] for key in self._object_metadata_keys] | ||||
ardumont: ```
, f"Expected {set(kwargs)}"
```
(or something) as it's less ambiguous to read. | |||||
authority_url, | |||||
discovery_date, | |||||
fetcher_name, | |||||
fetcher_version, | |||||
format, | |||||
metadata, | |||||
] | |||||
params.extend( | |||||
context.get(key) for key in extrinsic_metadata.CONTEXT_KEYS[object_type] | |||||
) | |||||
return self._execute_with_retries(statement, params,) | return self._execute_with_retries(statement, params,) | ||||
@_prepared_statement( | @_prepared_statement( | ||||
"SELECT * from object_metadata " | "SELECT * from object_metadata " | ||||
"WHERE id=? AND authority_url=? AND discovery_date>? AND authority_type=?" | "WHERE id=? AND authority_url=? AND discovery_date>? AND authority_type=?" | ||||
) | ) | ||||
def object_metadata_get_after_date( | def object_metadata_get_after_date( | ||||
self, | self, | ||||
▲ Show 20 Lines • Show All 63 Lines • Show Last 20 Lines |
(or something) as it's less ambiguous to read.