Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
Show All 13 Lines | |||||
from cassandra.cluster import ( | from cassandra.cluster import ( | ||||
Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet) | Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet) | ||||
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | ||||
from cassandra.query import PreparedStatement | from cassandra.query import PreparedStatement | ||||
from tenacity import retry, stop_after_attempt, wait_random_exponential | from tenacity import retry, stop_after_attempt, wait_random_exponential | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Sha1Git, TimestampWithTimezone, Timestamp, Person, Content, | Sha1Git, TimestampWithTimezone, Timestamp, Person, Content, | ||||
SkippedContent, | |||||
) | ) | ||||
from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url | from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url | ||||
from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
▲ Show 20 Lines • Show All 132 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
# 'content' table | # 'content' table | ||||
########################## | ########################## | ||||
_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | _content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | ||||
_content_keys = [ | _content_keys = [ | ||||
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', | 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', | ||||
'ctime', 'status'] | 'ctime', 'status'] | ||||
@_prepared_insert_statement('content', _content_keys) | |||||
def content_add_one(self, content, *, statement) -> None: | |||||
self._add_one(statement, 'content', content, self._content_keys) | |||||
@_prepared_statement('SELECT * FROM content WHERE ' + | @_prepared_statement('SELECT * FROM content WHERE ' + | ||||
' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))) | ' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))) | ||||
def content_get_from_pk( | def content_get_from_pk( | ||||
self, content_hashes: Dict[str, bytes], *, statement | self, content_hashes: Dict[str, bytes], *, statement | ||||
) -> Optional[Row]: | ) -> Optional[Row]: | ||||
rows = list(self._execute_with_retries( | rows = list(self._execute_with_retries( | ||||
statement, [content_hashes[algo] for algo in HASH_ALGORITHMS])) | statement, [content_hashes[algo] for algo in HASH_ALGORITHMS])) | ||||
assert len(rows) <= 1 | assert len(rows) <= 1 | ||||
Show All 20 Lines | class CqlRunner: | ||||
########################## | ########################## | ||||
@_prepared_statement('SELECT sha1_git FROM content_by_sha1_git ' | @_prepared_statement('SELECT sha1_git FROM content_by_sha1_git ' | ||||
'WHERE sha1_git IN ?') | 'WHERE sha1_git IN ?') | ||||
def content_missing_by_sha1_git( | def content_missing_by_sha1_git( | ||||
self, ids: List[bytes], *, statement) -> List[bytes]: | self, ids: List[bytes], *, statement) -> List[bytes]: | ||||
return self._missing(statement, ids) | return self._missing(statement, ids) | ||||
@_prepared_insert_statement('content', _content_keys) | |||||
def content_add_one(self, content, *, statement) -> None: | |||||
self._add_one(statement, 'content', content, self._content_keys) | |||||
def content_index_add_one(self, main_algo: str, content: Content) -> None: | def content_index_add_one(self, main_algo: str, content: Content) -> None: | ||||
query = 'INSERT INTO content_by_{algo} ({cols}) VALUES ({values})' \ | query = 'INSERT INTO content_by_{algo} ({cols}) VALUES ({values})' \ | ||||
.format(algo=main_algo, cols=', '.join(self._content_pk), | .format(algo=main_algo, cols=', '.join(self._content_pk), | ||||
values=', '.join('%s' for _ in self._content_pk)) | values=', '.join('%s' for _ in self._content_pk)) | ||||
self._execute_with_retries( | self._execute_with_retries( | ||||
query, [content.get_hash(algo) for algo in self._content_pk]) | query, [content.get_hash(algo) for algo in self._content_pk]) | ||||
def content_get_pks_from_single_hash( | def content_get_pks_from_single_hash( | ||||
self, algo: str, hash_: bytes) -> List[Row]: | self, algo: str, hash_: bytes) -> List[Row]: | ||||
assert algo in HASH_ALGORITHMS | assert algo in HASH_ALGORITHMS | ||||
query = 'SELECT * FROM content_by_{algo} WHERE {algo} = %s'.format( | query = 'SELECT * FROM content_by_{algo} WHERE {algo} = %s'.format( | ||||
algo=algo) | algo=algo) | ||||
return list(self._execute_with_retries(query, [hash_])) | return list(self._execute_with_retries(query, [hash_])) | ||||
########################## | ########################## | ||||
# 'skipped_content' table | |||||
########################## | |||||
_skipped_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | |||||
_skipped_content_keys = [ | |||||
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', | |||||
'ctime', 'status', 'reason', 'origin'] | |||||
_magic_null_pk = b'' | |||||
""" | |||||
NULLs are not allowed in primary keys; instead use an empty | |||||
value | |||||
""" | |||||
@_prepared_insert_statement('skipped_content', _skipped_content_keys) | |||||
def skipped_content_add_one(self, content, *, statement) -> None: | |||||
content = content.to_dict() | |||||
for key in self._skipped_content_pk: | |||||
if content[key] is None: | |||||
content[key] = self._magic_null_pk | |||||
content = SkippedContent.from_dict(content) | |||||
self._add_one(statement, 'skipped_content', content, | |||||
self._skipped_content_keys) | |||||
@_prepared_statement('SELECT * FROM skipped_content WHERE ' + | |||||
' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))) | |||||
def skipped_content_get_from_pk( | |||||
self, content_hashes: Dict[str, bytes], *, statement | |||||
) -> Optional[Row]: | |||||
rows = list(self._execute_with_retries( | |||||
statement, [content_hashes[algo] or self._magic_null_pk | |||||
for algo in HASH_ALGORITHMS])) | |||||
assert len(rows) <= 1 | |||||
if rows: | |||||
# TODO: convert _magic_null_pk back to None? | |||||
return rows[0] | |||||
else: | |||||
return None | |||||
########################## | |||||
# 'skipped_content_by_*' tables | |||||
########################## | |||||
def skipped_content_index_add_one( | |||||
self, main_algo: str, content: Content) -> None: | |||||
assert content.get_hash(main_algo) is not None | |||||
query = ('INSERT INTO skipped_content_by_{algo} ({cols}) ' | |||||
'VALUES ({values})').format( | |||||
algo=main_algo, cols=', '.join(self._content_pk), | |||||
values=', '.join('%s' for _ in self._content_pk)) | |||||
self._execute_with_retries( | |||||
query, [content.get_hash(algo) or self._magic_null_pk | |||||
for algo in self._content_pk]) | |||||
########################## | |||||
# 'revision' table | # 'revision' table | ||||
########################## | ########################## | ||||
_revision_keys = [ | _revision_keys = [ | ||||
'id', 'date', 'committer_date', 'type', 'directory', 'message', | 'id', 'date', 'committer_date', 'type', 'directory', 'message', | ||||
'author', 'committer', | 'author', 'committer', | ||||
'synthetic', 'metadata'] | 'synthetic', 'metadata'] | ||||
▲ Show 20 Lines • Show All 391 Lines • Show Last 20 Lines |