diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
index e35815dd..96cf0ffc 100644
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -1,754 +1,753 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import functools
import json
import logging
import random
from typing import (
- Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple, TypeVar
+ Any, Callable, Dict, Iterable, Iterator, List, Optional,
+ Tuple, TypeVar
)
from cassandra import CoordinationFailure
from cassandra.cluster import (
Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet)
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
from cassandra.query import PreparedStatement, BoundStatement
from tenacity import (
retry, stop_after_attempt, wait_random_exponential,
retry_if_exception_type,
)
from swh.model.model import (
Sha1Git, TimestampWithTimezone, Timestamp, Person, Content,
SkippedContent, OriginVisit, Origin
)
from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url
from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS
logger = logging.getLogger(__name__)
_execution_profiles = {
EXEC_PROFILE_DEFAULT: ExecutionProfile(
load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy())),
}
# Configuration for cassandra-driver's access to servers:
# * hit the right server directly when sending a query (TokenAwarePolicy),
# * if there's more than one, then pick one at random that's in the same
# datacenter as the client (DCAwareRoundRobinPolicy)
def create_keyspace(hosts: List[str], keyspace: str, port: int = 9042,
*, durable_writes=True):
cluster = Cluster(
hosts, port=port, execution_profiles=_execution_profiles)
session = cluster.connect()
extra_params = ''
if not durable_writes:
extra_params = 'AND durable_writes = false'
session.execute('''CREATE KEYSPACE IF NOT EXISTS "%s"
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
} %s;
''' % (keyspace, extra_params))
session.execute('USE "%s"' % keyspace)
for query in CREATE_TABLES_QUERIES:
session.execute(query)
T = TypeVar('T')
def _prepared_statement(
query: str) -> Callable[[Callable[..., T]], Callable[..., T]]:
"""Returns a decorator usable on methods of CqlRunner, to
inject them with a 'statement' argument, that is a prepared
statement corresponding to the query.
This only works on methods of CqlRunner, as preparing a
statement requires a connection to a Cassandra server."""
def decorator(f):
@functools.wraps(f)
def newf(self, *args, **kwargs) -> T:
if f.__name__ not in self._prepared_statements:
statement: PreparedStatement = self._session.prepare(query)
self._prepared_statements[f.__name__] = statement
return f(self, *args, **kwargs,
statement=self._prepared_statements[f.__name__])
return newf
return decorator
def _prepared_insert_statement(table_name: str, columns: List[str]):
"""Shorthand for using `_prepared_statement` for `INSERT INTO`
statements."""
return _prepared_statement(
'INSERT INTO %s (%s) VALUES (%s)' % (
table_name,
', '.join(columns), ', '.join('?' for _ in columns),
)
)
def _prepared_exists_statement(table_name: str):
"""Shorthand for using `_prepared_statement` for queries that only
check which ids in a list exist in the table."""
return _prepared_statement(f'SELECT id FROM {table_name} WHERE id IN ?')
class CqlRunner:
"""Class managing prepared statements and building queries to be sent
to Cassandra."""
def __init__(self, hosts: List[str], keyspace: str, port: int):
self._cluster = Cluster(
hosts, port=port, execution_profiles=_execution_profiles)
self._session = self._cluster.connect(keyspace)
self._cluster.register_user_type(
keyspace, 'microtimestamp_with_timezone', TimestampWithTimezone)
self._cluster.register_user_type(
keyspace, 'microtimestamp', Timestamp)
self._cluster.register_user_type(
keyspace, 'person', Person)
self._prepared_statements: Dict[str, PreparedStatement] = {}
##########################
# Common utility functions
##########################
MAX_RETRIES = 3
@retry(wait=wait_random_exponential(multiplier=1, max=10),
stop=stop_after_attempt(MAX_RETRIES),
retry=retry_if_exception_type(CoordinationFailure))
def _execute_with_retries(self, statement, args) -> ResultSet:
return self._session.execute(statement, args, timeout=1000.)
@_prepared_statement('UPDATE object_count SET count = count + ? '
'WHERE partition_key = 0 AND object_type = ?')
def _increment_counter(
self, object_type: str, nb: int, *, statement: PreparedStatement
) -> None:
self._execute_with_retries(statement, [nb, object_type])
def _add_one(
self, statement, object_type: str, obj, keys: List[str]
) -> None:
self._increment_counter(object_type, 1)
self._execute_with_retries(
statement, [getattr(obj, key) for key in keys])
def _get_random_row(self, statement) -> Optional[Row]:
"""Takes a prepared statement of the form
"SELECT * FROM
WHERE token() > ? LIMIT 1"
and uses it to return a random row"""
token = random.randint(TOKEN_BEGIN, TOKEN_END)
rows = self._execute_with_retries(statement, [token])
if not rows:
# There are no row with a greater token; wrap around to get
# the row with the smallest token
rows = self._execute_with_retries(statement, [TOKEN_BEGIN])
if rows:
return rows.one()
else:
return None
def _missing(self, statement, ids):
res = self._execute_with_retries(statement, [ids])
found_ids = {id_ for (id_,) in res}
return [id_ for id_ in ids if id_ not in found_ids]
##########################
# 'content' table
##########################
_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256']
_content_keys = [
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length',
'ctime', 'status']
def _content_add_finalize(self, statement: BoundStatement) -> None:
"""Returned currified by content_add_prepare, to be called when the
content row should be added to the primary table."""
self._execute_with_retries(statement, None)
self._increment_counter('content', 1)
@_prepared_insert_statement('content', _content_keys)
def content_add_prepare(
self, content, *, statement) -> Tuple[int, Callable[[], None]]:
"""Prepares insertion of a Content to the main 'content' table.
Returns a token (to be used in secondary tables), and a function to be
called to perform the insertion in the main table."""
statement = statement.bind([
getattr(content, key) for key in self._content_keys])
# Type used for hashing keys (usually, it will be
# cassandra.metadata.Murmur3Token)
token_class = self._cluster.metadata.token_map.token_class
# Token of the row when it will be inserted. This is equivalent to
# "SELECT token({', '.join(self._content_pk)}) FROM content WHERE ..."
# after the row is inserted; but we need the token to insert in the
# index tables *before* inserting to the main 'content' table
token = token_class.from_key(statement.routing_key).value
assert TOKEN_BEGIN <= token <= TOKEN_END
# Function to be called after the indexes contain their respective
# row
finalizer = functools.partial(self._content_add_finalize, statement)
return (token, finalizer)
@_prepared_statement('SELECT * FROM content WHERE ' +
' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS)))
def content_get_from_pk(
self, content_hashes: Dict[str, bytes], *, statement
) -> Optional[Row]:
rows = list(self._execute_with_retries(
statement, [content_hashes[algo] for algo in HASH_ALGORITHMS]))
assert len(rows) <= 1
if rows:
return rows[0]
else:
return None
@_prepared_statement('SELECT * FROM content WHERE token('
+ ', '.join(_content_pk)
+ ') = ?')
def content_get_from_token(self, token, *, statement) -> Iterable[Row]:
return self._execute_with_retries(statement, [token])
@_prepared_statement('SELECT * FROM content WHERE token(%s) > ? LIMIT 1'
% ', '.join(_content_pk))
def content_get_random(self, *, statement) -> Optional[Row]:
return self._get_random_row(statement)
@_prepared_statement(('SELECT token({0}) AS tok, {1} FROM content '
'WHERE token({0}) >= ? AND token({0}) <= ? LIMIT ?')
.format(', '.join(_content_pk),
', '.join(_content_keys)))
def content_get_token_range(
self, start: int, end: int, limit: int, *, statement) -> Row:
return self._execute_with_retries(statement, [start, end, limit])
##########################
# 'content_by_*' tables
##########################
@_prepared_statement('SELECT sha1_git FROM content_by_sha1_git '
'WHERE sha1_git IN ?')
def content_missing_by_sha1_git(
self, ids: List[bytes], *, statement) -> List[bytes]:
return self._missing(statement, ids)
def content_index_add_one(
self, algo: str, content: Content, token: int) -> None:
"""Adds a row mapping content[algo] to the token of the Content in
the main 'content' table."""
query = (
f'INSERT INTO content_by_{algo} ({algo}, target_token) '
f'VALUES (%s, %s)')
self._execute_with_retries(
query, [content.get_hash(algo), token])
def content_get_tokens_from_single_hash(
self, algo: str, hash_: bytes) -> Iterable[int]:
assert algo in HASH_ALGORITHMS
query = f'SELECT target_token FROM content_by_{algo} WHERE {algo} = %s'
return (tok for (tok,) in self._execute_with_retries(query, [hash_]))
##########################
# 'skipped_content' table
##########################
_skipped_content_pk = ['sha1', 'sha1_git', 'sha256', 'blake2s256']
_skipped_content_keys = [
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length',
'ctime', 'status', 'reason', 'origin']
_magic_null_pk = b''
"""
NULLs (or all-empty blobs) are not allowed in primary keys; instead use a
special value that can't possibly be a valid hash.
"""
def _skipped_content_add_finalize(self, statement: BoundStatement) -> None:
"""Returned currified by skipped_content_add_prepare, to be called
when the content row should be added to the primary table."""
self._execute_with_retries(statement, None)
self._increment_counter('skipped_content', 1)
@_prepared_insert_statement('skipped_content', _skipped_content_keys)
def skipped_content_add_prepare(
self, content, *, statement) -> Tuple[int, Callable[[], None]]:
"""Prepares insertion of a Content to the main 'skipped_content' table.
Returns a token (to be used in secondary tables), and a function to be
called to perform the insertion in the main table."""
# Replace NULLs (which are not allowed in the partition key) with
# an empty byte string
content = content.to_dict()
for key in self._skipped_content_pk:
if content[key] is None:
content[key] = self._magic_null_pk
statement = statement.bind([
content.get(key) for key in self._skipped_content_keys])
# Type used for hashing keys (usually, it will be
# cassandra.metadata.Murmur3Token)
token_class = self._cluster.metadata.token_map.token_class
# Token of the row when it will be inserted. This is equivalent to
# "SELECT token({', '.join(self._content_pk)})
# FROM skipped_content WHERE ..."
# after the row is inserted; but we need the token to insert in the
# index tables *before* inserting to the main 'skipped_content' table
token = token_class.from_key(statement.routing_key).value
assert TOKEN_BEGIN <= token <= TOKEN_END
# Function to be called after the indexes contain their respective
# row
finalizer = functools.partial(
self._skipped_content_add_finalize, statement)
return (token, finalizer)
@_prepared_statement('SELECT * FROM skipped_content WHERE ' +
' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS)))
def skipped_content_get_from_pk(
self, content_hashes: Dict[str, bytes], *, statement
) -> Optional[Row]:
rows = list(self._execute_with_retries(
statement, [content_hashes[algo] or self._magic_null_pk
for algo in HASH_ALGORITHMS]))
assert len(rows) <= 1
if rows:
# TODO: convert _magic_null_pk back to None?
return rows[0]
else:
return None
##########################
# 'skipped_content_by_*' tables
##########################
def skipped_content_index_add_one(
self, algo: str, content: SkippedContent, token: int) -> None:
"""Adds a row mapping content[algo] to the token of the SkippedContent
in the main 'skipped_content' table."""
query = (
f'INSERT INTO skipped_content_by_{algo} ({algo}, target_token) '
f'VALUES (%s, %s)')
self._execute_with_retries(
query, [content.get_hash(algo) or self._magic_null_pk, token])
##########################
# 'revision' table
##########################
_revision_keys = [
'id', 'date', 'committer_date', 'type', 'directory', 'message',
'author', 'committer',
'synthetic', 'metadata']
@_prepared_exists_statement('revision')
def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
return self._missing(statement, ids)
@_prepared_insert_statement('revision', _revision_keys)
def revision_add_one(self, revision: Dict[str, Any], *, statement) -> None:
self._add_one(statement, 'revision', revision, self._revision_keys)
@_prepared_statement('SELECT id FROM revision WHERE id IN ?')
def revision_get_ids(self, revision_ids, *, statement) -> ResultSet:
return self._execute_with_retries(
statement, [revision_ids])
@_prepared_statement('SELECT * FROM revision WHERE id IN ?')
def revision_get(self, revision_ids, *, statement) -> ResultSet:
return self._execute_with_retries(
statement, [revision_ids])
@_prepared_statement('SELECT * FROM revision WHERE token(id) > ? LIMIT 1')
def revision_get_random(self, *, statement) -> Optional[Row]:
return self._get_random_row(statement)
##########################
# 'revision_parent' table
##########################
_revision_parent_keys = ['id', 'parent_rank', 'parent_id']
@_prepared_insert_statement('revision_parent', _revision_parent_keys)
def revision_parent_add_one(
self, id_: Sha1Git, parent_rank: int, parent_id: Sha1Git, *,
statement) -> None:
self._execute_with_retries(
statement, [id_, parent_rank, parent_id])
@_prepared_statement('SELECT parent_id FROM revision_parent WHERE id = ?')
def revision_parent_get(
self, revision_id: Sha1Git, *, statement) -> ResultSet:
return self._execute_with_retries(
statement, [revision_id])
##########################
# 'release' table
##########################
_release_keys = [
'id', 'target', 'target_type', 'date', 'name', 'message', 'author',
'synthetic']
@_prepared_exists_statement('release')
def release_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
return self._missing(statement, ids)
@_prepared_insert_statement('release', _release_keys)
def release_add_one(self, release: Dict[str, Any], *, statement) -> None:
self._add_one(statement, 'release', release, self._release_keys)
@_prepared_statement('SELECT * FROM release WHERE id in ?')
def release_get(self, release_ids: List[str], *, statement) -> None:
return self._execute_with_retries(statement, [release_ids])
@_prepared_statement('SELECT * FROM release WHERE token(id) > ? LIMIT 1')
def release_get_random(self, *, statement) -> Optional[Row]:
return self._get_random_row(statement)
##########################
# 'directory' table
##########################
_directory_keys = ['id']
@_prepared_exists_statement('directory')
def directory_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
return self._missing(statement, ids)
@_prepared_insert_statement('directory', _directory_keys)
def directory_add_one(self, directory_id: Sha1Git, *, statement) -> None:
"""Called after all calls to directory_entry_add_one, to
commit/finalize the directory."""
self._execute_with_retries(statement, [directory_id])
self._increment_counter('directory', 1)
@_prepared_statement('SELECT * FROM directory WHERE token(id) > ? LIMIT 1')
def directory_get_random(self, *, statement) -> Optional[Row]:
return self._get_random_row(statement)
##########################
# 'directory_entry' table
##########################
_directory_entry_keys = ['directory_id', 'name', 'type', 'target', 'perms']
@_prepared_insert_statement('directory_entry', _directory_entry_keys)
def directory_entry_add_one(
self, entry: Dict[str, Any], *, statement) -> None:
self._execute_with_retries(
statement, [entry[key] for key in self._directory_entry_keys])
@_prepared_statement('SELECT * FROM directory_entry '
'WHERE directory_id IN ?')
def directory_entry_get(self, directory_ids, *, statement) -> ResultSet:
return self._execute_with_retries(
statement, [directory_ids])
##########################
# 'snapshot' table
##########################
_snapshot_keys = ['id']
@_prepared_exists_statement('snapshot')
def snapshot_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
return self._missing(statement, ids)
@_prepared_insert_statement('snapshot', _snapshot_keys)
def snapshot_add_one(self, snapshot_id: Sha1Git, *, statement) -> None:
self._execute_with_retries(statement, [snapshot_id])
self._increment_counter('snapshot', 1)
@_prepared_statement('SELECT * FROM snapshot '
'WHERE id = ?')
def snapshot_get(self, snapshot_id: Sha1Git, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [snapshot_id])
@_prepared_statement('SELECT * FROM snapshot WHERE token(id) > ? LIMIT 1')
def snapshot_get_random(self, *, statement) -> Optional[Row]:
return self._get_random_row(statement)
##########################
# 'snapshot_branch' table
##########################
_snapshot_branch_keys = ['snapshot_id', 'name', 'target_type', 'target']
@_prepared_insert_statement('snapshot_branch', _snapshot_branch_keys)
def snapshot_branch_add_one(
self, branch: Dict[str, Any], *, statement) -> None:
self._execute_with_retries(
statement, [branch[key] for key in self._snapshot_branch_keys])
@_prepared_statement('SELECT ascii_bins_count(target_type) AS counts '
'FROM snapshot_branch '
'WHERE snapshot_id = ? ')
def snapshot_count_branches(
self, snapshot_id: Sha1Git, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [snapshot_id])
@_prepared_statement('SELECT * FROM snapshot_branch '
'WHERE snapshot_id = ? AND name >= ?'
'LIMIT ?')
def snapshot_branch_get(
self, snapshot_id: Sha1Git, from_: bytes, limit: int, *,
statement) -> None:
return self._execute_with_retries(
statement, [snapshot_id, from_, limit])
##########################
# 'origin' table
##########################
origin_keys = ['sha1', 'url', 'type', 'next_visit_id']
@_prepared_statement('INSERT INTO origin (sha1, url, next_visit_id) '
'VALUES (?, ?, 1) IF NOT EXISTS')
def origin_add_one(self, origin: Origin, *, statement) -> None:
self._execute_with_retries(
statement, [hash_url(origin.url), origin.url])
self._increment_counter('origin', 1)
@_prepared_statement('SELECT * FROM origin WHERE sha1 = ?')
def origin_get_by_sha1(self, sha1: bytes, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [sha1])
def origin_get_by_url(self, url: str) -> ResultSet:
return self.origin_get_by_sha1(hash_url(url))
@_prepared_statement(
f'SELECT token(sha1) AS tok, {", ".join(origin_keys)} '
f'FROM origin WHERE token(sha1) >= ? LIMIT ?')
def origin_list(
self, start_token: int, limit: int, *, statement) -> ResultSet:
return self._execute_with_retries(
statement, [start_token, limit])
@_prepared_statement('SELECT * FROM origin')
def origin_iter_all(self, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [])
@_prepared_statement('SELECT next_visit_id FROM origin WHERE sha1 = ?')
def _origin_get_next_visit_id(
self, origin_sha1: bytes, *, statement) -> int:
rows = list(self._execute_with_retries(statement, [origin_sha1]))
assert len(rows) == 1 # TODO: error handling
return rows[0].next_visit_id
@_prepared_statement('UPDATE origin SET next_visit_id=? '
'WHERE sha1 = ? IF next_visit_id=?')
def origin_generate_unique_visit_id(
self, origin_url: str, *, statement) -> int:
origin_sha1 = hash_url(origin_url)
next_id = self._origin_get_next_visit_id(origin_sha1)
while True:
res = list(self._execute_with_retries(
statement, [next_id+1, origin_sha1, next_id]))
assert len(res) == 1
if res[0].applied:
# No data race
return next_id
else:
# Someone else updated it before we did, let's try again
next_id = res[0].next_visit_id
# TODO: abort after too many attempts
return next_id
##########################
# 'origin_visit' table
##########################
_origin_visit_keys = [
'origin', 'visit', 'type', 'date', 'status', 'metadata', 'snapshot']
_origin_visit_update_keys = [
'type', 'date', 'status', 'metadata', 'snapshot']
@_prepared_statement('SELECT * FROM origin_visit '
'WHERE origin = ? AND visit > ?')
def _origin_visit_get_no_limit(
self, origin_url: str, last_visit: int, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [origin_url, last_visit])
@_prepared_statement('SELECT * FROM origin_visit '
'WHERE origin = ? AND visit > ? LIMIT ?')
def _origin_visit_get_limit(
self, origin_url: str, last_visit: int, limit: int, *, statement
) -> ResultSet:
return self._execute_with_retries(
statement, [origin_url, last_visit, limit])
def origin_visit_get(
self, origin_url: str, last_visit: Optional[int],
limit: Optional[int]) -> ResultSet:
if last_visit is None:
last_visit = -1
if limit is None:
return self._origin_visit_get_no_limit(origin_url, last_visit)
else:
return self._origin_visit_get_limit(origin_url, last_visit, limit)
def origin_visit_update(
self, origin_url: str, visit_id: int, updates: Dict[str, Any]
) -> None:
set_parts = []
args: List[Any] = []
for (column, value) in updates.items():
set_parts.append(f'{column} = %s')
if column == 'metadata':
args.append(json.dumps(value))
else:
args.append(value)
if not set_parts:
return
query = ('UPDATE origin_visit SET ' + ', '.join(set_parts) +
' WHERE origin = %s AND visit = %s')
self._execute_with_retries(
query, args + [origin_url, visit_id])
@_prepared_insert_statement('origin_visit', _origin_visit_keys)
def origin_visit_add_one(
self, visit: OriginVisit, *, statement) -> None:
self._add_one(statement, 'origin_visit', visit,
self._origin_visit_keys)
@_prepared_statement(
'UPDATE origin_visit SET ' +
', '.join('%s = ?' % key for key in _origin_visit_update_keys) +
' WHERE origin = ? AND visit = ?')
def origin_visit_upsert(
self, visit: OriginVisit, *, statement) -> None:
args: List[Any] = []
for column in self._origin_visit_update_keys:
if column == 'metadata':
args.append(json.dumps(visit.metadata))
else:
args.append(getattr(visit, column))
self._execute_with_retries(
statement, args + [visit.origin, visit.visit])
# TODO: check if there is already one
self._increment_counter('origin_visit', 1)
@_prepared_statement('SELECT * FROM origin_visit '
'WHERE origin = ? AND visit = ?')
def origin_visit_get_one(
self, origin_url: str, visit_id: int, *,
statement) -> Optional[Row]:
# TODO: error handling
rows = list(self._execute_with_retries(
statement, [origin_url, visit_id]))
if rows:
return rows[0]
else:
return None
@_prepared_statement('SELECT * FROM origin_visit '
'WHERE origin = ?')
def origin_visit_get_all(self, origin_url: str, *, statement) -> ResultSet:
- return self._execute_with_retries(
- statement, [origin_url])
+ return self._execute_with_retries(statement, [origin_url])
@_prepared_statement('SELECT * FROM origin_visit WHERE origin = ?')
def origin_visit_get_latest(
self, origin: str, allowed_statuses: Optional[Iterable[str]],
require_snapshot: bool, *, statement) -> Optional[Row]:
# TODO: do the ordering and filtering in Cassandra
rows = list(self._execute_with_retries(statement, [origin]))
rows.sort(key=lambda row: (row.date, row.visit), reverse=True)
for row in rows:
if require_snapshot and row.snapshot is None:
continue
if allowed_statuses is not None \
and row.status not in allowed_statuses:
continue
if row.snapshot is not None and \
self.snapshot_missing([row.snapshot]):
raise ValueError('visit references unknown snapshot')
return row
else:
return None
@_prepared_statement('SELECT * FROM origin_visit WHERE token(origin) >= ?')
def _origin_visit_iter_from(
- self, min_token: int, *, statement) -> Generator[Row, None, None]:
+ self, min_token: int, *, statement) -> Iterator[Row]:
yield from self._execute_with_retries(statement, [min_token])
@_prepared_statement('SELECT * FROM origin_visit WHERE token(origin) < ?')
def _origin_visit_iter_to(
- self, max_token: int, *, statement) -> Generator[Row, None, None]:
+ self, max_token: int, *, statement) -> Iterator[Row]:
yield from self._execute_with_retries(statement, [max_token])
- def origin_visit_iter(
- self, start_token: int) -> Generator[Row, None, None]:
+ def origin_visit_iter(self, start_token: int) -> Iterator[Row]:
"""Returns all origin visits in order from this token,
and wraps around the token space."""
yield from self._origin_visit_iter_from(start_token)
yield from self._origin_visit_iter_to(start_token)
##########################
# 'tool' table
##########################
_tool_keys = ['id', 'name', 'version', 'configuration']
@_prepared_insert_statement('tool_by_uuid', _tool_keys)
def tool_by_uuid_add_one(self, tool: Dict[str, Any], *, statement) -> None:
self._execute_with_retries(
statement, [tool[key] for key in self._tool_keys])
@_prepared_insert_statement('tool', _tool_keys)
def tool_add_one(self, tool: Dict[str, Any], *, statement) -> None:
self._execute_with_retries(
statement, [tool[key] for key in self._tool_keys])
self._increment_counter('tool', 1)
@_prepared_statement('SELECT id FROM tool '
'WHERE name = ? AND version = ? '
'AND configuration = ?')
def tool_get_one_uuid(
self, name: str, version: str, configuration: Dict[str, Any], *,
statement) -> Optional[str]:
rows = list(self._execute_with_retries(
statement, [name, version, configuration]))
if rows:
assert len(rows) == 1
return rows[0].id
else:
return None
##########################
# Miscellaneous
##########################
@_prepared_statement('SELECT uuid() FROM revision LIMIT 1;')
def check_read(self, *, statement):
self._execute_with_retries(statement, [])
@_prepared_statement('SELECT object_type, count FROM object_count '
'WHERE partition_key=0')
def stat_counters(self, *, statement) -> ResultSet:
return self._execute_with_retries(
statement, [])