Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/cql.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import dataclasses | |||||
import datetime | import datetime | ||||
import functools | import functools | ||||
import json | |||||
import logging | import logging | ||||
import random | import random | ||||
from typing import ( | from typing import ( | ||||
Any, | Any, | ||||
Callable, | Callable, | ||||
Dict, | Dict, | ||||
Iterable, | Iterable, | ||||
Iterator, | Iterator, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Tuple, | Tuple, | ||||
Type, | |||||
TypeVar, | TypeVar, | ||||
) | ) | ||||
from cassandra import CoordinationFailure | from cassandra import CoordinationFailure | ||||
from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet | from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet | ||||
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy | ||||
from cassandra.query import PreparedStatement, BoundStatement | from cassandra.query import PreparedStatement, BoundStatement, dict_factory | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, | retry, | ||||
stop_after_attempt, | stop_after_attempt, | ||||
wait_random_exponential, | wait_random_exponential, | ||||
retry_if_exception_type, | retry_if_exception_type, | ||||
) | ) | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | |||||
SkippedContent, | |||||
Sha1Git, | Sha1Git, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
Timestamp, | Timestamp, | ||||
Person, | Person, | ||||
Content, | |||||
SkippedContent, | |||||
OriginVisit, | |||||
OriginVisitStatus, | |||||
Origin, | |||||
) | ) | ||||
from swh.storage.interface import ListOrder | from swh.storage.interface import ListOrder | ||||
from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url | from .common import TOKEN_BEGIN, TOKEN_END, hash_url, remove_keys | ||||
from .model import ( | |||||
BaseRow, | |||||
ContentRow, | |||||
DirectoryEntryRow, | |||||
DirectoryRow, | |||||
MetadataAuthorityRow, | |||||
MetadataFetcherRow, | |||||
ObjectCountRow, | |||||
OriginRow, | |||||
OriginVisitRow, | |||||
OriginVisitStatusRow, | |||||
RawExtrinsicMetadataRow, | |||||
ReleaseRow, | |||||
RevisionParentRow, | |||||
RevisionRow, | |||||
SkippedContentRow, | |||||
SnapshotBranchRow, | |||||
SnapshotRow, | |||||
) | |||||
from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
_execution_profiles = { | _execution_profiles = { | ||||
EXEC_PROFILE_DEFAULT: ExecutionProfile( | EXEC_PROFILE_DEFAULT: ExecutionProfile( | ||||
load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()) | load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()), | ||||
row_factory=dict_factory, | |||||
), | ), | ||||
} | } | ||||
# Configuration for cassandra-driver's access to servers: | # Configuration for cassandra-driver's access to servers: | ||||
# * hit the right server directly when sending a query (TokenAwarePolicy), | # * hit the right server directly when sending a query (TokenAwarePolicy), | ||||
# * if there's more than one, then pick one at random that's in the same | # * if there's more than one, then pick one at random that's in the same | ||||
# datacenter as the client (DCAwareRoundRobinPolicy) | # datacenter as the client (DCAwareRoundRobinPolicy) | ||||
▲ Show 20 Lines • Show All 95 Lines • ▼ Show 20 Lines | @_prepared_statement( | ||||
"UPDATE object_count SET count = count + ? " | "UPDATE object_count SET count = count + ? " | ||||
"WHERE partition_key = 0 AND object_type = ?" | "WHERE partition_key = 0 AND object_type = ?" | ||||
) | ) | ||||
def _increment_counter(
    self, object_type: str, nb: int, *, statement: PreparedStatement
) -> None:
    """Add ``nb`` to the counter row for ``object_type`` in the
    object_count table (bound via the prepared UPDATE above)."""
    self._execute_with_retries(statement, [nb, object_type])
def _add_one(self, statement, object_type: Optional[str], obj: BaseRow) -> None:
    """Insert a single row object and, when ``object_type`` is given,
    bump that type's object counter.

    NOTE(review): ``dataclasses.astuple`` recurses into nested
    dataclasses — assumes Row classes hold only flat values; confirm.
    """
    if object_type:
        self._increment_counter(object_type, 1)
    self._execute_with_retries(statement, dataclasses.astuple(obj))
_T = TypeVar("_T", bound=BaseRow)

def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]:  # noqa
    """Takes a prepared statement of the form
    "SELECT * FROM <table> WHERE token(<keys>) > ? LIMIT 1"
    and uses it to return a random row"""
    start_token = random.randint(TOKEN_BEGIN, TOKEN_END)
    rows = self._execute_with_retries(statement, [start_token])
    if not rows:
        # No row has a greater token; wrap around and take the row
        # with the smallest token instead.
        rows = self._execute_with_retries(statement, [TOKEN_BEGIN])
    if not rows:
        return None
    return row_class.from_dict(rows.one())  # type: ignore
def _missing(self, statement, ids):
    """Return the subset of ``ids`` absent from the table queried by
    ``statement`` (which must select a column aliased to ``id``),
    preserving the input order."""
    rows = self._execute_with_retries(statement, [ids])
    present = {row["id"] for row in rows}
    return [id_ for id_ in ids if id_ not in present]
