diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py index e35815dd..96cf0ffc 100644 --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -1,754 +1,753 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import json import logging import random from typing import ( - Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple, TypeVar + Any, Callable, Dict, Iterable, Iterator, List, Optional, + Tuple, TypeVar ) from cassandra import CoordinationFailure from cassandra.cluster import ( Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet) from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import PreparedStatement, BoundStatement from tenacity import ( retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type, ) from swh.model.model import ( Sha1Git, TimestampWithTimezone, Timestamp, Person, Content, SkippedContent, OriginVisit, Origin ) from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS logger = logging.getLogger(__name__) _execution_profiles = { EXEC_PROFILE_DEFAULT: ExecutionProfile( load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy())), } # Configuration for cassandra-driver's access to servers: # * hit the right server directly when sending a query (TokenAwarePolicy), # * if there's more than one, then pick one at random that's in the same # datacenter as the client (DCAwareRoundRobinPolicy) def create_keyspace(hosts: List[str], keyspace: str, port: int = 9042, *, durable_writes=True): cluster = Cluster( hosts, port=port, execution_profiles=_execution_profiles) session = cluster.connect() extra_params = '' if not durable_writes: extra_params = 'AND durable_writes = false' session.execute('''CREATE KEYSPACE IF NOT EXISTS "%s" WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } %s; ''' % (keyspace, extra_params)) session.execute('USE "%s"' % keyspace) for query in CREATE_TABLES_QUERIES: session.execute(query) T = TypeVar('T') def _prepared_statement( query: str) -> Callable[[Callable[..., T]], Callable[..., T]]: """Returns a decorator usable on methods of CqlRunner, to inject them with a 'statement' argument, that is a prepared statement corresponding to the query. This only works on methods of CqlRunner, as preparing a statement requires a connection to a Cassandra server.""" def decorator(f): @functools.wraps(f) def newf(self, *args, **kwargs) -> T: if f.__name__ not in self._prepared_statements: statement: PreparedStatement = self._session.prepare(query) self._prepared_statements[f.__name__] = statement return f(self, *args, **kwargs, statement=self._prepared_statements[f.__name__]) return newf return decorator def _prepared_insert_statement(table_name: str, columns: List[str]): """Shorthand for using `_prepared_statement` for `INSERT INTO` statements.""" return _prepared_statement( 'INSERT INTO %s (%s) VALUES (%s)' % ( table_name, ', '.join(columns), ', '.join('?' for _ in columns), ) ) def _prepared_exists_statement(table_name: str): """Shorthand for using `_prepared_statement` for queries that only check which ids in a list exist in the table.""" return _prepared_statement(f'SELECT id FROM {table_name} WHERE id IN ?') class CqlRunner: """Class managing prepared statements and building queries to be sent to Cassandra.""" def __init__(self, hosts: List[str], keyspace: str, port: int): self._cluster = Cluster( hosts, port=port, execution_profiles=_execution_profiles) self._session = self._cluster.connect(keyspace) self._cluster.register_user_type( keyspace, 'microtimestamp_with_timezone', TimestampWithTimezone) self._cluster.register_user_type( keyspace, 'microtimestamp', Timestamp) self._cluster.register_user_type( keyspace, 'person', Person) self._prepared_statements: Dict[str, PreparedStatement] = {} ########################## # Common utility functions ########################## MAX_RETRIES = 3 @retry(wait=wait_random_exponential(multiplier=1, max=10), stop=stop_after_attempt(MAX_RETRIES), retry=retry_if_exception_type(CoordinationFailure)) def _execute_with_retries(self, statement, args) -> ResultSet: return self._session.execute(statement, args, timeout=1000.) @_prepared_statement('UPDATE object_count SET count = count + ? ' 'WHERE partition_key = 0 AND object_type = ?') def _increment_counter( self, object_type: str, nb: int, *, statement: PreparedStatement ) -> None: self._execute_with_retries(statement, [nb, object_type]) def _add_one( self, statement, object_type: str, obj, keys: List[str] ) -> None: self._increment_counter(object_type, 1) self._execute_with_retries( statement, [getattr(obj, key) for key in keys]) def _get_random_row(self, statement) -> Optional[Row]: """Takes a prepared statement of the form "SELECT * FROM WHERE token() > ? LIMIT 1" and uses it to return a random row""" token = random.randint(TOKEN_BEGIN, TOKEN_END) rows = self._execute_with_retries(statement, [token]) if not rows: # There are no row with a greater token; wrap around to get # the row with the smallest token rows = self._execute_with_retries(statement, [TOKEN_BEGIN]) if rows: return rows.one() else: return None def _missing(self, statement, ids): res = self._execute_with_retries(statement, [ids]) found_ids = {id_ for (id_,) in res} return [id_ for id_ in ids if id_ not in found_ids] ########################## # 'content' table ########################## _content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] _content_keys = [ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', 'status'] def _content_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by content_add_prepare, to be called when the content row should be added to the primary table.""" self._execute_with_retries(statement, None) self._increment_counter('content', 1) @_prepared_insert_statement('content', _content_keys) def content_add_prepare( self, content, *, statement) -> Tuple[int, Callable[[], None]]: """Prepares insertion of a Content to the main 'content' table. Returns a token (to be used in secondary tables), and a function to be called to perform the insertion in the main table.""" statement = statement.bind([ getattr(content, key) for key in self._content_keys]) # Type used for hashing keys (usually, it will be # cassandra.metadata.Murmur3Token) token_class = self._cluster.metadata.token_map.token_class # Token of the row when it will be inserted. This is equivalent to # "SELECT token({', '.join(self._content_pk)}) FROM content WHERE ..." # after the row is inserted; but we need the token to insert in the # index tables *before* inserting to the main 'content' table token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._content_add_finalize, statement) return (token, finalizer) @_prepared_statement('SELECT * FROM content WHERE ' + ' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))) def content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement ) -> Optional[Row]: rows = list(self._execute_with_retries( statement, [content_hashes[algo] for algo in HASH_ALGORITHMS])) assert len(rows) <= 1 if rows: return rows[0] else: return None @_prepared_statement('SELECT * FROM content WHERE token(' + ', '.join(_content_pk) + ') = ?') def content_get_from_token(self, token, *, statement) -> Iterable[Row]: return self._execute_with_retries(statement, [token]) @_prepared_statement('SELECT * FROM content WHERE token(%s) > ? LIMIT 1' % ', '.join(_content_pk)) def content_get_random(self, *, statement) -> Optional[Row]: return self._get_random_row(statement) @_prepared_statement(('SELECT token({0}) AS tok, {1} FROM content ' 'WHERE token({0}) >= ? AND token({0}) <= ? LIMIT ?') .format(', '.join(_content_pk), ', '.join(_content_keys))) def content_get_token_range( self, start: int, end: int, limit: int, *, statement) -> Row: return self._execute_with_retries(statement, [start, end, limit]) ########################## # 'content_by_*' tables ########################## @_prepared_statement('SELECT sha1_git FROM content_by_sha1_git ' 'WHERE sha1_git IN ?') def content_missing_by_sha1_git( self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) def content_index_add_one( self, algo: str, content: Content, token: int) -> None: """Adds a row mapping content[algo] to the token of the Content in the main 'content' table.""" query = ( f'INSERT INTO content_by_{algo} ({algo}, target_token) ' f'VALUES (%s, %s)') self._execute_with_retries( query, [content.get_hash(algo), token]) def content_get_tokens_from_single_hash( self, algo: str, hash_: bytes) -> Iterable[int]: assert algo in HASH_ALGORITHMS query = f'SELECT target_token FROM content_by_{algo} WHERE {algo} = %s' return (tok for (tok,) in self._execute_with_retries(query, [hash_])) ########################## # 'skipped_content' table ########################## _skipped_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] _skipped_content_keys = [ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', 'status', 'reason', 'origin'] _magic_null_pk = b'' """ NULLs (or all-empty blobs) are not allowed in primary keys; instead use a special value that can't possibly be a valid hash. """ def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by skipped_content_add_prepare, to be called when the content row should be added to the primary table.""" self._execute_with_retries(statement, None) self._increment_counter('skipped_content', 1) @_prepared_insert_statement('skipped_content', _skipped_content_keys) def skipped_content_add_prepare( self, content, *, statement) -> Tuple[int, Callable[[], None]]: """Prepares insertion of a Content to the main 'skipped_content' table. Returns a token (to be used in secondary tables), and a function to be called to perform the insertion in the main table.""" # Replace NULLs (which are not allowed in the partition key) with # an empty byte string content = content.to_dict() for key in self._skipped_content_pk: if content[key] is None: content[key] = self._magic_null_pk statement = statement.bind([ content.get(key) for key in self._skipped_content_keys]) # Type used for hashing keys (usually, it will be # cassandra.metadata.Murmur3Token) token_class = self._cluster.metadata.token_map.token_class # Token of the row when it will be inserted. This is equivalent to # "SELECT token({', '.join(self._content_pk)}) # FROM skipped_content WHERE ..." # after the row is inserted; but we need the token to insert in the # index tables *before* inserting to the main 'skipped_content' table token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial( self._skipped_content_add_finalize, statement) return (token, finalizer) @_prepared_statement('SELECT * FROM skipped_content WHERE ' + ' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))) def skipped_content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement ) -> Optional[Row]: rows = list(self._execute_with_retries( statement, [content_hashes[algo] or self._magic_null_pk for algo in HASH_ALGORITHMS])) assert len(rows) <= 1 if rows: # TODO: convert _magic_null_pk back to None? return rows[0] else: return None ########################## # 'skipped_content_by_*' tables ########################## def skipped_content_index_add_one( self, algo: str, content: SkippedContent, token: int) -> None: """Adds a row mapping content[algo] to the token of the SkippedContent in the main 'skipped_content' table.""" query = ( f'INSERT INTO skipped_content_by_{algo} ({algo}, target_token) ' f'VALUES (%s, %s)') self._execute_with_retries( query, [content.get_hash(algo) or self._magic_null_pk, token]) ########################## # 'revision' table ########################## _revision_keys = [ 'id', 'date', 'committer_date', 'type', 'directory', 'message', 'author', 'committer', 'synthetic', 'metadata'] @_prepared_exists_statement('revision') def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement('revision', _revision_keys) def revision_add_one(self, revision: Dict[str, Any], *, statement) -> None: self._add_one(statement, 'revision', revision, self._revision_keys) @_prepared_statement('SELECT id FROM revision WHERE id IN ?') def revision_get_ids(self, revision_ids, *, statement) -> ResultSet: return self._execute_with_retries( statement, [revision_ids]) @_prepared_statement('SELECT * FROM revision WHERE id IN ?') def revision_get(self, revision_ids, *, statement) -> ResultSet: return self._execute_with_retries( statement, [revision_ids]) @_prepared_statement('SELECT * FROM revision WHERE token(id) > ? LIMIT 1') def revision_get_random(self, *, statement) -> Optional[Row]: return self._get_random_row(statement) ########################## # 'revision_parent' table ########################## _revision_parent_keys = ['id', 'parent_rank', 'parent_id'] @_prepared_insert_statement('revision_parent', _revision_parent_keys) def revision_parent_add_one( self, id_: Sha1Git, parent_rank: int, parent_id: Sha1Git, *, statement) -> None: self._execute_with_retries( statement, [id_, parent_rank, parent_id]) @_prepared_statement('SELECT parent_id FROM revision_parent WHERE id = ?') def revision_parent_get( self, revision_id: Sha1Git, *, statement) -> ResultSet: return self._execute_with_retries( statement, [revision_id]) ########################## # 'release' table ########################## _release_keys = [ 'id', 'target', 'target_type', 'date', 'name', 'message', 'author', 'synthetic'] @_prepared_exists_statement('release') def release_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement('release', _release_keys) def release_add_one(self, release: Dict[str, Any], *, statement) -> None: self._add_one(statement, 'release', release, self._release_keys) @_prepared_statement('SELECT * FROM release WHERE id in ?') def release_get(self, release_ids: List[str], *, statement) -> None: return self._execute_with_retries(statement, [release_ids]) @_prepared_statement('SELECT * FROM release WHERE token(id) > ? LIMIT 1') def release_get_random(self, *, statement) -> Optional[Row]: return self._get_random_row(statement) ########################## # 'directory' table ########################## _directory_keys = ['id'] @_prepared_exists_statement('directory') def directory_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement('directory', _directory_keys) def directory_add_one(self, directory_id: Sha1Git, *, statement) -> None: """Called after all calls to directory_entry_add_one, to commit/finalize the directory.""" self._execute_with_retries(statement, [directory_id]) self._increment_counter('directory', 1) @_prepared_statement('SELECT * FROM directory WHERE token(id) > ? LIMIT 1') def directory_get_random(self, *, statement) -> Optional[Row]: return self._get_random_row(statement) ########################## # 'directory_entry' table ########################## _directory_entry_keys = ['directory_id', 'name', 'type', 'target', 'perms'] @_prepared_insert_statement('directory_entry', _directory_entry_keys) def directory_entry_add_one( self, entry: Dict[str, Any], *, statement) -> None: self._execute_with_retries( statement, [entry[key] for key in self._directory_entry_keys]) @_prepared_statement('SELECT * FROM directory_entry ' 'WHERE directory_id IN ?') def directory_entry_get(self, directory_ids, *, statement) -> ResultSet: return self._execute_with_retries( statement, [directory_ids]) ########################## # 'snapshot' table ########################## _snapshot_keys = ['id'] @_prepared_exists_statement('snapshot') def snapshot_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement('snapshot', _snapshot_keys) def snapshot_add_one(self, snapshot_id: Sha1Git, *, statement) -> None: self._execute_with_retries(statement, [snapshot_id]) self._increment_counter('snapshot', 1) @_prepared_statement('SELECT * FROM snapshot ' 'WHERE id = ?') def snapshot_get(self, snapshot_id: Sha1Git, *, statement) -> ResultSet: return self._execute_with_retries(statement, [snapshot_id]) @_prepared_statement('SELECT * FROM snapshot WHERE token(id) > ? LIMIT 1') def snapshot_get_random(self, *, statement) -> Optional[Row]: return self._get_random_row(statement) ########################## # 'snapshot_branch' table ########################## _snapshot_branch_keys = ['snapshot_id', 'name', 'target_type', 'target'] @_prepared_insert_statement('snapshot_branch', _snapshot_branch_keys) def snapshot_branch_add_one( self, branch: Dict[str, Any], *, statement) -> None: self._execute_with_retries( statement, [branch[key] for key in self._snapshot_branch_keys]) @_prepared_statement('SELECT ascii_bins_count(target_type) AS counts ' 'FROM snapshot_branch ' 'WHERE snapshot_id = ? ') def snapshot_count_branches( self, snapshot_id: Sha1Git, *, statement) -> ResultSet: return self._execute_with_retries(statement, [snapshot_id]) @_prepared_statement('SELECT * FROM snapshot_branch ' 'WHERE snapshot_id = ? AND name >= ?' 'LIMIT ?') def snapshot_branch_get( self, snapshot_id: Sha1Git, from_: bytes, limit: int, *, statement) -> None: return self._execute_with_retries( statement, [snapshot_id, from_, limit]) ########################## # 'origin' table ########################## origin_keys = ['sha1', 'url', 'type', 'next_visit_id'] @_prepared_statement('INSERT INTO origin (sha1, url, next_visit_id) ' 'VALUES (?, ?, 1) IF NOT EXISTS') def origin_add_one(self, origin: Origin, *, statement) -> None: self._execute_with_retries( statement, [hash_url(origin.url), origin.url]) self._increment_counter('origin', 1) @_prepared_statement('SELECT * FROM origin WHERE sha1 = ?') def origin_get_by_sha1(self, sha1: bytes, *, statement) -> ResultSet: return self._execute_with_retries(statement, [sha1]) def origin_get_by_url(self, url: str) -> ResultSet: return self.origin_get_by_sha1(hash_url(url)) @_prepared_statement( f'SELECT token(sha1) AS tok, {", ".join(origin_keys)} ' f'FROM origin WHERE token(sha1) >= ? LIMIT ?') def origin_list( self, start_token: int, limit: int, *, statement) -> ResultSet: return self._execute_with_retries( statement, [start_token, limit]) @_prepared_statement('SELECT * FROM origin') def origin_iter_all(self, *, statement) -> ResultSet: return self._execute_with_retries(statement, []) @_prepared_statement('SELECT next_visit_id FROM origin WHERE sha1 = ?') def _origin_get_next_visit_id( self, origin_sha1: bytes, *, statement) -> int: rows = list(self._execute_with_retries(statement, [origin_sha1])) assert len(rows) == 1 # TODO: error handling return rows[0].next_visit_id @_prepared_statement('UPDATE origin SET next_visit_id=? ' 'WHERE sha1 = ? IF next_visit_id=?') def origin_generate_unique_visit_id( self, origin_url: str, *, statement) -> int: origin_sha1 = hash_url(origin_url) next_id = self._origin_get_next_visit_id(origin_sha1) while True: res = list(self._execute_with_retries( statement, [next_id+1, origin_sha1, next_id])) assert len(res) == 1 if res[0].applied: # No data race return next_id else: # Someone else updated it before we did, let's try again next_id = res[0].next_visit_id # TODO: abort after too many attempts return next_id ########################## # 'origin_visit' table ########################## _origin_visit_keys = [ 'origin', 'visit', 'type', 'date', 'status', 'metadata', 'snapshot'] _origin_visit_update_keys = [ 'type', 'date', 'status', 'metadata', 'snapshot'] @_prepared_statement('SELECT * FROM origin_visit ' 'WHERE origin = ? AND visit > ?') def _origin_visit_get_no_limit( self, origin_url: str, last_visit: int, *, statement) -> ResultSet: return self._execute_with_retries(statement, [origin_url, last_visit]) @_prepared_statement('SELECT * FROM origin_visit ' 'WHERE origin = ? AND visit > ? LIMIT ?') def _origin_visit_get_limit( self, origin_url: str, last_visit: int, limit: int, *, statement ) -> ResultSet: return self._execute_with_retries( statement, [origin_url, last_visit, limit]) def origin_visit_get( self, origin_url: str, last_visit: Optional[int], limit: Optional[int]) -> ResultSet: if last_visit is None: last_visit = -1 if limit is None: return self._origin_visit_get_no_limit(origin_url, last_visit) else: return self._origin_visit_get_limit(origin_url, last_visit, limit) def origin_visit_update( self, origin_url: str, visit_id: int, updates: Dict[str, Any] ) -> None: set_parts = [] args: List[Any] = [] for (column, value) in updates.items(): set_parts.append(f'{column} = %s') if column == 'metadata': args.append(json.dumps(value)) else: args.append(value) if not set_parts: return query = ('UPDATE origin_visit SET ' + ', '.join(set_parts) + ' WHERE origin = %s AND visit = %s') self._execute_with_retries( query, args + [origin_url, visit_id]) @_prepared_insert_statement('origin_visit', _origin_visit_keys) def origin_visit_add_one( self, visit: OriginVisit, *, statement) -> None: self._add_one(statement, 'origin_visit', visit, self._origin_visit_keys) @_prepared_statement( 'UPDATE origin_visit SET ' + ', '.join('%s = ?' % key for key in _origin_visit_update_keys) + ' WHERE origin = ? AND visit = ?') def origin_visit_upsert( self, visit: OriginVisit, *, statement) -> None: args: List[Any] = [] for column in self._origin_visit_update_keys: if column == 'metadata': args.append(json.dumps(visit.metadata)) else: args.append(getattr(visit, column)) self._execute_with_retries( statement, args + [visit.origin, visit.visit]) # TODO: check if there is already one self._increment_counter('origin_visit', 1) @_prepared_statement('SELECT * FROM origin_visit ' 'WHERE origin = ? AND visit = ?') def origin_visit_get_one( self, origin_url: str, visit_id: int, *, statement) -> Optional[Row]: # TODO: error handling rows = list(self._execute_with_retries( statement, [origin_url, visit_id])) if rows: return rows[0] else: return None @_prepared_statement('SELECT * FROM origin_visit ' 'WHERE origin = ?') def origin_visit_get_all(self, origin_url: str, *, statement) -> ResultSet: - return self._execute_with_retries( - statement, [origin_url]) + return self._execute_with_retries(statement, [origin_url]) @_prepared_statement('SELECT * FROM origin_visit WHERE origin = ?') def origin_visit_get_latest( self, origin: str, allowed_statuses: Optional[Iterable[str]], require_snapshot: bool, *, statement) -> Optional[Row]: # TODO: do the ordering and filtering in Cassandra rows = list(self._execute_with_retries(statement, [origin])) rows.sort(key=lambda row: (row.date, row.visit), reverse=True) for row in rows: if require_snapshot and row.snapshot is None: continue if allowed_statuses is not None \ and row.status not in allowed_statuses: continue if row.snapshot is not None and \ self.snapshot_missing([row.snapshot]): raise ValueError('visit references unknown snapshot') return row else: return None @_prepared_statement('SELECT * FROM origin_visit WHERE token(origin) >= ?') def _origin_visit_iter_from( - self, min_token: int, *, statement) -> Generator[Row, None, None]: + self, min_token: int, *, statement) -> Iterator[Row]: yield from self._execute_with_retries(statement, [min_token]) @_prepared_statement('SELECT * FROM origin_visit WHERE token(origin) < ?') def _origin_visit_iter_to( - self, max_token: int, *, statement) -> Generator[Row, None, None]: + self, max_token: int, *, statement) -> Iterator[Row]: yield from self._execute_with_retries(statement, [max_token]) - def origin_visit_iter( - self, start_token: int) -> Generator[Row, None, None]: + def origin_visit_iter(self, start_token: int) -> Iterator[Row]: """Returns all origin visits in order from this token, and wraps around the token space.""" yield from self._origin_visit_iter_from(start_token) yield from self._origin_visit_iter_to(start_token) ########################## # 'tool' table ########################## _tool_keys = ['id', 'name', 'version', 'configuration'] @_prepared_insert_statement('tool_by_uuid', _tool_keys) def tool_by_uuid_add_one(self, tool: Dict[str, Any], *, statement) -> None: self._execute_with_retries( statement, [tool[key] for key in self._tool_keys]) @_prepared_insert_statement('tool', _tool_keys) def tool_add_one(self, tool: Dict[str, Any], *, statement) -> None: self._execute_with_retries( statement, [tool[key] for key in self._tool_keys]) self._increment_counter('tool', 1) @_prepared_statement('SELECT id FROM tool ' 'WHERE name = ? AND version = ? ' 'AND configuration = ?') def tool_get_one_uuid( self, name: str, version: str, configuration: Dict[str, Any], *, statement) -> Optional[str]: rows = list(self._execute_with_retries( statement, [name, version, configuration])) if rows: assert len(rows) == 1 return rows[0].id else: return None ########################## # Miscellaneous ########################## @_prepared_statement('SELECT uuid() FROM revision LIMIT 1;') def check_read(self, *, statement): self._execute_with_retries(statement, []) @_prepared_statement('SELECT object_type, count FROM object_count ' 'WHERE partition_key=0') def stat_counters(self, *, statement) -> ResultSet: return self._execute_with_retries( statement, [])