Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show All 28 Lines | |||||
from mypy_extensions import NamedArg | from mypy_extensions import NamedArg | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, | retry, | ||||
retry_if_exception_type, | retry_if_exception_type, | ||||
stop_after_attempt, | stop_after_attempt, | ||||
wait_random_exponential, | wait_random_exponential, | ||||
) | ) | ||||
from swh.model.identifiers import CoreSWHID | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
Person, | Person, | ||||
Sha1Git, | Sha1Git, | ||||
SkippedContent, | SkippedContent, | ||||
Timestamp, | Timestamp, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.storage.interface import ListOrder | from swh.storage.interface import ListOrder | ||||
from ..utils import remove_keys | from ..utils import remove_keys | ||||
from .common import TOKEN_BEGIN, TOKEN_END, hash_url | from .common import TOKEN_BEGIN, TOKEN_END, hash_url | ||||
from .model import ( | from .model import ( | ||||
MAGIC_NULL_PK, | MAGIC_NULL_PK, | ||||
BaseRow, | BaseRow, | ||||
ContentRow, | ContentRow, | ||||
DirectoryEntryRow, | DirectoryEntryRow, | ||||
DirectoryRow, | DirectoryRow, | ||||
ExtIDRow, | |||||
MetadataAuthorityRow, | MetadataAuthorityRow, | ||||
MetadataFetcherRow, | MetadataFetcherRow, | ||||
ObjectCountRow, | ObjectCountRow, | ||||
OriginRow, | OriginRow, | ||||
OriginVisitRow, | OriginVisitRow, | ||||
OriginVisitStatusRow, | OriginVisitStatusRow, | ||||
RawExtrinsicMetadataRow, | RawExtrinsicMetadataRow, | ||||
ReleaseRow, | ReleaseRow, | ||||
▲ Show 20 Lines • Show All 897 Lines • ▼ Show 20 Lines | ) -> Iterable[RawExtrinsicMetadataRow]: | ||||
return map( | return map( | ||||
RawExtrinsicMetadataRow.from_dict, | RawExtrinsicMetadataRow.from_dict, | ||||
self._execute_with_retries( | self._execute_with_retries( | ||||
statement, [target, authority_url, authority_type] | statement, [target, authority_url, authority_type] | ||||
), | ), | ||||
) | ) | ||||
########################## | ########################## | ||||
# 'extid' table | |||||
########################## | |||||
def _extid_add_finalize(self, statement: BoundStatement) -> None:
    """Insert an already-bound extid row into the primary table.

    Returned curried (through ``functools.partial``) by
    ``extid_add_prepare``; to be called when the extid row should be
    added to the primary 'extid' table.

    Args:
        statement: the insert statement, already bound to the row values.
    """
    # The statement is already bound, so no arguments are passed along.
    self._execute_with_retries(statement, None)
    self._increment_counter("extid", 1)
@_prepared_insert_statement(ExtIDRow)
def extid_add_prepare(
    self, extid: ExtIDRow, *, statement
) -> Tuple[int, Callable[[], None]]:
    """Bind the insert statement for ``extid`` without executing it.

    Args:
        extid: the row to insert.
        statement: the prepared statement, supplied by the decorator.

    Returns:
        A pair ``(token, finalizer)``: ``token`` is computed from the bound
        statement's routing key, and ``finalizer`` is a zero-argument
        callable that performs the insertion in the 'extid' table.
    """
    statement = statement.bind(dataclasses.astuple(extid))

    # Compute the token of the row from the statement's routing key, using
    # the token class exposed by the cluster metadata.
    token_class = self._cluster.metadata.token_map.token_class
    token = token_class.from_key(statement.routing_key).value
    assert TOKEN_BEGIN <= token <= TOKEN_END

    # Function to be called after the indexes contain their respective
    # row
    finalizer = functools.partial(self._extid_add_finalize, statement)

    return (token, finalizer)
@_prepared_select_statement(
    ExtIDRow, "WHERE extid_type=? AND extid=? AND target_type=? AND target=?",
)
def extid_get_from_pk(
    self, extid_type: str, extid: bytes, target: CoreSWHID, *, statement,
) -> Optional[ExtIDRow]:
    """Look up a single extid row by extid type, extid and target.

    Args:
        extid_type: value matched against the ``extid_type`` column.
        extid: value matched against the ``extid`` column.
        target: SWHID whose object type and object id are matched against
            the ``target_type`` and ``target`` columns.
        statement: the prepared statement, supplied by the decorator.

    Returns:
        The matching row, or ``None`` when the query returns nothing.
    """
    rows = list(
        self._execute_with_retries(
            statement,
            [extid_type, extid, target.object_type.value, target.object_id],
        ),
    )
    # All four columns are constrained, so at most one row is expected
    # (presumably they form the full primary key, as the name suggests).
    assert len(rows) <= 1
    if rows:
        return ExtIDRow(**rows[0])
    else:
        return None
@_prepared_select_statement(
    ExtIDRow, "WHERE token(extid_type, extid) = ?",
)
def extid_get_from_token(self, token: int, *, statement) -> Iterable[ExtIDRow]:
    """Return the extid rows whose ``token(extid_type, extid)`` is ``token``.

    Args:
        token: the token value to match.
        statement: the prepared statement, supplied by the decorator.

    Returns:
        A lazy iterable (``map`` object) of rows built with
        ``ExtIDRow.from_dict``.
    """
    return map(
        ExtIDRow.from_dict,
        self._execute_with_retries(statement, [token]),
    )
@_prepared_select_statement(
    ExtIDRow, "WHERE extid_type=? AND extid=?",
)
def extid_get_from_extid(
    self, extid_type: str, extid: bytes, *, statement
) -> Iterable[ExtIDRow]:
    """Return the extid rows matching the given extid type and extid.

    Args:
        extid_type: value matched against the ``extid_type`` column.
        extid: value matched against the ``extid`` column.
        statement: the prepared statement, supplied by the decorator.

    Returns:
        A lazy iterable (``map`` object) of rows built with
        ``ExtIDRow.from_dict``.
    """
    return map(
        ExtIDRow.from_dict,
        self._execute_with_retries(statement, [extid_type, extid]),
    )
def extid_get_from_target(
    self, target_type: str, target: bytes
) -> Iterable[ExtIDRow]:
    """Yield the extid rows that point at the given target.

    The tokens recorded for ``(target_type, target)`` are read through
    ``_extid_get_tokens_from_target``, then the rows carrying each token
    are fetched with ``extid_get_from_token`` and filtered.

    Args:
        target_type: target type to look for.
        target: target identifier to look for.

    Yields:
        Each fetched row whose ``target_type`` and ``target`` equal the
        requested ones.
    """
    for token in self._extid_get_tokens_from_target(target_type, target):
        if token is not None:
            for extid in self.extid_get_from_token(token):
                # Re-check the extid against the target (in case of murmur3
                # collision): rows are fetched by the token of
                # (extid_type, extid) only, and distinct keys may share a
                # token, so a fetched row is not guaranteed to point at the
                # requested target.
                if (
                    extid is not None
                    and extid.target_type == target_type
                    and extid.target == target
                ):
                    yield extid
########################## | |||||
# 'extid_by_*' tables | |||||
########################## | |||||
def extid_index_add_one(self, extid: ExtIDRow, token: int) -> None:
    """Adds a row mapping extid[target_type, target] to the token of the ExtID in
    the main 'extid' table.

    Args:
        extid: the row whose ``target_type`` and ``target`` are indexed.
        token: the token of that row in the main 'extid' table.
    """
    query = (
        "INSERT INTO extid_by_target (target_type, target, target_token) "
        "VALUES (%s, %s, %s)"
    )
    self._execute_with_retries(query, [extid.target_type, extid.target, token])
def _extid_get_tokens_from_target(
    self, target_type: str, target: bytes
) -> Iterable[int]:
    """Return the tokens recorded in 'extid_by_target' for a target.

    Args:
        target_type: value matched against the ``target_type`` column.
        target: value matched against the ``target`` column.

    Returns:
        A generator over the ``target_token`` value of each matching row.
    """
    query = (
        "SELECT target_token "
        "FROM extid_by_target "
        "WHERE target_type = %s AND target = %s"
    )
    return (
        row["target_token"]
        for row in self._execute_with_retries(query, [target_type, target])
    )
########################## | |||||
# Miscellaneous | # Miscellaneous | ||||
########################## | ########################## | ||||
@_prepared_statement("SELECT uuid() FROM revision LIMIT 1;")
def check_read(self, *, statement):
    """Execute a one-row SELECT on the 'revision' table, discarding the result.

    Args:
        statement: the prepared statement, supplied by the decorator.
    """
    self._execute_with_retries(statement, [])
@_prepared_select_statement(ObjectCountRow, "WHERE partition_key=0")
def stat_counters(self, *, statement) -> Iterable[ObjectCountRow]:
    """Return the object-count rows stored under ``partition_key=0``.

    Args:
        statement: the prepared statement, supplied by the decorator.

    Returns:
        A lazy iterable (``map`` object) of rows built with
        ``ObjectCountRow.from_dict``.
    """
    return map(ObjectCountRow.from_dict, self._execute_with_retries(statement, []))
vlorentz (inline comment on the target re-check in extid_get_from_target): Please add a short comment explaining why this is needed