########################## | ########################## | ||||
# 'content' table | # 'content' table | ||||
########################## | ########################## | ||||
_content_pk = ["sha1", "sha1_git", "sha256", "blake2s256"] | _content_pk = ["sha1", "sha1_git", "sha256", "blake2s256"] | ||||
_content_keys = [ | |||||
"sha1", | |||||
"sha1_git", | |||||
"sha256", | |||||
"blake2s256", | |||||
"length", | |||||
"ctime", | |||||
"status", | |||||
] | |||||
def _content_add_finalize(self, statement: BoundStatement) -> None:
    """Returned currified by content_add_prepare, to be called when the
    content row should be added to the primary table."""
    self._execute_with_retries(statement, None)
    self._increment_counter("content", 1)
@_prepared_insert_statement("content", ContentRow.cols())
def content_add_prepare(
    self, content: ContentRow, *, statement
) -> Tuple[int, Callable[[], None]]:
    """Prepares insertion of a Content to the main 'content' table.

    Returns a token (to be used in secondary tables), and a function to be
    called to perform the insertion in the main table."""
    bound = statement.bind(dataclasses.astuple(content))
    # Type used for hashing keys (usually cassandra.metadata.Murmur3Token)
    token_class = self._cluster.metadata.token_map.token_class
    # Token the row will get once inserted; we need it *before* the main
    # 'content' insert so the index tables can be written first.
    token = token_class.from_key(bound.routing_key).value
    assert TOKEN_BEGIN <= token <= TOKEN_END
    # Deferred main-table insert, run after all indexes hold their rows.
    finalizer = functools.partial(self._content_add_finalize, bound)
    return (token, finalizer)
@_prepared_statement(
    "SELECT * FROM content WHERE "
    + " AND ".join(map("%s = ?".__mod__, HASH_ALGORITHMS))
)
def content_get_from_pk(
    self, content_hashes: Dict[str, bytes], *, statement
) -> Optional[ContentRow]:
    """Fetch the single content row matching all hashes in
    ``content_hashes`` (keyed by algorithm name), or None.

    Uses ``ContentRow.from_dict`` for consistency with the other
    row-returning getters in this class.
    """
    rows = list(
        self._execute_with_retries(
            statement, [content_hashes[algo] for algo in HASH_ALGORITHMS]
        )
    )
    # The full hash tuple is the primary key, so at most one row matches.
    assert len(rows) <= 1
    if rows:
        return ContentRow.from_dict(rows[0])
    else:
        return None
@_prepared_statement(
    "SELECT * FROM content WHERE token(" + ", ".join(_content_pk) + ") = ?"
)
def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]:
    """Yield every content row whose partition token equals ``token``."""
    result = self._execute_with_retries(statement, [token])
    return map(ContentRow.from_dict, result)
@_prepared_statement(
    "SELECT * FROM content WHERE token(%s) > ? LIMIT 1" % ", ".join(_content_pk)
)
def content_get_random(self, *, statement) -> Optional[ContentRow]:
    """Return one uniformly-random content row, or None on empty table."""
    return self._get_random_row(ContentRow, statement)
@_prepared_statement(
    (
        "SELECT token({0}) AS tok, {1} FROM content "
        "WHERE token({0}) >= ? AND token({0}) <= ? LIMIT ?"
    ).format(", ".join(_content_pk), ", ".join(ContentRow.cols()))
)
def content_get_token_range(
    self, start: int, end: int, limit: int, *, statement
) -> Iterable[Tuple[int, ContentRow]]:
    """Returns an iterable of (token, row)"""
    rows = self._execute_with_retries(statement, [start, end, limit])
    # Strip the synthetic 'tok' column before building the row object.
    for row in rows:
        yield (row["tok"], ContentRow.from_dict(remove_keys(row, ("tok",))))
########################## | ########################## | ||||
# 'content_by_*' tables | # 'content_by_*' tables | ||||
########################## | ########################## | ||||
@_prepared_statement(
    "SELECT sha1_git AS id FROM content_by_sha1_git WHERE sha1_git IN ?"
)
def content_missing_by_sha1_git(
    self, ids: List[bytes], *, statement
) -> List[bytes]:
    """Return the sha1_git hashes from ``ids`` that have no row in the
    content_by_sha1_git index."""
    return self._missing(statement, ids)
def content_index_add_one(self, algo: str, content: Content, token: int) -> None:
    """Adds a row mapping content[algo] to the token of the Content in
    the main 'content' table."""
    query = (
        f"INSERT INTO content_by_{algo} ({algo}, target_token) " f"VALUES (%s, %s)"
    )
    params = [content.get_hash(algo), token]
    self._execute_with_retries(query, params)
def content_get_tokens_from_single_hash(
    self, algo: str, hash_: bytes
) -> Iterable[int]:
    """Yield the main-table tokens indexed under ``hash_`` in the
    content_by_<algo> table."""
    assert algo in HASH_ALGORITHMS
    query = f"SELECT target_token FROM content_by_{algo} WHERE {algo} = %s"
    for row in self._execute_with_retries(query, [hash_]):
        yield row["target_token"]
