Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | import functools | ||||
import json | import json | ||||
import logging | import logging | ||||
import random | import random | ||||
from typing import ( | from typing import ( | ||||
Any, Callable, Dict, Generator, Iterable, List, Optional, TypeVar | Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple, TypeVar | ||||
) | ) | ||||
from cassandra import CoordinationFailure | from cassandra import CoordinationFailure | ||||
from cassandra.cluster import ( | from cassandra.cluster import ( | ||||
Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet) | Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet) | ||||
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | ||||
from cassandra.query import PreparedStatement | from cassandra.query import PreparedStatement, BoundStatement | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, stop_after_attempt, wait_random_exponential, | retry, stop_after_attempt, wait_random_exponential, | ||||
retry_if_exception_type, | retry_if_exception_type, | ||||
) | ) | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Sha1Git, TimestampWithTimezone, Timestamp, Person, Content, | Sha1Git, TimestampWithTimezone, Timestamp, Person, Content, | ||||
SkippedContent, OriginVisit, Origin | SkippedContent, OriginVisit, Origin | ||||
▲ Show 20 Lines • Show All 141 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
# 'content' table | # 'content' table | ||||
########################## | ########################## | ||||
_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | _content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | ||||
_content_keys = [ | _content_keys = [ | ||||
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', | 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', | ||||
'ctime', 'status'] | 'ctime', 'status'] | ||||
def _content_add_finalize(self, statement: BoundStatement) -> None: | |||||
"""Returned curried by content_add_prepare, to be called when the | |||||
content row should be added to the primary table.""" | |||||
self._execute_with_retries(statement, None) | |||||
self._increment_counter('content', 1) | |||||
@_prepared_insert_statement('content', _content_keys) | @_prepared_insert_statement('content', _content_keys) | ||||
def content_add_one(self, content, *, statement) -> None: | def content_add_prepare( | ||||
self._add_one(statement, 'content', content, self._content_keys) | self, content, *, statement) -> Tuple[int, Callable[[], None]]: | ||||
"""Prepares insertion of a Content to the main 'content' table. | |||||
Returns a token (to be used in secondary tables), and a function to be | |||||
called to perform the insertion in the main table.""" | |||||
statement = statement.bind([ | |||||
getattr(content, key) for key in self._content_keys]) | |||||
# Type used for hashing keys (usually, it will be | |||||
# cassandra.metadata.Murmur3Token) | |||||
token_class = self._cluster.metadata.token_map.token_class | |||||
# Token of the row when it will be inserted. This is equivalent to | |||||
# "SELECT token({', '.join(self._content_pk)}) FROM content WHERE ..." | |||||
# after the row is inserted; but we need the token to insert in the | |||||
# index tables *before* inserting to the main 'content' table | |||||
token = token_class.from_key(statement.routing_key).value | |||||
assert TOKEN_BEGIN <= token <= TOKEN_END | |||||
# Function to be called after the indexes contain their respective | |||||
# row | |||||
finalizer = functools.partial(self._content_add_finalize, statement) | |||||
return (token, finalizer) | |||||
@_prepared_statement('SELECT * FROM content WHERE ' + | @_prepared_statement('SELECT * FROM content WHERE ' + | ||||
' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))) | ' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))) | ||||
def content_get_from_pk( | def content_get_from_pk( | ||||
self, content_hashes: Dict[str, bytes], *, statement | self, content_hashes: Dict[str, bytes], *, statement | ||||
) -> Optional[Row]: | ) -> Optional[Row]: | ||||
rows = list(self._execute_with_retries( | rows = list(self._execute_with_retries( | ||||
statement, [content_hashes[algo] for algo in HASH_ALGORITHMS])) | statement, [content_hashes[algo] for algo in HASH_ALGORITHMS])) | ||||
assert len(rows) <= 1 | assert len(rows) <= 1 | ||||
if rows: | if rows: | ||||
return rows[0] | return rows[0] | ||||
else: | else: | ||||
return None | return None | ||||
@_prepared_statement('SELECT * FROM content WHERE token(' | |||||
+ ', '.join(_content_pk) | |||||
+ ') = ?') | |||||
def content_get_from_token(self, token, *, statement) -> Iterable[Row]: | |||||
return self._execute_with_retries(statement, [token]) | |||||
@_prepared_statement('SELECT * FROM content WHERE token(%s) > ? LIMIT 1' | @_prepared_statement('SELECT * FROM content WHERE token(%s) > ? LIMIT 1' | ||||
% ', '.join(_content_pk)) | % ', '.join(_content_pk)) | ||||
def content_get_random(self, *, statement) -> Optional[Row]: | def content_get_random(self, *, statement) -> Optional[Row]: | ||||
return self._get_random_row(statement) | return self._get_random_row(statement) | ||||
@_prepared_statement(('SELECT token({0}) AS tok, {1} FROM content ' | @_prepared_statement(('SELECT token({0}) AS tok, {1} FROM content ' | ||||
'WHERE token({0}) >= ? AND token({0}) <= ? LIMIT ?') | 'WHERE token({0}) >= ? AND token({0}) <= ? LIMIT ?') | ||||
.format(', '.join(_content_pk), | .format(', '.join(_content_pk), | ||||
', '.join(_content_keys))) | ', '.join(_content_keys))) | ||||
def content_get_token_range( | def content_get_token_range( | ||||
self, start: int, end: int, limit: int, *, statement) -> Row: | self, start: int, end: int, limit: int, *, statement) -> Row: | ||||
return self._execute_with_retries(statement, [start, end, limit]) | return self._execute_with_retries(statement, [start, end, limit]) | ||||
########################## | ########################## | ||||
# 'content_by_*' tables | # 'content_by_*' tables | ||||
########################## | ########################## | ||||
@_prepared_statement('SELECT sha1_git FROM content_by_sha1_git ' | @_prepared_statement('SELECT sha1_git FROM content_by_sha1_git ' | ||||
'WHERE sha1_git IN ?') | 'WHERE sha1_git IN ?') | ||||
def content_missing_by_sha1_git( | def content_missing_by_sha1_git( | ||||
self, ids: List[bytes], *, statement) -> List[bytes]: | self, ids: List[bytes], *, statement) -> List[bytes]: | ||||
return self._missing(statement, ids) | return self._missing(statement, ids) | ||||
def content_index_add_one(self, main_algo: str, content: Content) -> None: | def content_index_add_one( | ||||
query = 'INSERT INTO content_by_{algo} ({cols}) VALUES ({values})' \ | self, algo: str, content: Content, token: int) -> None: | ||||
.format(algo=main_algo, cols=', '.join(self._content_pk), | """Adds a row mapping content[algo] to the token of the Content in | ||||
values=', '.join('%s' for _ in self._content_pk)) | the main 'content' table.""" | ||||
query = ( | |||||
f'INSERT INTO content_by_{algo} ({algo}, target_token) ' | |||||
f'VALUES (%s, %s)') | |||||
self._execute_with_retries( | self._execute_with_retries( | ||||
query, [content.get_hash(algo) for algo in self._content_pk]) | query, [content.get_hash(algo), token]) | ||||
def content_get_pks_from_single_hash( | def content_get_tokens_from_single_hash( | ||||
self, algo: str, hash_: bytes) -> List[Row]: | self, algo: str, hash_: bytes) -> Iterable[int]: | ||||
assert algo in HASH_ALGORITHMS | assert algo in HASH_ALGORITHMS | ||||
query = 'SELECT * FROM content_by_{algo} WHERE {algo} = %s'.format( | query = f'SELECT target_token FROM content_by_{algo} WHERE {algo} = %s' | ||||
algo=algo) | return (tok for (tok,) in self._execute_with_retries(query, [hash_])) | ||||
return list(self._execute_with_retries(query, [hash_])) | |||||
########################## | ########################## | ||||
# 'skipped_content' table | # 'skipped_content' table | ||||
########################## | ########################## | ||||
_skipped_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | _skipped_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | ||||
_skipped_content_keys = [ | _skipped_content_keys = [ | ||||
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', | 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', | ||||
▲ Show 20 Lines • Show All 451 Lines • Show Last 20 Lines |