########################## | ########################## | ||||
# 'skipped_content' table | # 'skipped_content' table | ||||
########################## | ########################## | ||||
_skipped_content_pk = ["sha1", "sha1_git", "sha256", "blake2s256"] | _skipped_content_pk = ["sha1", "sha1_git", "sha256", "blake2s256"] | ||||
_skipped_content_keys = [ | |||||
"sha1", | |||||
"sha1_git", | |||||
"sha256", | |||||
"blake2s256", | |||||
"length", | |||||
"ctime", | |||||
"status", | |||||
"reason", | |||||
"origin", | |||||
] | |||||
_magic_null_pk = b"<null>" | _magic_null_pk = b"<null>" | ||||
""" | """ | ||||
NULLs (or all-empty blobs) are not allowed in primary keys; instead use a | NULLs (or all-empty blobs) are not allowed in primary keys; instead use a | ||||
special value that can't possibly be a valid hash. | special value that can't possibly be a valid hash. | ||||
""" | """ | ||||
def _skipped_content_add_finalize(self, statement: BoundStatement) -> None:
    """Returned currified by skipped_content_add_prepare, to be called
    when the content row should be added to the primary table."""
    self._execute_with_retries(statement, None)
    self._increment_counter("skipped_content", 1)
@_prepared_insert_statement("skipped_content", _skipped_content_keys) | @_prepared_insert_statement("skipped_content", SkippedContentRow.cols()) | ||||
def skipped_content_add_prepare( | def skipped_content_add_prepare( | ||||
self, content, *, statement | self, content, *, statement | ||||
) -> Tuple[int, Callable[[], None]]: | ) -> Tuple[int, Callable[[], None]]: | ||||
"""Prepares insertion of a Content to the main 'skipped_content' table. | """Prepares insertion of a Content to the main 'skipped_content' table. | ||||
Returns a token (to be used in secondary tables), and a function to be | Returns a token (to be used in secondary tables), and a function to be | ||||
called to perform the insertion in the main table.""" | called to perform the insertion in the main table.""" | ||||
# Replace NULLs (which are not allowed in the partition key) with | # Replace NULLs (which are not allowed in the partition key) with | ||||
# an empty byte string | # an empty byte string | ||||
content = content.to_dict() | |||||
for key in self._skipped_content_pk: | for key in self._skipped_content_pk: | ||||
if content[key] is None: | if getattr(content, key) is None: | ||||
content[key] = self._magic_null_pk | setattr(content, key, self._magic_null_pk) | ||||
statement = statement.bind( | statement = statement.bind(dataclasses.astuple(content)) | ||||
[content.get(key) for key in self._skipped_content_keys] | |||||
) | |||||
# Type used for hashing keys (usually, it will be | # Type used for hashing keys (usually, it will be | ||||
# cassandra.metadata.Murmur3Token) | # cassandra.metadata.Murmur3Token) | ||||
token_class = self._cluster.metadata.token_map.token_class | token_class = self._cluster.metadata.token_map.token_class | ||||
# Token of the row when it will be inserted. This is equivalent to | # Token of the row when it will be inserted. This is equivalent to | ||||
# "SELECT token({', '.join(self._content_pk)}) | # "SELECT token({', '.join(self._content_pk)}) | ||||
# FROM skipped_content WHERE ..." | # FROM skipped_content WHERE ..." | ||||
Show All 9 Lines | ) -> Tuple[int, Callable[[], None]]: | ||||
return (token, finalizer) | return (token, finalizer) | ||||
@_prepared_statement(
    "SELECT * FROM skipped_content WHERE "
    + " AND ".join(map("%s = ?".__mod__, HASH_ALGORITHMS))
)
def skipped_content_get_from_pk(
    self, content_hashes: Dict[str, bytes], *, statement
) -> Optional[SkippedContentRow]:
    """Fetch the skipped_content row matching all given hashes;
    missing hashes are looked up under the magic NULL placeholder."""
    bound_values = [
        content_hashes[algo] or self._magic_null_pk
        for algo in HASH_ALGORITHMS
    ]
    rows = list(self._execute_with_retries(statement, bound_values))
    # Full hash tuple is the primary key: at most one match.
    assert len(rows) <= 1
    if not rows:
        return None
    # TODO: convert _magic_null_pk back to None?
    return SkippedContentRow.from_dict(rows[0])
########################## | ########################## | ||||
# 'skipped_content_by_*' tables | # 'skipped_content_by_*' tables | ||||
########################## | ########################## | ||||
def skipped_content_index_add_one(
    self, algo: str, content: SkippedContent, token: int
) -> None:
    """Adds a row mapping content[algo] to the token of the SkippedContent
    in the main 'skipped_content' table."""
    query = (
        f"INSERT INTO skipped_content_by_{algo} ({algo}, target_token) "
        f"VALUES (%s, %s)"
    )
    # NULL hashes are stored under the magic placeholder key.
    key = content.get_hash(algo) or self._magic_null_pk
    self._execute_with_retries(query, [key, token])
########################## | ########################## | ||||
# 'revision' table | # 'revision' table | ||||
########################## | ########################## | ||||
_revision_keys = [ | |||||
"id", | |||||
"date", | |||||
"committer_date", | |||||
"type", | |||||
"directory", | |||||
"message", | |||||
"author", | |||||
"committer", | |||||
"synthetic", | |||||
"metadata", | |||||
"extra_headers", | |||||
] | |||||
@_prepared_exists_statement("revision")
def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
    """Return the subset of ``ids`` with no row in the revision table."""
    return self._missing(statement, ids)
@_prepared_insert_statement("revision", RevisionRow.cols())
def revision_add_one(self, revision: RevisionRow, *, statement) -> None:
    """Insert one revision row and bump the 'revision' counter."""
    self._add_one(statement, "revision", revision)
@_prepared_statement("SELECT id FROM revision WHERE id IN ?")
def revision_get_ids(self, revision_ids, *, statement) -> Iterable[int]:
    """Yield the ids (among ``revision_ids``) that exist in the
    revision table."""
    for row in self._execute_with_retries(statement, [revision_ids]):
        yield row["id"]
@_prepared_statement("SELECT * FROM revision WHERE id IN ?")
def revision_get(self, revision_ids, *, statement) -> Iterable[RevisionRow]:
    """Yield the revision rows whose id is in ``revision_ids``."""
    result = self._execute_with_retries(statement, [revision_ids])
    return map(RevisionRow.from_dict, result)
@_prepared_statement("SELECT * FROM revision WHERE token(id) > ? LIMIT 1")
def revision_get_random(self, *, statement) -> Optional[RevisionRow]:
    """Return one uniformly-random revision row, or None on empty table."""
    return self._get_random_row(RevisionRow, statement)
########################## | ########################## | ||||
# 'revision_parent' table | # 'revision_parent' table | ||||
########################## | ########################## | ||||
@_prepared_insert_statement("revision_parent", RevisionParentRow.cols())
def revision_parent_add_one(
    self, revision_parent: RevisionParentRow, *, statement
) -> None:
    """Insert one revision_parent row (no counter: it is an auxiliary
    table of 'revision')."""
    self._add_one(statement, None, revision_parent)
@_prepared_statement("SELECT parent_id FROM revision_parent WHERE id = ?")
def revision_parent_get(
    self, revision_id: Sha1Git, *, statement
) -> Iterable[bytes]:
    """Yield the parent ids of the revision identified by ``revision_id``."""
    for row in self._execute_with_retries(statement, [revision_id]):
        yield row["parent_id"]
########################## | ########################## | ||||
# 'release' table | # 'release' table | ||||
########################## | ########################## | ||||
_release_keys = [ | |||||
"id", | |||||
"target", | |||||
"target_type", | |||||
"date", | |||||
"name", | |||||
"message", | |||||
"author", | |||||
"synthetic", | |||||
] | |||||
@_prepared_exists_statement("release")
def release_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
    """Return the subset of ``ids`` with no row in the release table."""
    return self._missing(statement, ids)
@_prepared_insert_statement("release", ReleaseRow.cols())
def release_add_one(self, release: ReleaseRow, *, statement) -> None:
    """Insert one release row and bump the 'release' counter."""
    self._add_one(statement, "release", release)
@_prepared_statement("SELECT * FROM release WHERE id in ?")
def release_get(self, release_ids: List[str], *, statement) -> Iterable[ReleaseRow]:
    """Yield the release rows whose id is in ``release_ids``.

    NOTE(review): annotation says List[str] but ids look like sha1
    bytes elsewhere — confirm against callers.
    """
    result = self._execute_with_retries(statement, [release_ids])
    return map(ReleaseRow.from_dict, result)
@_prepared_statement("SELECT * FROM release WHERE token(id) > ? LIMIT 1")
def release_get_random(self, *, statement) -> Optional[ReleaseRow]:
    """Return one uniformly-random release row, or None on empty table."""
    return self._get_random_row(ReleaseRow, statement)
########################## | ########################## | ||||
# 'directory' table | # 'directory' table | ||||
########################## | ########################## | ||||
_directory_keys = ["id"] | |||||
@_prepared_exists_statement("directory")
def directory_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
    """Return the subset of ``ids`` with no row in the directory table."""
    return self._missing(statement, ids)
@_prepared_insert_statement("directory", DirectoryRow.cols())
def directory_add_one(self, directory: DirectoryRow, *, statement) -> None:
    """Called after all calls to directory_entry_add_one, to
    commit/finalize the directory."""
    self._add_one(statement, "directory", directory)
@_prepared_statement("SELECT * FROM directory WHERE token(id) > ? LIMIT 1")
def directory_get_random(self, *, statement) -> Optional[DirectoryRow]:
    """Return one uniformly-random directory row, or None on empty table."""
    return self._get_random_row(DirectoryRow, statement)
########################## | ########################## | ||||
# 'directory_entry' table | # 'directory_entry' table | ||||
########################## | ########################## | ||||
@_prepared_insert_statement("directory_entry", DirectoryEntryRow.cols())
def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None:
    """Insert one directory_entry row (no counter: it is an auxiliary
    table of 'directory')."""
    self._add_one(statement, None, entry)
@_prepared_statement("SELECT * FROM directory_entry WHERE directory_id IN ?")
def directory_entry_get(
    self, directory_ids, *, statement
) -> Iterable[DirectoryEntryRow]:
    """Yield the entries of all directories in ``directory_ids``."""
    result = self._execute_with_retries(statement, [directory_ids])
    return map(DirectoryEntryRow.from_dict, result)
########################## | ########################## | ||||
# 'snapshot' table | # 'snapshot' table | ||||
########################## | ########################## | ||||
_snapshot_keys = ["id"] | |||||
@_prepared_exists_statement("snapshot")
def snapshot_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
    """Return the subset of ``ids`` with no row in the snapshot table."""
    return self._missing(statement, ids)
@_prepared_insert_statement("snapshot", SnapshotRow.cols())
def snapshot_add_one(self, snapshot: SnapshotRow, *, statement) -> None:
    """Insert one snapshot row and bump the 'snapshot' counter."""
    self._add_one(statement, "snapshot", snapshot)
@_prepared_statement("SELECT * FROM snapshot WHERE id = ?")
def snapshot_get(self, snapshot_id: Sha1Git, *, statement) -> Iterable[SnapshotRow]:
    """Yield the snapshot row(s) with the given id.

    Return annotation fixed: the body returns a map of SnapshotRow,
    not a raw ResultSet.
    """
    return map(
        SnapshotRow.from_dict, self._execute_with_retries(statement, [snapshot_id])
    )
@_prepared_statement("SELECT * FROM snapshot WHERE token(id) > ? LIMIT 1")
def snapshot_get_random(self, *, statement) -> Optional[SnapshotRow]:
    """Return one uniformly-random snapshot row, or None on empty table."""
    return self._get_random_row(SnapshotRow, statement)
########################## | ########################## | ||||
# 'snapshot_branch' table | # 'snapshot_branch' table | ||||
########################## | ########################## | ||||
@_prepared_insert_statement("snapshot_branch", SnapshotBranchRow.cols())
def snapshot_branch_add_one(self, branch: SnapshotBranchRow, *, statement) -> None:
    """Insert one snapshot_branch row (no counter: it is an auxiliary
    table of 'snapshot')."""
    self._add_one(statement, None, branch)
@_prepared_statement(
    "SELECT ascii_bins_count(target_type) AS counts "
    "FROM snapshot_branch "
    "WHERE snapshot_id = ? "
)
def snapshot_count_branches(
    self, snapshot_id: Sha1Git, *, statement
) -> Dict[Optional[str], int]:
    """Counts the branches of the given snapshot, grouped by target type.

    The None key holds the number of branches with a NULL target type.
    """
    result = self._execute_with_retries(statement, [snapshot_id]).one()
    # ascii_bins_count returns a (null_count, {type: count}) pair
    nb_none, counts_by_type = result["counts"]
    branch_counts: Dict[Optional[str], int] = {None: nb_none}
    branch_counts.update(counts_by_type)
    return branch_counts
@_prepared_statement(
    "SELECT * FROM snapshot_branch WHERE snapshot_id = ? AND name >= ? LIMIT ?"
)
def snapshot_branch_get(
    self, snapshot_id: Sha1Git, from_: bytes, limit: int, *, statement
) -> Iterable[SnapshotBranchRow]:
    """Returns up to ``limit`` branches of a snapshot, starting at the
    branch named ``from_`` (inclusive), in name order."""
    rows = self._execute_with_retries(statement, [snapshot_id, from_, limit])
    return (SnapshotBranchRow.from_dict(row) for row in rows)
########################## | ########################## | ||||
# 'origin' table | # 'origin' table | ||||
########################## | ########################## | ||||
@_prepared_insert_statement("origin", OriginRow.cols())
def origin_add_one(self, origin: OriginRow, *, statement) -> None:
    """Inserts one row in the 'origin' table.

    The "origin" argument to ``_add_one`` is presumably the name of the
    object counter to bump on success — TODO confirm against ``_add_one``.
    """
    self._add_one(statement, "origin", origin)
@_prepared_statement("SELECT * FROM origin WHERE sha1 = ?")
def origin_get_by_sha1(self, sha1: bytes, *, statement) -> Iterable[OriginRow]:
    """Returns the origin row(s) whose partition key is ``sha1``."""
    rows = self._execute_with_retries(statement, [sha1])
    return (OriginRow.from_dict(row) for row in rows)
def origin_get_by_url(self, url: str) -> Iterable[OriginRow]:
    """Returns the origin row(s) for ``url``, looked up via the sha1 of
    the URL (see ``hash_url``)."""
    return self.origin_get_by_sha1(hash_url(url))
@_prepared_statement(
    f'SELECT token(sha1) AS tok, {", ".join(OriginRow.cols())} '
    f"FROM origin WHERE token(sha1) >= ? LIMIT ?"
)
def origin_list(
    self, start_token: int, limit: int, *, statement
) -> Iterable[Tuple[int, OriginRow]]:
    """Returns an iterable of (token, origin) pairs, walking the token
    ring from ``start_token`` and yielding at most ``limit`` entries."""
    rows = self._execute_with_retries(statement, [start_token, limit])
    return (
        (row["tok"], OriginRow.from_dict(remove_keys(row, ("tok",))))
        for row in rows
    )
@_prepared_statement("SELECT * FROM origin")
def origin_iter_all(self, *, statement) -> Iterable[OriginRow]:
    """Iterates over every row of the 'origin' table."""
    rows = self._execute_with_retries(statement, [])
    return (OriginRow.from_dict(row) for row in rows)
@_prepared_statement("SELECT next_visit_id FROM origin WHERE sha1 = ?")
def _origin_get_next_visit_id(self, origin_sha1: bytes, *, statement) -> int:
    """Reads the ``next_visit_id`` counter of the origin with this sha1.

    Asserts exactly one row exists for the given sha1 (the primary key).
    """
    rows = list(self._execute_with_retries(statement, [origin_sha1]))
    assert len(rows) == 1  # TODO: error handling
    return rows[0]["next_visit_id"]
@_prepared_statement(
    "UPDATE origin SET next_visit_id=? WHERE sha1 = ? IF next_visit_id=?"
)
def origin_generate_unique_visit_id(self, origin_url: str, *, statement) -> int:
    """Atomically allocates and returns a fresh visit id for this origin.

    Uses a Cassandra lightweight transaction (the ``IF next_visit_id=?``
    clause) as a compare-and-swap: the update only applies if nobody else
    bumped the counter since we read it; otherwise the LWT response carries
    the current value and we retry with it.
    """
    origin_sha1 = hash_url(origin_url)
    next_id = self._origin_get_next_visit_id(origin_sha1)
    while True:
        res = list(
            self._execute_with_retries(
                statement, [next_id + 1, origin_sha1, next_id]
            )
        )
        assert len(res) == 1
        # "[applied]" is the LWT status column returned by Cassandra
        if res[0]["[applied]"]:
            # No data race
            return next_id
        else:
            # Someone else updated it before we did, let's try again
            next_id = res[0]["next_visit_id"]
            # TODO: abort after too many attempts

    # NOTE(review): unreachable — the while-loop above only exits via return
    return next_id
########################## | ########################## | ||||
# 'origin_visit' table | # 'origin_visit' table | ||||
########################## | ########################## | ||||
@_prepared_statement(
    "SELECT * FROM origin_visit WHERE origin = ? AND visit > ? "
    "ORDER BY visit ASC"
)
def _origin_visit_get_pagination_asc_no_limit(
    self, origin_url: str, last_visit: int, *, statement
) -> ResultSet:
    """Raw rows of all visits of ``origin_url`` strictly after
    ``last_visit``, in ascending visit order, without a LIMIT.

    One of the four helpers dispatched to by ``origin_visit_get``.
    """
    return self._execute_with_retries(statement, [origin_url, last_visit])
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | ) -> ResultSet: | ||||
return self._execute_with_retries(statement, [origin_url, limit]) | return self._execute_with_retries(statement, [origin_url, limit]) | ||||
def origin_visit_get(
    self,
    origin_url: str,
    last_visit: Optional[int],
    limit: Optional[int],
    order: ListOrder,
) -> Iterable[OriginVisitRow]:
    """Returns visits of an origin, optionally paginated and limited.

    Dispatches to one of the prepared-statement helpers named
    ``_origin_visit_get_{pagination}_{order}_{limit}`` depending on which
    optional arguments are set.
    """
    params: List[Any] = [origin_url]
    if last_visit is None:
        page_part = "no_pagination"
    else:
        page_part = "pagination"
        params.append(last_visit)
    if limit is None:
        limit_part = "no_limit"
    else:
        limit_part = "limit"
        params.append(limit)
    helper = getattr(
        self, f"_origin_visit_get_{page_part}_{order.value}_{limit_part}"
    )
    rows = helper(*params)
    return (OriginVisitRow.from_dict(row) for row in rows)
@_prepared_statement( | @_prepared_statement( | ||||
"SELECT * FROM origin_visit_status WHERE origin = ? " | "SELECT * FROM origin_visit_status WHERE origin = ? " | ||||
"AND visit = ? AND date >= ? " | "AND visit = ? AND date >= ? " | ||||
"ORDER BY date ASC " | "ORDER BY date ASC " | ||||
"LIMIT ?" | "LIMIT ?" | ||||
) | ) | ||||
def _origin_visit_status_get_with_date_asc_limit( | def _origin_visit_status_get_with_date_asc_limit( | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | class CqlRunner: | ||||
def origin_visit_status_get_range(
    self,
    origin: str,
    visit: int,
    date_from: Optional[datetime.datetime],
    limit: int,
    order: ListOrder,
) -> Iterable[OriginVisitStatusRow]:
    """Returns up to ``limit`` statuses of the given visit, optionally
    starting from ``date_from``, in the requested date order.

    Dispatches to one of the prepared-statement helpers named
    ``_origin_visit_status_get_with_{date}_{order}_limit``.
    """
    params: List[Any] = [origin, visit]
    if date_from is None:
        date_part = "no_date"
    else:
        date_part = "date"
        params.append(date_from)
    params.append(limit)
    helper = getattr(
        self, f"_origin_visit_status_get_with_{date_part}_{order.value}_limit"
    )
    rows = helper(*params)
    return (OriginVisitStatusRow.from_dict(row) for row in rows)
@_prepared_insert_statement("origin_visit", OriginVisitRow.cols())
def origin_visit_add_one(self, visit: OriginVisitRow, *, statement) -> None:
    """Inserts one row in the 'origin_visit' table.

    The "origin_visit" argument to ``_add_one`` is presumably the name of
    the object counter to bump on success — TODO confirm against ``_add_one``.
    """
    self._add_one(statement, "origin_visit", visit)
@_prepared_insert_statement("origin_visit_status", OriginVisitStatusRow.cols())
def origin_visit_status_add_one(
    self, visit_update: OriginVisitStatusRow, *, statement
) -> None:
    """Inserts one row in the 'origin_visit_status' table.

    Passes None as the counter argument to ``_add_one`` (visit statuses
    are not counted as top-level objects).
    """
    self._add_one(statement, None, visit_update)
def origin_visit_status_get_latest(
    self, origin: str, visit: int,
) -> Optional[OriginVisitStatusRow]:
    """Given an origin visit id, returns its most recent status, or None
    when the visit has no recorded status.

    Relies on ``origin_visit_status_get`` yielding statuses newest-first.
    """
    statuses = self.origin_visit_status_get(origin, visit)
    return next(iter(statuses), None)
@_prepared_statement(
    "SELECT * FROM origin_visit_status "
    "WHERE origin = ? AND visit = ? "
    "ORDER BY date DESC"
)
def origin_visit_status_get(
    self,
    origin: str,
    visit: int,
    allowed_statuses: Optional[List[str]] = None,
    require_snapshot: bool = False,
    *,
    statement,
) -> Iterator[OriginVisitStatusRow]:
    """Returns every status of the given visit, newest first.

    NOTE(review): ``allowed_statuses`` and ``require_snapshot`` are not
    used by this body — presumably kept for interface compatibility.
    """
    rows = self._execute_with_retries(statement, [origin, visit])
    return (OriginVisitStatusRow.from_dict(row) for row in rows)
@_prepared_statement("SELECT * FROM origin_visit WHERE origin = ? AND visit = ?")
def origin_visit_get_one(
    self, origin_url: str, visit_id: int, *, statement
) -> Optional[OriginVisitRow]:
    """Returns the visit ``visit_id`` of ``origin_url``, or None if there
    is no such row."""
    # TODO: error handling
    visits = [
        OriginVisitRow.from_dict(row)
        for row in self._execute_with_retries(statement, [origin_url, visit_id])
    ]
    return visits[0] if visits else None
@_prepared_statement("SELECT * FROM origin_visit WHERE origin = ?")
def origin_visit_get_all(
    self, origin_url: str, *, statement
) -> Iterable[OriginVisitRow]:
    """Returns every visit of the given origin."""
    rows = self._execute_with_retries(statement, [origin_url])
    return (OriginVisitRow.from_dict(row) for row in rows)
@_prepared_statement("SELECT * FROM origin_visit WHERE token(origin) >= ?")
def _origin_visit_iter_from(
    self, min_token: int, *, statement
) -> Iterable[OriginVisitRow]:
    """Visits whose origin's token is >= ``min_token``, in token order."""
    rows = self._execute_with_retries(statement, [min_token])
    return (OriginVisitRow.from_dict(row) for row in rows)
@_prepared_statement("SELECT * FROM origin_visit WHERE token(origin) < ?")
def _origin_visit_iter_to(
    self, max_token: int, *, statement
) -> Iterable[OriginVisitRow]:
    """Visits whose origin's token is < ``max_token``, in token order."""
    rows = self._execute_with_retries(statement, [max_token])
    return (OriginVisitRow.from_dict(row) for row in rows)
def origin_visit_iter(self, start_token: int) -> Iterator[OriginVisitRow]:
    """Returns all origin visits in order from this token,
    and wraps around the token space."""
    # First everything at or after the starting token, then everything
    # before it, so the full ring is covered exactly once.
    yield from self._origin_visit_iter_from(start_token)
    yield from self._origin_visit_iter_to(start_token)
########################## | ########################## | ||||
# 'metadata_authority' table | # 'metadata_authority' table | ||||
########################## | ########################## | ||||
@_prepared_insert_statement("metadata_authority", MetadataAuthorityRow.cols())
def metadata_authority_add(
    self, authority: MetadataAuthorityRow, *, statement
) -> None:
    """Inserts one row in the 'metadata_authority' table (no counter)."""
    self._add_one(statement, None, authority)
@_prepared_statement("SELECT * from metadata_authority WHERE type = ? AND url = ?")
def metadata_authority_get(
    self, type, url, *, statement
) -> Optional[MetadataAuthorityRow]:
    """Returns the metadata authority with this (type, url), or None."""
    first = next(iter(self._execute_with_retries(statement, [type, url])), None)
    return MetadataAuthorityRow.from_dict(first) if first is not None else None
########################## | ########################## | ||||
# 'metadata_fetcher' table | # 'metadata_fetcher' table | ||||
########################## | ########################## | ||||
@_prepared_insert_statement("metadata_fetcher", MetadataFetcherRow.cols())
def metadata_fetcher_add(self, fetcher: MetadataFetcherRow, *, statement) -> None:
    """Inserts one row in the 'metadata_fetcher' table (no counter)."""
    self._add_one(statement, None, fetcher)
@_prepared_statement(
    "SELECT * from metadata_fetcher WHERE name = ? AND version = ?"
)
def metadata_fetcher_get(
    self, name, version, *, statement
) -> Optional[MetadataFetcherRow]:
    """Returns the metadata fetcher with this (name, version), or None."""
    first = next(iter(self._execute_with_retries(statement, [name, version])), None)
    return MetadataFetcherRow.from_dict(first) if first is not None else None
######################### | ######################### | ||||
# 'raw_extrinsic_metadata' table | # 'raw_extrinsic_metadata' table | ||||
######################### | ######################### | ||||
@_prepared_insert_statement(
    "raw_extrinsic_metadata", RawExtrinsicMetadataRow.cols()
)
def raw_extrinsic_metadata_add(
    self, raw_extrinsic_metadata: RawExtrinsicMetadataRow, *, statement
) -> None:
    """Inserts one row in the 'raw_extrinsic_metadata' table (no counter)."""
    self._add_one(statement, None, raw_extrinsic_metadata)
@_prepared_statement(
    "SELECT * from raw_extrinsic_metadata "
    "WHERE id=? AND authority_url=? AND discovery_date>? AND authority_type=?"
)
def raw_extrinsic_metadata_get_after_date(
    self,
    id: str,
    authority_type: str,
    authority_url: str,
    after: datetime.datetime,
    *,
    statement,
) -> Iterable[RawExtrinsicMetadataRow]:
    """Metadata on object ``id`` from the given authority, discovered
    strictly after ``after``."""
    rows = self._execute_with_retries(
        statement, [id, authority_url, after, authority_type]
    )
    return (RawExtrinsicMetadataRow.from_dict(row) for row in rows)
@_prepared_statement(
    "SELECT * from raw_extrinsic_metadata "
    "WHERE id=? AND authority_type=? AND authority_url=? "
    "AND (discovery_date, fetcher_name, fetcher_version) > (?, ?, ?)"
)
def raw_extrinsic_metadata_get_after_date_and_fetcher(
    self,
    id: str,
    authority_type: str,
    authority_url: str,
    after_date: datetime.datetime,
    after_fetcher_name: str,
    after_fetcher_version: str,
    *,
    statement,
) -> Iterable[RawExtrinsicMetadataRow]:
    """Metadata on object ``id`` from the given authority, strictly after
    the (date, fetcher name, fetcher version) clustering-key triple —
    used for exclusive-cursor pagination."""
    params = [
        id,
        authority_type,
        authority_url,
        after_date,
        after_fetcher_name,
        after_fetcher_version,
    ]
    rows = self._execute_with_retries(statement, params)
    return (RawExtrinsicMetadataRow.from_dict(row) for row in rows)
@_prepared_statement(
    "SELECT * from raw_extrinsic_metadata "
    "WHERE id=? AND authority_url=? AND authority_type=?"
)
def raw_extrinsic_metadata_get(
    self, id: str, authority_type: str, authority_url: str, *, statement
) -> Iterable[RawExtrinsicMetadataRow]:
    """All metadata on object ``id`` from the given authority."""
    rows = self._execute_with_retries(
        statement, [id, authority_url, authority_type]
    )
    return (RawExtrinsicMetadataRow.from_dict(row) for row in rows)
########################## | ########################## | ||||
# Miscellaneous | # Miscellaneous | ||||
########################## | ########################## | ||||
@_prepared_statement("SELECT uuid() FROM revision LIMIT 1;")
def check_read(self, *, statement):
    """Health check: runs a trivial read query so a failure surfaces as
    an exception from the retry helper; the result is discarded."""
    self._execute_with_retries(statement, [])
@_prepared_statement("SELECT * FROM object_count WHERE partition_key=0")
def stat_counters(self, *, statement) -> Iterable[ObjectCountRow]:
    """Returns all object counters (one row per object type), deserialized
    as :class:`ObjectCountRow` objects.

    Fix: the return annotation previously said ``ResultSet``, but the body
    maps every raw row through ``ObjectCountRow.from_dict`` — callers
    receive an iterable of ``ObjectCountRow``, consistent with the other
    row-returning accessors in this class.
    """
    return map(ObjectCountRow.from_dict, self._execute_with_retries(statement, []))