diff --git a/README.md b/README.md index 4aee17fa..5e4ce945 100644 --- a/README.md +++ b/README.md @@ -1,200 +1,223 @@ swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. ## Quick start ### Dependencies Python tests for this module include tests that cannot be run without a local Postgresql database, so you need the Postgresql server executable on your machine (no need to have a running Postgresql server). They also expect a cassandra server. #### Debian-like host ``` $ sudo apt install libpq-dev postgresql-11 cassandra ``` #### Non Debian-like host The tests expects the path to `cassandra` to either be unspecified, it is then looked up at `/usr/sbin/cassandra`, either specified through the environment variable `SWH_CASSANDRA_BIN`. Optionally, you can avoid running the cassandra tests. ``` (swh) :~/swh-storage$ tox -- -m 'not cassandra' ``` ### Installation It is strongly recommended to use a virtualenv. In the following, we consider you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for a more details on how to setup a working environment. You can install the package directly from [pypi](https://pypi.org/p/swh.storage): ``` (swh) :~$ pip install swh.storage [...] ``` Or from sources: ``` (swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git [...] (swh) :~$ cd swh-storage (swh) :~/swh-storage$ pip install . [...] ``` Then you can check it's properly installed: ``` (swh) :~$ swh storage --help Usage: swh storage [OPTIONS] COMMAND [ARGS]... Software Heritage Storage tools. Options: -h, --help Show this message and exit. Commands: rpc-serve Software Heritage Storage RPC server. ``` ## Tests The best way of running Python tests for this module is to use [tox](https://tox.readthedocs.io/). ``` (swh) :~$ pip install tox ``` ### tox From the sources directory, simply use tox: ``` (swh) :~/swh-storage$ tox [...] ========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ========== _______________________________ summary ________________________________ flake8: commands succeeded py3: commands succeeded congratulations :) ``` Note: it is possible to set the `JAVA_HOME` environment variable to specify the version of the JVM to be used by Cassandra. For example, at the time of writing this, Cassandra does not support java 14, so one may want to use for example java 11: ``` (swh) :~/swh-storage$ export JAVA_HOME=/usr/lib/jvm/java-14-openjdk-amd64/bin/java (swh) :~/swh-storage$ tox [...] ``` ## Development The storage server can be locally started. It requires a configuration file and a running Postgresql database. ### Sample configuration A typical configuration `storage.yml` file is: ``` storage: - cls: local + cls: postgresql db: "dbname=softwareheritage-dev user= password=" objstorage: cls: pathslicing root: /tmp/swh-storage/ slicing: 0:2/2:4/4:6 ``` which means, this uses: - a local storage instance whose db connection is to `softwareheritage-dev` local instance, - the objstorage uses a local objstorage instance whose: - `root` path is /tmp/swh-storage, - slicing scheme is `0:2/2:4/4:6`. This means that the identifier of the content (sha1) which will be stored on disk at first level with the first 2 hex characters, the second level with the next 2 hex characters and the third level with the next 2 hex characters. And finally the complete hash file holding the raw content. For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the `root` path should exist on disk before starting the server. ### Starting the storage server If the python package has been properly installed (e.g. in a virtual env), you should be able to use the command: ``` (swh) :~/swh-storage$ swh storage rpc-serve storage.yml ``` This runs a local swh-storage api at 5002 port. ``` (swh) :~/swh-storage$ curl http://127.0.0.1:5002 Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

``` ### And then what? In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote url: http://localhost:5002/ ``` -You could directly define a local storage with the following snippet: +You could directly define a postgresql storage with the following snippet: ``` storage: - cls: local + cls: postgresql db: service=swh-dev objstorage: cls: pathslicing root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` + +## Cassandra + +As an alternative to PostgreSQL, swh-storage can use Cassandra as a database backend. +It can be used like this: + +``` +storage: + cls: cassandra + hosts: + - localhost + objstorage: + cls: pathslicing + root: /home/storage/swh-storage/ + slicing: 0:2/2:4/4:6 +``` + +The Cassandra swh-storage implementation supports both Cassandra >= 4.0-alpha2 +and ScyllaDB >= 4.4 (and possibly earlier versions, but this is untested). + +While the main code supports both transparently, running tests +or configuring the schema requires specific code when using ScyllaDB, +enabled by setting the `SWH_USE_SCYLLADB=1` environment variable. diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py index 7af660fc..11f422c5 100644 --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -1,1205 +1,1219 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter import dataclasses import datetime import functools import logging import random from typing import ( Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Type, TypeVar, Union, ) from cassandra import CoordinationFailure from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import BoundStatement, PreparedStatement, dict_factory from mypy_extensions import NamedArg from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential, ) from swh.model.identifiers import CoreSWHID from swh.model.model import ( Content, Person, Sha1Git, SkippedContent, Timestamp, TimestampWithTimezone, ) from swh.storage.interface import ListOrder from ..utils import remove_keys from .common import TOKEN_BEGIN, TOKEN_END, hash_url from .model import ( MAGIC_NULL_PK, BaseRow, ContentRow, DirectoryEntryRow, DirectoryRow, ExtIDByTargetRow, ExtIDRow, MetadataAuthorityRow, MetadataFetcherRow, ObjectCountRow, OriginRow, OriginVisitRow, OriginVisitStatusRow, RawExtrinsicMetadataRow, ReleaseRow, RevisionParentRow, RevisionRow, SkippedContentRow, SnapshotBranchRow, SnapshotRow, content_index_table_name, ) from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS logger = logging.getLogger(__name__) _execution_profiles = { EXEC_PROFILE_DEFAULT: ExecutionProfile( load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()), row_factory=dict_factory, ), } # Configuration for cassandra-driver's access to servers: # * hit the right server directly when sending a query (TokenAwarePolicy), # * if there's more than one, then pick one at random that's in the same # datacenter as the client (DCAwareRoundRobinPolicy) def create_keyspace( hosts: List[str], keyspace: str, port: int = 9042, *, durable_writes=True ): cluster = Cluster(hosts, port=port, execution_profiles=_execution_profiles) session = cluster.connect() extra_params = "" if not durable_writes: extra_params = "AND durable_writes = false" session.execute( """CREATE KEYSPACE IF NOT EXISTS "%s" WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } %s; """ % (keyspace, extra_params) ) session.execute('USE "%s"' % keyspace) for query in CREATE_TABLES_QUERIES: session.execute(query) TRet = TypeVar("TRet") def _prepared_statement( query: str, ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: """Returns a decorator usable on methods of CqlRunner, to inject them with a 'statement' argument, that is a prepared statement corresponding to the query. This only works on methods of CqlRunner, as preparing a statement requires a connection to a Cassandra server.""" def decorator(f): @functools.wraps(f) def newf(self, *args, **kwargs) -> TRet: if f.__name__ not in self._prepared_statements: statement: PreparedStatement = self._session.prepare(query) self._prepared_statements[f.__name__] = statement return f( self, *args, **kwargs, statement=self._prepared_statements[f.__name__] ) return newf return decorator TArg = TypeVar("TArg") TSelf = TypeVar("TSelf") def _prepared_insert_statement( row_class: Type[BaseRow], ) -> Callable[ [Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]], # noqa Callable[[TSelf, TArg], TRet], ]: """Shorthand for using `_prepared_statement` for `INSERT INTO` statements.""" columns = row_class.cols() return _prepared_statement( "INSERT INTO %s (%s) VALUES (%s)" % (row_class.TABLE, ", ".join(columns), ", ".join("?" for _ in columns),) ) def _prepared_exists_statement( table_name: str, ) -> Callable[ [Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]], # noqa Callable[[TSelf, TArg], TRet], ]: """Shorthand for using `_prepared_statement` for queries that only check which ids in a list exist in the table.""" return _prepared_statement(f"SELECT id FROM {table_name} WHERE id IN ?") def _prepared_select_statement( row_class: Type[BaseRow], clauses: str = "", cols: Optional[List[str]] = None, ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: if cols is None: cols = row_class.cols() return _prepared_statement( f"SELECT {', '.join(cols)} FROM {row_class.TABLE} {clauses}" ) def _prepared_select_statements( row_class: Type[BaseRow], queries: Dict[Any, str], ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: """Like _prepared_statement, but supports multiple statements, passed a dict, and passes a dict of prepared statements to the decorated method""" cols = row_class.cols() statement_start = f"SELECT {', '.join(cols)} FROM {row_class.TABLE} " def decorator(f): @functools.wraps(f) def newf(self, *args, **kwargs) -> TRet: if f.__name__ not in self._prepared_statements: self._prepared_statements[f.__name__] = { key: self._session.prepare(statement_start + query) for (key, query) in queries.items() } return f( self, *args, **kwargs, statements=self._prepared_statements[f.__name__] ) return newf return decorator def _next_bytes_value(value: bytes) -> bytes: """Returns the next bytes value by incrementing the integer representation of the provided value and converting it back to bytes. For instance when prefix is b"abcd", it returns b"abce". """ next_value_int = int.from_bytes(value, byteorder="big") + 1 return next_value_int.to_bytes( (next_value_int.bit_length() + 7) // 8, byteorder="big" ) class CqlRunner: """Class managing prepared statements and building queries to be sent to Cassandra.""" def __init__(self, hosts: List[str], keyspace: str, port: int): self._cluster = Cluster( hosts, port=port, execution_profiles=_execution_profiles ) self._session = self._cluster.connect(keyspace) self._cluster.register_user_type( keyspace, "microtimestamp_with_timezone", TimestampWithTimezone ) self._cluster.register_user_type(keyspace, "microtimestamp", Timestamp) self._cluster.register_user_type(keyspace, "person", Person) # directly a PreparedStatement for methods decorated with # @_prepared_statements (and its wrappers, _prepared_insert_statement, # _prepared_exists_statement, and _prepared_select_statement); # and a dict of PreparedStatements with @_prepared_select_statements self._prepared_statements: Dict[ str, Union[PreparedStatement, Dict[Any, PreparedStatement]] ] = {} ########################## # Common utility functions ########################## MAX_RETRIES = 3 @retry( wait=wait_random_exponential(multiplier=1, max=10), stop=stop_after_attempt(MAX_RETRIES), retry=retry_if_exception_type(CoordinationFailure), ) def _execute_with_retries(self, statement, args) -> ResultSet: return self._session.execute(statement, args, timeout=1000.0) @_prepared_statement( "UPDATE object_count SET count = count + ? " "WHERE partition_key = 0 AND object_type = ?" ) def _increment_counter( self, object_type: str, nb: int, *, statement: PreparedStatement ) -> None: self._execute_with_retries(statement, [nb, object_type]) def _add_one(self, statement, obj: BaseRow) -> None: self._increment_counter(obj.TABLE, 1) self._execute_with_retries(statement, dataclasses.astuple(obj)) _T = TypeVar("_T", bound=BaseRow) def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa """Takes a prepared statement of the form "SELECT * FROM WHERE token() > ? LIMIT 1" and uses it to return a random row""" token = random.randint(TOKEN_BEGIN, TOKEN_END) rows = self._execute_with_retries(statement, [token]) if not rows: # There are no row with a greater token; wrap around to get # the row with the smallest token rows = self._execute_with_retries(statement, [TOKEN_BEGIN]) if rows: return row_class.from_dict(rows.one()) # type: ignore else: return None def _missing(self, statement, ids): rows = self._execute_with_retries(statement, [ids]) found_ids = {row["id"] for row in rows} return [id_ for id_ in ids if id_ not in found_ids] ########################## # 'content' table ########################## def _content_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by content_add_prepare, to be called when the content row should be added to the primary table.""" self._execute_with_retries(statement, None) self._increment_counter("content", 1) @_prepared_insert_statement(ContentRow) def content_add_prepare( self, content: ContentRow, *, statement ) -> Tuple[int, Callable[[], None]]: """Prepares insertion of a Content to the main 'content' table. Returns a token (to be used in secondary tables), and a function to be called to perform the insertion in the main table.""" statement = statement.bind(dataclasses.astuple(content)) # Type used for hashing keys (usually, it will be # cassandra.metadata.Murmur3Token) token_class = self._cluster.metadata.token_map.token_class # Token of the row when it will be inserted. This is equivalent to # "SELECT token({', '.join(ContentRow.PARTITION_KEY)}) FROM content WHERE ..." # after the row is inserted; but we need the token to insert in the # index tables *before* inserting to the main 'content' table token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._content_add_finalize, statement) return (token, finalizer) @_prepared_select_statement( ContentRow, f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}" ) def content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement ) -> Optional[ContentRow]: rows = list( self._execute_with_retries( statement, [content_hashes[algo] for algo in HASH_ALGORITHMS] ) ) assert len(rows) <= 1 if rows: return ContentRow(**rows[0]) else: return None @_prepared_select_statement( ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?" ) def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]: return map(ContentRow.from_dict, self._execute_with_retries(statement, [token])) @_prepared_select_statement( ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) > ? LIMIT 1" ) def content_get_random(self, *, statement) -> Optional[ContentRow]: return self._get_random_row(ContentRow, statement) @_prepared_statement( """ SELECT token({pk}) AS tok, {cols} FROM {table} WHERE token({pk}) >= ? AND token({pk}) <= ? LIMIT ? """.format( pk=", ".join(ContentRow.PARTITION_KEY), cols=", ".join(ContentRow.cols()), table=ContentRow.TABLE, ) ) def content_get_token_range( self, start: int, end: int, limit: int, *, statement ) -> Iterable[Tuple[int, ContentRow]]: """Returns an iterable of (token, row)""" return ( (row["tok"], ContentRow.from_dict(remove_keys(row, ("tok",)))) for row in self._execute_with_retries(statement, [start, end, limit]) ) ########################## # 'content_by_*' tables ########################## @_prepared_statement( f""" SELECT sha1_git AS id FROM {content_index_table_name("sha1_git", skipped_content=False)} WHERE sha1_git IN ? """ ) def content_missing_by_sha1_git( self, ids: List[bytes], *, statement ) -> List[bytes]: return self._missing(statement, ids) def content_index_add_one(self, algo: str, content: Content, token: int) -> None: """Adds a row mapping content[algo] to the token of the Content in the main 'content' table.""" query = f""" INSERT INTO {content_index_table_name(algo, skipped_content=False)} ({algo}, target_token) VALUES (%s, %s) """ self._execute_with_retries(query, [content.get_hash(algo), token]) def content_get_tokens_from_single_hash( self, algo: str, hash_: bytes ) -> Iterable[int]: assert algo in HASH_ALGORITHMS query = f""" SELECT target_token FROM {content_index_table_name(algo, skipped_content=False)} WHERE {algo} = %s """ return ( row["target_token"] for row in self._execute_with_retries(query, [hash_]) ) ########################## # 'skipped_content' table ########################## def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by skipped_content_add_prepare, to be called when the content row should be added to the primary table.""" self._execute_with_retries(statement, None) self._increment_counter("skipped_content", 1) @_prepared_insert_statement(SkippedContentRow) def skipped_content_add_prepare( self, content, *, statement ) -> Tuple[int, Callable[[], None]]: """Prepares insertion of a Content to the main 'skipped_content' table. Returns a token (to be used in secondary tables), and a function to be called to perform the insertion in the main table.""" # Replace NULLs (which are not allowed in the partition key) with # an empty byte string for key in SkippedContentRow.PARTITION_KEY: if getattr(content, key) is None: setattr(content, key, MAGIC_NULL_PK) statement = statement.bind(dataclasses.astuple(content)) # Type used for hashing keys (usually, it will be # cassandra.metadata.Murmur3Token) token_class = self._cluster.metadata.token_map.token_class # Token of the row when it will be inserted. This is equivalent to # "SELECT token({', '.join(SkippedContentRow.PARTITION_KEY)}) # FROM skipped_content WHERE ..." # after the row is inserted; but we need the token to insert in the # index tables *before* inserting to the main 'skipped_content' table token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._skipped_content_add_finalize, statement) return (token, finalizer) @_prepared_select_statement( SkippedContentRow, f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}", ) def skipped_content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement ) -> Optional[SkippedContentRow]: rows = list( self._execute_with_retries( statement, [content_hashes[algo] or MAGIC_NULL_PK for algo in HASH_ALGORITHMS], ) ) assert len(rows) <= 1 if rows: return SkippedContentRow.from_dict(rows[0]) else: return None @_prepared_select_statement( SkippedContentRow, f"WHERE token({', '.join(SkippedContentRow.PARTITION_KEY)}) = ?", ) def skipped_content_get_from_token( self, token, *, statement ) -> Iterable[SkippedContentRow]: return map( SkippedContentRow.from_dict, self._execute_with_retries(statement, [token]) ) ########################## # 'skipped_content_by_*' tables ########################## def skipped_content_index_add_one( self, algo: str, content: SkippedContent, token: int ) -> None: """Adds a row mapping content[algo] to the token of the SkippedContent in the main 'skipped_content' table.""" query = ( f"INSERT INTO skipped_content_by_{algo} ({algo}, target_token) " f"VALUES (%s, %s)" ) self._execute_with_retries( query, [content.get_hash(algo) or MAGIC_NULL_PK, token] ) def skipped_content_get_tokens_from_single_hash( self, algo: str, hash_: bytes ) -> Iterable[int]: assert algo in HASH_ALGORITHMS query = f""" SELECT target_token FROM {content_index_table_name(algo, skipped_content=True)} WHERE {algo} = %s """ return ( row["target_token"] for row in self._execute_with_retries(query, [hash_]) ) ########################## # 'revision' table ########################## @_prepared_exists_statement("revision") def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement(RevisionRow) def revision_add_one(self, revision: RevisionRow, *, statement) -> None: self._add_one(statement, revision) @_prepared_statement(f"SELECT id FROM {RevisionRow.TABLE} WHERE id IN ?") def revision_get_ids(self, revision_ids, *, statement) -> Iterable[int]: return ( row["id"] for row in self._execute_with_retries(statement, [revision_ids]) ) @_prepared_select_statement(RevisionRow, "WHERE id IN ?") def revision_get( self, revision_ids: List[Sha1Git], *, statement ) -> Iterable[RevisionRow]: return map( RevisionRow.from_dict, self._execute_with_retries(statement, [revision_ids]) ) @_prepared_select_statement(RevisionRow, "WHERE token(id) > ? LIMIT 1") def revision_get_random(self, *, statement) -> Optional[RevisionRow]: return self._get_random_row(RevisionRow, statement) ########################## # 'revision_parent' table ########################## @_prepared_insert_statement(RevisionParentRow) def revision_parent_add_one( self, revision_parent: RevisionParentRow, *, statement ) -> None: self._add_one(statement, revision_parent) @_prepared_statement( f"SELECT parent_id FROM {RevisionParentRow.TABLE} WHERE id = ?" ) def revision_parent_get( self, revision_id: Sha1Git, *, statement ) -> Iterable[bytes]: return ( row["parent_id"] for row in self._execute_with_retries(statement, [revision_id]) ) ########################## # 'release' table ########################## @_prepared_exists_statement("release") def release_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement(ReleaseRow) def release_add_one(self, release: ReleaseRow, *, statement) -> None: self._add_one(statement, release) @_prepared_select_statement(ReleaseRow, "WHERE id in ?") def release_get(self, release_ids: List[str], *, statement) -> Iterable[ReleaseRow]: return map( ReleaseRow.from_dict, self._execute_with_retries(statement, [release_ids]) ) @_prepared_select_statement(ReleaseRow, "WHERE token(id) > ? LIMIT 1") def release_get_random(self, *, statement) -> Optional[ReleaseRow]: return self._get_random_row(ReleaseRow, statement) ########################## # 'directory' table ########################## @_prepared_exists_statement("directory") def directory_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement(DirectoryRow) def directory_add_one(self, directory: DirectoryRow, *, statement) -> None: """Called after all calls to directory_entry_add_one, to commit/finalize the directory.""" self._add_one(statement, directory) @_prepared_select_statement(DirectoryRow, "WHERE token(id) > ? LIMIT 1") def directory_get_random(self, *, statement) -> Optional[DirectoryRow]: return self._get_random_row(DirectoryRow, statement) ########################## # 'directory_entry' table ########################## @_prepared_insert_statement(DirectoryEntryRow) def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None: self._add_one(statement, entry) @_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?") def directory_entry_get( self, directory_ids, *, statement ) -> Iterable[DirectoryEntryRow]: return map( DirectoryEntryRow.from_dict, self._execute_with_retries(statement, [directory_ids]), ) @_prepared_select_statement( DirectoryEntryRow, "WHERE directory_id = ? AND name >= ? LIMIT ?" ) def directory_entry_get_from_name( self, directory_id: Sha1Git, from_: bytes, limit: int, *, statement ) -> Iterable[DirectoryEntryRow]: return map( DirectoryEntryRow.from_dict, self._execute_with_retries(statement, [directory_id, from_, limit]), ) ########################## # 'snapshot' table ########################## @_prepared_exists_statement("snapshot") def snapshot_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement(SnapshotRow) def snapshot_add_one(self, snapshot: SnapshotRow, *, statement) -> None: self._add_one(statement, snapshot) @_prepared_select_statement(SnapshotRow, "WHERE token(id) > ? LIMIT 1") def snapshot_get_random(self, *, statement) -> Optional[SnapshotRow]: return self._get_random_row(SnapshotRow, statement) ########################## # 'snapshot_branch' table ########################## @_prepared_insert_statement(SnapshotBranchRow) def snapshot_branch_add_one(self, branch: SnapshotBranchRow, *, statement) -> None: self._add_one(statement, branch) @_prepared_statement( f""" SELECT ascii_bins_count(target_type) AS counts FROM {SnapshotBranchRow.TABLE} WHERE snapshot_id = ? AND name >= ? """ ) def snapshot_count_branches_from_name( self, snapshot_id: Sha1Git, from_: bytes, *, statement ) -> Dict[Optional[str], int]: row = self._execute_with_retries(statement, [snapshot_id, from_]).one() (nb_none, counts) = row["counts"] return {None: nb_none, **counts} @_prepared_statement( f""" SELECT ascii_bins_count(target_type) AS counts FROM {SnapshotBranchRow.TABLE} WHERE snapshot_id = ? AND name < ? """ ) def snapshot_count_branches_before_name( self, snapshot_id: Sha1Git, before: bytes, *, statement, ) -> Dict[Optional[str], int]: row = self._execute_with_retries(statement, [snapshot_id, before]).one() (nb_none, counts) = row["counts"] return {None: nb_none, **counts} def snapshot_count_branches( self, snapshot_id: Sha1Git, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Dict[Optional[str], int]: """Returns a dictionary from type names to the number of branches of that type.""" prefix = branch_name_exclude_prefix if prefix is None: return self.snapshot_count_branches_from_name(snapshot_id, b"") else: # counts branches before exclude prefix counts = Counter( self.snapshot_count_branches_before_name(snapshot_id, prefix) ) # no need to execute that part if each bit of the prefix equals 1 if prefix.replace(b"\xff", b"") != b"": # counts branches after exclude prefix and update counters counts.update( self.snapshot_count_branches_from_name( snapshot_id, _next_bytes_value(prefix) ) ) return counts @_prepared_select_statement( SnapshotBranchRow, "WHERE snapshot_id = ? AND name >= ? LIMIT ?" ) def snapshot_branch_get_from_name( self, snapshot_id: Sha1Git, from_: bytes, limit: int, *, statement ) -> Iterable[SnapshotBranchRow]: return map( SnapshotBranchRow.from_dict, self._execute_with_retries(statement, [snapshot_id, from_, limit]), ) @_prepared_select_statement( SnapshotBranchRow, "WHERE snapshot_id = ? AND name >= ? AND name < ? LIMIT ?" ) def snapshot_branch_get_range( self, snapshot_id: Sha1Git, from_: bytes, before: bytes, limit: int, *, statement, ) -> Iterable[SnapshotBranchRow]: return map( SnapshotBranchRow.from_dict, self._execute_with_retries(statement, [snapshot_id, from_, before, limit]), ) def snapshot_branch_get( self, snapshot_id: Sha1Git, from_: bytes, limit: int, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Iterable[SnapshotBranchRow]: prefix = branch_name_exclude_prefix if prefix is None: return self.snapshot_branch_get_from_name(snapshot_id, from_, limit) else: # get branches before the exclude prefix branches = list( self.snapshot_branch_get_range(snapshot_id, from_, prefix, limit) ) nb_branches = len(branches) # no need to execute that part if limit is reached # or if each bit of the prefix equals 1 if nb_branches < limit and prefix.replace(b"\xff", b"") != b"": # get branches after the exclude prefix and update list to return branches.extend( self.snapshot_branch_get_from_name( snapshot_id, _next_bytes_value(prefix), limit - nb_branches ) ) return branches ########################## # 'origin' table ########################## @_prepared_insert_statement(OriginRow) def origin_add_one(self, origin: OriginRow, *, statement) -> None: self._add_one(statement, origin) @_prepared_select_statement(OriginRow, "WHERE sha1 = ?") def origin_get_by_sha1(self, sha1: bytes, *, statement) -> Iterable[OriginRow]: return map(OriginRow.from_dict, self._execute_with_retries(statement, [sha1])) def origin_get_by_url(self, url: str) -> Iterable[OriginRow]: return self.origin_get_by_sha1(hash_url(url)) @_prepared_statement( f""" SELECT token(sha1) AS tok, {", ".join(OriginRow.cols())} FROM {OriginRow.TABLE} WHERE token(sha1) >= ? LIMIT ? """ ) def origin_list( self, start_token: int, limit: int, *, statement ) -> Iterable[Tuple[int, OriginRow]]: """Returns an iterable of (token, origin)""" return ( (row["tok"], OriginRow.from_dict(remove_keys(row, ("tok",)))) for row in self._execute_with_retries(statement, [start_token, limit]) ) @_prepared_select_statement(OriginRow) def origin_iter_all(self, *, statement) -> Iterable[OriginRow]: return map(OriginRow.from_dict, self._execute_with_retries(statement, [])) @_prepared_statement(f"SELECT next_visit_id FROM {OriginRow.TABLE} WHERE sha1 = ?") def _origin_get_next_visit_id(self, origin_sha1: bytes, *, statement) -> int: rows = list(self._execute_with_retries(statement, [origin_sha1])) assert len(rows) == 1 # TODO: error handling return rows[0]["next_visit_id"] @_prepared_statement( f""" UPDATE {OriginRow.TABLE} SET next_visit_id=? WHERE sha1 = ? IF next_visit_id=? """ ) def origin_generate_unique_visit_id(self, origin_url: str, *, statement) -> int: origin_sha1 = hash_url(origin_url) next_id = self._origin_get_next_visit_id(origin_sha1) while True: res = list( self._execute_with_retries( statement, [next_id + 1, origin_sha1, next_id] ) ) assert len(res) == 1 if res[0]["[applied]"]: # No data race return next_id else: # Someone else updated it before we did, let's try again next_id = res[0]["next_visit_id"] # TODO: abort after too many attempts return next_id ########################## # 'origin_visit' table ########################## @_prepared_select_statements( OriginVisitRow, { (True, ListOrder.ASC): ( "WHERE origin = ? AND visit > ? ORDER BY visit ASC LIMIT ?" ), (True, ListOrder.DESC): ( "WHERE origin = ? AND visit < ? ORDER BY visit DESC LIMIT ?" ), (False, ListOrder.ASC): "WHERE origin = ? ORDER BY visit ASC LIMIT ?", (False, ListOrder.DESC): "WHERE origin = ? ORDER BY visit DESC LIMIT ?", }, ) def origin_visit_get( self, origin_url: str, last_visit: Optional[int], limit: int, order: ListOrder, *, statements, ) -> Iterable[OriginVisitRow]: args: List[Any] = [origin_url] if last_visit is not None: args.append(last_visit) args.append(limit) statement = statements[(last_visit is not None, order)] return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, args) ) @_prepared_insert_statement(OriginVisitRow) def origin_visit_add_one(self, visit: OriginVisitRow, *, statement) -> None: self._add_one(statement, visit) @_prepared_select_statement(OriginVisitRow, "WHERE origin = ? AND visit = ?") def origin_visit_get_one( self, origin_url: str, visit_id: int, *, statement ) -> Optional[OriginVisitRow]: # TODO: error handling rows = list(self._execute_with_retries(statement, [origin_url, visit_id])) if rows: return OriginVisitRow.from_dict(rows[0]) else: return None @_prepared_select_statement(OriginVisitRow, "WHERE origin = ?") def origin_visit_get_all( self, origin_url: str, *, statement ) -> Iterable[OriginVisitRow]: return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, [origin_url]), ) @_prepared_select_statement(OriginVisitRow, "WHERE token(origin) >= ?") def _origin_visit_iter_from( self, min_token: int, *, statement ) -> Iterable[OriginVisitRow]: return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, [min_token]) ) @_prepared_select_statement(OriginVisitRow, "WHERE token(origin) < ?") def _origin_visit_iter_to( self, max_token: int, *, statement ) -> Iterable[OriginVisitRow]: return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, [max_token]) ) def origin_visit_iter(self, start_token: int) -> Iterator[OriginVisitRow]: """Returns all origin visits in order from this token, and wraps around the token space.""" yield from self._origin_visit_iter_from(start_token) yield from self._origin_visit_iter_to(start_token) ########################## # 'origin_visit_status' table ########################## @_prepared_select_statements( OriginVisitStatusRow, { (True, ListOrder.ASC): ( "WHERE origin = ? AND visit = ? AND date >= ? " "ORDER BY visit ASC LIMIT ?" ), (True, ListOrder.DESC): ( "WHERE origin = ? AND visit = ? AND date <= ? " "ORDER BY visit DESC LIMIT ?" ), (False, ListOrder.ASC): ( "WHERE origin = ? AND visit = ? ORDER BY visit ASC LIMIT ?" ), (False, ListOrder.DESC): ( "WHERE origin = ? AND visit = ? ORDER BY visit DESC LIMIT ?" ), }, ) def origin_visit_status_get_range( self, origin: str, visit: int, date_from: Optional[datetime.datetime], limit: int, order: ListOrder, *, statements, ) -> Iterable[OriginVisitStatusRow]: args: List[Any] = [origin, visit] if date_from is not None: args.append(date_from) args.append(limit) statement = statements[(date_from is not None, order)] return map( OriginVisitStatusRow.from_dict, self._execute_with_retries(statement, args) ) @_prepared_insert_statement(OriginVisitStatusRow) def origin_visit_status_add_one( self, visit_update: OriginVisitStatusRow, *, statement ) -> None: self._add_one(statement, visit_update) def origin_visit_status_get_latest( self, origin: str, visit: int, ) -> Optional[OriginVisitStatusRow]: """Given an origin visit id, return its latest origin_visit_status """ return next(self.origin_visit_status_get(origin, visit), None) @_prepared_select_statement( - OriginVisitStatusRow, "WHERE origin = ? AND visit = ? ORDER BY date DESC" + OriginVisitStatusRow, + # 'visit DESC,' is optional with Cassandra 4, but ScyllaDB needs it + "WHERE origin = ? AND visit = ? ORDER BY visit DESC, date DESC", ) def origin_visit_status_get( self, origin: str, visit: int, *, statement, ) -> Iterator[OriginVisitStatusRow]: """Return all origin visit statuses for a given visit """ return map( OriginVisitStatusRow.from_dict, self._execute_with_retries(statement, [origin, visit]), ) ########################## # 'metadata_authority' table ########################## @_prepared_insert_statement(MetadataAuthorityRow) def metadata_authority_add(self, authority: MetadataAuthorityRow, *, statement): self._add_one(statement, authority) @_prepared_select_statement(MetadataAuthorityRow, "WHERE type = ? AND url = ?") def metadata_authority_get( self, type, url, *, statement ) -> Optional[MetadataAuthorityRow]: rows = list(self._execute_with_retries(statement, [type, url])) if rows: return MetadataAuthorityRow.from_dict(rows[0]) else: return None ########################## # 'metadata_fetcher' table ########################## @_prepared_insert_statement(MetadataFetcherRow) def metadata_fetcher_add(self, fetcher, *, statement): self._add_one(statement, fetcher) @_prepared_select_statement(MetadataFetcherRow, "WHERE name = ? AND version = ?") def metadata_fetcher_get( self, name, version, *, statement ) -> Optional[MetadataFetcherRow]: rows = list(self._execute_with_retries(statement, [name, version])) if rows: return MetadataFetcherRow.from_dict(rows[0]) else: return None ######################### # 'raw_extrinsic_metadata' table ######################### @_prepared_insert_statement(RawExtrinsicMetadataRow) def raw_extrinsic_metadata_add(self, raw_extrinsic_metadata, *, statement): self._add_one(statement, raw_extrinsic_metadata) @_prepared_select_statement( RawExtrinsicMetadataRow, "WHERE target=? AND authority_url=? AND discovery_date>? AND authority_type=?", ) def raw_extrinsic_metadata_get_after_date( self, target: str, authority_type: str, authority_url: str, after: datetime.datetime, *, statement, ) -> Iterable[RawExtrinsicMetadataRow]: return map( RawExtrinsicMetadataRow.from_dict, self._execute_with_retries( statement, [target, authority_url, after, authority_type] ), ) @_prepared_select_statement( RawExtrinsicMetadataRow, - "WHERE target=? AND authority_type=? AND authority_url=? " - "AND (discovery_date, id) > (?, ?)", + # This is equivalent to: + # WHERE target=? AND authority_type = ? AND authority_url = ? " + # AND (discovery_date, id) > (?, ?)" + # but it needs to be written this way to work with ScyllaDB. + "WHERE target=? AND (authority_type, authority_url) <= (?, ?) " + "AND (authority_type, authority_url, discovery_date, id) > (?, ?, ?, ?)", ) def raw_extrinsic_metadata_get_after_date_and_id( self, target: str, authority_type: str, authority_url: str, after_date: datetime.datetime, after_id: bytes, *, statement, ) -> Iterable[RawExtrinsicMetadataRow]: return map( RawExtrinsicMetadataRow.from_dict, self._execute_with_retries( statement, - [target, authority_type, authority_url, after_date, after_id,], + [ + target, + authority_type, + authority_url, + authority_type, + authority_url, + after_date, + after_id, + ], ), ) @_prepared_select_statement( RawExtrinsicMetadataRow, "WHERE target=? AND authority_url=? AND authority_type=?", ) def raw_extrinsic_metadata_get( self, target: str, authority_type: str, authority_url: str, *, statement ) -> Iterable[RawExtrinsicMetadataRow]: return map( RawExtrinsicMetadataRow.from_dict, self._execute_with_retries( statement, [target, authority_url, authority_type] ), ) ########################## # 'extid' table ########################## def _extid_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by extid_add_prepare, to be called when the extid row should be added to the primary table.""" self._execute_with_retries(statement, None) self._increment_counter("extid", 1) @_prepared_insert_statement(ExtIDRow) def extid_add_prepare( self, extid: ExtIDRow, *, statement ) -> Tuple[int, Callable[[], None]]: statement = statement.bind(dataclasses.astuple(extid)) token_class = self._cluster.metadata.token_map.token_class token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._extid_add_finalize, statement) return (token, finalizer) @_prepared_select_statement( ExtIDRow, "WHERE extid_type=? AND extid=? AND target_type=? AND target=?", ) def extid_get_from_pk( self, extid_type: str, extid: bytes, target: CoreSWHID, *, statement, ) -> Optional[ExtIDRow]: rows = list( self._execute_with_retries( statement, [extid_type, extid, target.object_type.value, target.object_id], ), ) assert len(rows) <= 1 if rows: return ExtIDRow(**rows[0]) else: return None @_prepared_select_statement( ExtIDRow, "WHERE token(extid_type, extid) = ?", ) def extid_get_from_token(self, token: int, *, statement) -> Iterable[ExtIDRow]: return map(ExtIDRow.from_dict, self._execute_with_retries(statement, [token]),) @_prepared_select_statement( ExtIDRow, "WHERE extid_type=? AND extid=?", ) def extid_get_from_extid( self, extid_type: str, extid: bytes, *, statement ) -> Iterable[ExtIDRow]: return map( ExtIDRow.from_dict, self._execute_with_retries(statement, [extid_type, extid]), ) def extid_get_from_target( self, target_type: str, target: bytes ) -> Iterable[ExtIDRow]: for token in self._extid_get_tokens_from_target(target_type, target): if token is not None: for extid in self.extid_get_from_token(token): # re-check the extid against target (in case of murmur3 collision) if ( extid is not None and extid.target_type == target_type and extid.target == target ): yield extid ########################## # 'extid_by_target' table ########################## @_prepared_insert_statement(ExtIDByTargetRow) def extid_index_add_one(self, row: ExtIDByTargetRow, *, statement) -> None: """Adds a row mapping extid[target_type, target] to the token of the ExtID in the main 'extid' table.""" self._add_one(statement, row) @_prepared_statement( f""" SELECT target_token FROM {ExtIDByTargetRow.TABLE} WHERE target_type = ? AND target = ? """ ) def _extid_get_tokens_from_target( self, target_type: str, target: bytes, *, statement ) -> Iterable[int]: return ( row["target_token"] for row in self._execute_with_retries(statement, [target_type, target]) ) ########################## # Miscellaneous ########################## @_prepared_statement("SELECT uuid() FROM revision LIMIT 1;") def check_read(self, *, statement): self._execute_with_retries(statement, []) @_prepared_select_statement(ObjectCountRow, "WHERE partition_key=0") def stat_counters(self, *, statement) -> Iterable[ObjectCountRow]: return map(ObjectCountRow.from_dict, self._execute_with_retries(statement, [])) diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py index 19e62007..400b8fea 100644 --- a/swh/storage/cassandra/schema.py +++ b/swh/storage/cassandra/schema.py @@ -1,315 +1,334 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import os + +_use_scylla = bool(os.environ.get("SWH_USE_SCYLLADB", "")) + +UDF_LANGUAGE = "lua" if _use_scylla else "java" + +if UDF_LANGUAGE == "java": + # For Cassandra + CREATE_TABLES_QUERIES = [ + """ + CREATE OR REPLACE FUNCTION ascii_bins_count_sfunc ( + state tuple>, -- (nb_none, map) + bin_name ascii + ) + CALLED ON NULL INPUT + RETURNS tuple> + LANGUAGE java AS + $$ + if (bin_name == null) { + state.setInt(0, state.getInt(0) + 1); + } + else { + Map counters = state.getMap( + 1, String.class, Integer.class); + Integer nb = counters.get(bin_name); + if (nb == null) { + nb = 0; + } + counters.put(bin_name, nb + 1); + state.setMap(1, counters, String.class, Integer.class); + } + return state; + $$;""", + """ + CREATE OR REPLACE AGGREGATE ascii_bins_count ( ascii ) + SFUNC ascii_bins_count_sfunc + STYPE tuple> + INITCOND (0, {}) + ;""", + ] +elif UDF_LANGUAGE == "lua": + # For ScyllaDB + # TODO: this is not implementable yet, because ScyllaDB does not support + # user-defined aggregates. https://github.com/scylladb/scylla/issues/7201 + CREATE_TABLES_QUERIES = [] +else: + assert False, f"{UDF_LANGUAGE} must be 'lua' or 'java'" CREATE_TABLES_QUERIES = [ - """ -CREATE OR REPLACE FUNCTION ascii_bins_count_sfunc ( - state tuple>, -- (nb_none, map) - bin_name ascii -) -CALLED ON NULL INPUT -RETURNS tuple> -LANGUAGE java AS -$$ - if (bin_name == null) { - state.setInt(0, state.getInt(0) + 1); - } - else { - Map counters = state.getMap( - 1, String.class, Integer.class); - Integer nb = counters.get(bin_name); - if (nb == null) { - nb = 0; - } - counters.put(bin_name, nb + 1); - state.setMap(1, counters, String.class, Integer.class); - } - return state; -$$ -;""", - """ -CREATE OR REPLACE AGGREGATE ascii_bins_count ( ascii ) -SFUNC ascii_bins_count_sfunc -STYPE tuple> -INITCOND (0, {}) -;""", + *CREATE_TABLES_QUERIES, """ CREATE TYPE IF NOT EXISTS microtimestamp ( seconds bigint, microseconds int );""", """ CREATE TYPE IF NOT EXISTS microtimestamp_with_timezone ( timestamp frozen, offset smallint, negative_utc boolean );""", """ CREATE TYPE IF NOT EXISTS person ( fullname blob, name blob, email blob );""", """ CREATE TABLE IF NOT EXISTS content ( sha1 blob, sha1_git blob, sha256 blob, blake2s256 blob, length bigint, ctime timestamp, -- creation time, i.e. time of (first) injection into the storage status ascii, PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256)) );""", """ CREATE TABLE IF NOT EXISTS skipped_content ( sha1 blob, sha1_git blob, sha256 blob, blake2s256 blob, length bigint, ctime timestamp, -- creation time, i.e. time of (first) injection into the storage status ascii, reason text, origin text, PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256)) );""", """ CREATE TABLE IF NOT EXISTS revision ( id blob PRIMARY KEY, date microtimestamp_with_timezone, committer_date microtimestamp_with_timezone, type ascii, directory blob, -- source code "root" directory message blob, author person, committer person, synthetic boolean, -- true iff revision has been created by Software Heritage metadata text, -- extra metadata as JSON(tarball checksums, etc...) extra_headers frozen> > -- extra commit information as (tuple(key, value), ...) );""", """ CREATE TABLE IF NOT EXISTS revision_parent ( id blob, parent_rank int, -- parent position in merge commits, 0-based parent_id blob, PRIMARY KEY ((id), parent_rank) );""", """ CREATE TABLE IF NOT EXISTS release ( id blob PRIMARY KEY, target_type ascii, target blob, date microtimestamp_with_timezone, name blob, message blob, author person, synthetic boolean, -- true iff release has been created by Software Heritage );""", """ CREATE TABLE IF NOT EXISTS directory ( id blob PRIMARY KEY, );""", """ CREATE TABLE IF NOT EXISTS directory_entry ( directory_id blob, name blob, -- path name, relative to containing dir target blob, perms int, -- unix-like permissions type ascii, -- target type PRIMARY KEY ((directory_id), name) );""", """ CREATE TABLE IF NOT EXISTS snapshot ( id blob PRIMARY KEY, );""", """ -- For a given snapshot_id, branches are sorted by their name, -- allowing easy pagination. CREATE TABLE IF NOT EXISTS snapshot_branch ( snapshot_id blob, name blob, target_type ascii, target blob, PRIMARY KEY ((snapshot_id), name) );""", """ CREATE TABLE IF NOT EXISTS origin_visit ( origin text, visit bigint, date timestamp, type text, PRIMARY KEY ((origin), visit) );""", """ CREATE TABLE IF NOT EXISTS origin_visit_status ( origin text, visit bigint, date timestamp, type text, status ascii, metadata text, snapshot blob, PRIMARY KEY ((origin), visit, date) -);""", +) +WITH CLUSTERING ORDER BY (visit DESC, date DESC) +;""", # 'WITH CLUSTERING ORDER BY' is optional with Cassandra 4, but ScyllaDB needs it """ CREATE TABLE IF NOT EXISTS origin ( sha1 blob PRIMARY KEY, url text, next_visit_id int, -- We need integer visit ids for compatibility with the pgsql -- storage, so we're using lightweight transactions with this trick: -- https://stackoverflow.com/a/29391877/539465 );""", """ CREATE TABLE IF NOT EXISTS metadata_authority ( url text, type ascii, PRIMARY KEY ((url), type) );""", """ CREATE TABLE IF NOT EXISTS metadata_fetcher ( name ascii, version ascii, PRIMARY KEY ((name), version) );""", """ CREATE TABLE IF NOT EXISTS raw_extrinsic_metadata ( id blob, type text, target text, -- metadata source authority_type text, authority_url text, discovery_date timestamp, fetcher_name ascii, fetcher_version ascii, -- metadata itself format ascii, metadata blob, -- context origin text, visit bigint, snapshot text, release text, revision text, path blob, directory text, PRIMARY KEY ((target), authority_type, authority_url, discovery_date, id) -- An explanation is in order for this primary key: -- -- Intuitively, the primary key should only be 'id', because two metadata -- entries are the same iff the id is the same; and 'id' is used for -- deduplication. -- -- However, we also want to query by -- (target, authority_type, authority_url, discovery_date) -- The naive solution to this would be an extra table, to use as index; -- but it means 1. extra code to keep them in sync 2. overhead when writing -- 3. overhead + random reads (instead of linear) when reading. -- -- Therefore, we use a single table for both, by adding the column -- we want to query with before the id. -- It solves both a) the query/order issues and b) the uniqueness issue because: -- -- a) adding the id at the end of the primary key does not change the rows' order: -- for two different rows, id1 != id2, so -- (target1, ..., date1) < (target2, ..., date2) -- <=> (target1, ..., date1, id1) < (target2, ..., date2, id2) -- -- b) the id is a hash of all the columns, so: -- rows are the same -- <=> id1 == id2 -- <=> (target1, ..., date1, id1) == (target2, ..., date2, id2) );""", """ CREATE TABLE IF NOT EXISTS object_count ( partition_key smallint, -- Constant, must always be 0 object_type ascii, count counter, PRIMARY KEY ((partition_key), object_type) );""", """ CREATE TABLE IF NOT EXISTS extid ( extid_type ascii, extid blob, target_type ascii, target blob, PRIMARY KEY ((extid_type, extid), target_type, target) );""", """ CREATE TABLE IF NOT EXISTS extid_by_target ( target_type ascii, target blob, target_token bigint, -- value of token(pk) on the "primary" table PRIMARY KEY ((target_type, target), target_token) );""", ] CONTENT_INDEX_TEMPLATE = """ -- Secondary table, used for looking up "content" from a single hash CREATE TABLE IF NOT EXISTS content_by_{main_algo} ( {main_algo} blob, target_token bigint, -- value of token(pk) on the "primary" table PRIMARY KEY (({main_algo}), target_token) ); CREATE TABLE IF NOT EXISTS skipped_content_by_{main_algo} ( {main_algo} blob, target_token bigint, -- value of token(pk) on the "primary" table PRIMARY KEY (({main_algo}), target_token) ); """ TABLES = [ "skipped_content", "content", "revision", "revision_parent", "release", "directory", "directory_entry", "snapshot", "snapshot_branch", "origin_visit", "origin", "raw_extrinsic_metadata", "object_count", "origin_visit_status", "metadata_authority", "metadata_fetcher", "extid", "extid_by_target", ] HASH_ALGORITHMS = ["sha1", "sha1_git", "sha256", "blake2s256"] for main_algo in HASH_ALGORITHMS: CREATE_TABLES_QUERIES.extend( CONTENT_INDEX_TEMPLATE.format( main_algo=main_algo, other_algos=", ".join( [algo for algo in HASH_ALGORITHMS if algo != main_algo] ), ).split("\n\n") ) TABLES.append("content_by_%s" % main_algo) TABLES.append("skipped_content_by_%s" % main_algo) diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py index 3d37ac04..cc81f067 100644 --- a/swh/storage/tests/test_cassandra.py +++ b/swh/storage/tests/test_cassandra.py @@ -1,636 +1,678 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import itertools import os +import resource import signal import socket import subprocess import time from typing import Any, Dict import attr import pytest from swh.core.api.classes import stream_results from swh.model.model import Directory, DirectoryEntry, Snapshot, SnapshotBranch from swh.storage import get_storage from swh.storage.cassandra import create_keyspace from swh.storage.cassandra.model import ContentRow, ExtIDRow from swh.storage.cassandra.schema import HASH_ALGORITHMS, TABLES from swh.storage.tests.storage_data import StorageData from swh.storage.tests.storage_tests import ( TestStorageGeneratedData as _TestStorageGeneratedData, ) from swh.storage.tests.storage_tests import TestStorage as _TestStorage from swh.storage.utils import now, remove_keys CONFIG_TEMPLATE = """ data_file_directories: - {data_dir}/data commitlog_directory: {data_dir}/commitlog hints_directory: {data_dir}/hints saved_caches_directory: {data_dir}/saved_caches commitlog_sync: periodic commitlog_sync_period_in_ms: 1000000 partitioner: org.apache.cassandra.dht.Murmur3Partitioner endpoint_snitch: SimpleSnitch seed_provider: - class_name: org.apache.cassandra.locator.SimpleSeedProvider parameters: - seeds: "127.0.0.1" storage_port: {storage_port} native_transport_port: {native_transport_port} start_native_transport: true listen_address: 127.0.0.1 enable_user_defined_functions: true # speed-up by disabling period saving to disk key_cache_save_period: 0 row_cache_save_period: 0 trickle_fsync: false commitlog_sync_period_in_ms: 100000 """ +SCYLLA_EXTRA_CONFIG_TEMPLATE = """ +experimental_features: + - udf +view_hints_directory: {data_dir}/view_hints +prometheus_port: 0 # disable prometheus server +start_rpc: false # disable thrift server +api_port: {api_port} +""" + def free_port(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(("127.0.0.1", 0)) port = sock.getsockname()[1] sock.close() return port def wait_for_peer(addr, port): - wait_until = time.time() + 20 + wait_until = time.time() + 60 while time.time() < wait_until: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((addr, port)) except ConnectionRefusedError: time.sleep(0.1) else: sock.close() return True return False @pytest.fixture(scope="session") def cassandra_cluster(tmpdir_factory): cassandra_conf = tmpdir_factory.mktemp("cassandra_conf") cassandra_data = tmpdir_factory.mktemp("cassandra_data") cassandra_log = tmpdir_factory.mktemp("cassandra_log") native_transport_port = free_port() storage_port = free_port() jmx_port = free_port() + api_port = free_port() + + use_scylla = bool(os.environ.get("SWH_USE_SCYLLADB", "")) + + cassandra_bin = os.environ.get( + "SWH_CASSANDRA_BIN", "/usr/bin/scylla" if use_scylla else "/usr/sbin/cassandra" + ) + + if use_scylla: + os.makedirs(cassandra_conf.join("conf")) + config_path = cassandra_conf.join("conf/scylla.yaml") + config_template = CONFIG_TEMPLATE + SCYLLA_EXTRA_CONFIG_TEMPLATE + else: + config_path = cassandra_conf.join("cassandra.yaml") + config_template = CONFIG_TEMPLATE - with open(str(cassandra_conf.join("cassandra.yaml")), "w") as fd: + with open(str(config_path), "w") as fd: fd.write( - CONFIG_TEMPLATE.format( + config_template.format( data_dir=str(cassandra_data), storage_port=storage_port, native_transport_port=native_transport_port, + api_port=api_port, ) ) if os.environ.get("SWH_CASSANDRA_LOG"): stdout = stderr = None else: stdout = stderr = subprocess.DEVNULL - cassandra_bin = os.environ.get("SWH_CASSANDRA_BIN", "/usr/sbin/cassandra") env = { "MAX_HEAP_SIZE": "300M", "HEAP_NEWSIZE": "50M", "JVM_OPTS": "-Xlog:gc=error:file=%s/gc.log" % cassandra_log, } if "JAVA_HOME" in os.environ: env["JAVA_HOME"] = os.environ["JAVA_HOME"] - proc = subprocess.Popen( - [ - cassandra_bin, - "-Dcassandra.config=file://%s/cassandra.yaml" % cassandra_conf, - "-Dcassandra.logdir=%s" % cassandra_log, - "-Dcassandra.jmx.local.port=%d" % jmx_port, - "-Dcassandra-foreground=yes", - ], - start_new_session=True, - env=env, - stdout=stdout, - stderr=stderr, - ) + if use_scylla: + env = { + **env, + "SCYLLA_HOME": cassandra_conf, + } + # prevent "NOFILE rlimit too low (recommended setting 200000, + # minimum setting 10000; refusing to start." + resource.setrlimit(resource.RLIMIT_NOFILE, (200000, 200000)) + + proc = subprocess.Popen( + [cassandra_bin, "--developer-mode=1",], + start_new_session=True, + env=env, + stdout=stdout, + stderr=stderr, + ) + else: + proc = subprocess.Popen( + [ + cassandra_bin, + "-Dcassandra.config=file://%s/cassandra.yaml" % cassandra_conf, + "-Dcassandra.logdir=%s" % cassandra_log, + "-Dcassandra.jmx.local.port=%d" % jmx_port, + "-Dcassandra-foreground=yes", + ], + start_new_session=True, + env=env, + stdout=stdout, + stderr=stderr, + ) listening = wait_for_peer("127.0.0.1", native_transport_port) if listening: yield (["127.0.0.1"], native_transport_port) if not listening or os.environ.get("SWH_CASSANDRA_LOG"): debug_log_path = str(cassandra_log.join("debug.log")) if os.path.exists(debug_log_path): with open(debug_log_path) as fd: print(fd.read()) if not listening: if proc.poll() is None: raise Exception("cassandra process unexpectedly not listening.") else: raise Exception("cassandra process unexpectedly stopped.") pgrp = os.getpgid(proc.pid) os.killpg(pgrp, signal.SIGKILL) class RequestHandler: def on_request(self, rf): if hasattr(rf.message, "query"): print() print(rf.message.query) @pytest.fixture(scope="session") def keyspace(cassandra_cluster): (hosts, port) = cassandra_cluster keyspace = os.urandom(10).hex() create_keyspace(hosts, keyspace, port) return keyspace # tests are executed using imported classes (TestStorage and # TestStorageGeneratedData) using overloaded swh_storage fixture # below @pytest.fixture def swh_storage_backend_config(cassandra_cluster, keyspace): (hosts, port) = cassandra_cluster storage_config = dict( cls="cassandra", hosts=hosts, port=port, keyspace=keyspace, journal_writer={"cls": "memory"}, objstorage={"cls": "memory"}, ) yield storage_config storage = get_storage(**storage_config) for table in TABLES: storage._cql_runner._session.execute('TRUNCATE TABLE "%s"' % table) storage._cql_runner._cluster.shutdown() @pytest.mark.cassandra class TestCassandraStorage(_TestStorage): def test_content_add_murmur3_collision(self, swh_storage, mocker, sample_data): """The Murmur3 token is used as link from index tables to the main table; and non-matching contents with colliding murmur3-hash are filtered-out when reading the main table. This test checks the content methods do filter out these collision. """ called = 0 cont, cont2 = sample_data.contents[:2] # always return a token def mock_cgtfsh(algo, hash_): nonlocal called called += 1 assert algo in ("sha1", "sha1_git") return [123456] mocker.patch.object( swh_storage._cql_runner, "content_get_tokens_from_single_hash", mock_cgtfsh, ) # For all tokens, always return cont def mock_cgft(token): nonlocal called called += 1 return [ ContentRow( length=10, ctime=datetime.datetime.now(), status="present", **{algo: getattr(cont, algo) for algo in HASH_ALGORITHMS}, ) ] mocker.patch.object( swh_storage._cql_runner, "content_get_from_token", mock_cgft ) actual_result = swh_storage.content_add([cont2]) assert called == 4 assert actual_result == { "content:add": 1, "content:add:bytes": cont2.length, } def test_content_get_metadata_murmur3_collision( self, swh_storage, mocker, sample_data ): """The Murmur3 token is used as link from index tables to the main table; and non-matching contents with colliding murmur3-hash are filtered-out when reading the main table. This test checks the content methods do filter out these collisions. """ called = 0 cont, cont2 = [attr.evolve(c, ctime=now()) for c in sample_data.contents[:2]] # always return a token def mock_cgtfsh(algo, hash_): nonlocal called called += 1 assert algo in ("sha1", "sha1_git") return [123456] mocker.patch.object( swh_storage._cql_runner, "content_get_tokens_from_single_hash", mock_cgtfsh, ) # For all tokens, always return cont and cont2 cols = list(set(cont.to_dict()) - {"data"}) def mock_cgft(token): nonlocal called called += 1 return [ ContentRow(**{col: getattr(cont, col) for col in cols},) for cont in [cont, cont2] ] mocker.patch.object( swh_storage._cql_runner, "content_get_from_token", mock_cgft ) actual_result = swh_storage.content_get([cont.sha1]) assert called == 2 # dropping extra column not returned expected_cont = attr.evolve(cont, data=None) # but cont2 should be filtered out assert actual_result == [expected_cont] def test_content_find_murmur3_collision(self, swh_storage, mocker, sample_data): """The Murmur3 token is used as link from index tables to the main table; and non-matching contents with colliding murmur3-hash are filtered-out when reading the main table. This test checks the content methods do filter out these collisions. """ called = 0 cont, cont2 = [attr.evolve(c, ctime=now()) for c in sample_data.contents[:2]] # always return a token def mock_cgtfsh(algo, hash_): nonlocal called called += 1 assert algo in ("sha1", "sha1_git") return [123456] mocker.patch.object( swh_storage._cql_runner, "content_get_tokens_from_single_hash", mock_cgtfsh, ) # For all tokens, always return cont and cont2 cols = list(set(cont.to_dict()) - {"data"}) def mock_cgft(token): nonlocal called called += 1 return [ ContentRow(**{col: getattr(cont, col) for col in cols}) for cont in [cont, cont2] ] mocker.patch.object( swh_storage._cql_runner, "content_get_from_token", mock_cgft ) expected_content = attr.evolve(cont, data=None) actual_result = swh_storage.content_find({"sha1": cont.sha1}) assert called == 2 # but cont2 should be filtered out assert actual_result == [expected_content] def test_content_get_partition_murmur3_collision( self, swh_storage, mocker, sample_data ): """The Murmur3 token is used as link from index tables to the main table; and non-matching contents with colliding murmur3-hash are filtered-out when reading the main table. This test checks the content_get_partition endpoints return all contents, even the collisions. """ called = 0 rows: Dict[int, Dict] = {} for tok, content in enumerate(sample_data.contents): cont = attr.evolve(content, data=None, ctime=now()) row_d = {**cont.to_dict(), "tok": tok} rows[tok] = row_d # For all tokens, always return cont def mock_content_get_token_range(range_start, range_end, limit): nonlocal called called += 1 for tok in list(rows.keys()) * 3: # yield multiple times the same tok row_d = dict(rows[tok].items()) row_d.pop("tok") yield (tok, ContentRow(**row_d)) mocker.patch.object( swh_storage._cql_runner, "content_get_token_range", mock_content_get_token_range, ) actual_results = list( stream_results( swh_storage.content_get_partition, partition_id=0, nb_partitions=1 ) ) assert called > 0 # everything is listed, even collisions assert len(actual_results) == 3 * len(sample_data.contents) # as we duplicated the returned results, dropping duplicate should yield # the original length assert len(set(actual_results)) == len(sample_data.contents) @pytest.mark.skip("content_update is not yet implemented for Cassandra") def test_content_update(self): pass def test_extid_murmur3_collision(self, swh_storage, mocker, sample_data): """The Murmur3 token is used as link from index table to the main table; and non-matching extid with colliding murmur3-hash are filtered-out when reading the main table. This test checks the extid methods do filter out these collision. """ swh_storage.extid_add(sample_data.extids) # For any token, always return all extids, i.e. make as if all tokens # for all extid entries collide def mock_egft(token): return [ ExtIDRow( extid_type=extid.extid_type, extid=extid.extid, target_type=extid.target.object_type.value, target=extid.target.object_id, ) for extid in sample_data.extids ] mocker.patch.object( swh_storage._cql_runner, "extid_get_from_token", mock_egft, ) for extid in sample_data.extids: extids = swh_storage.extid_get_from_target( target_type=extid.target.object_type, ids=[extid.target.object_id] ) assert extids == [extid] def test_directory_add_atomic(self, swh_storage, sample_data, mocker): """Checks that a crash occurring after some directory entries were written does not cause the directory to be (partially) visible. ie. checks directories are added somewhat atomically.""" # Disable the journal writer, it would detect the CrashyEntry exception too # early for this test to be relevant swh_storage.journal_writer.journal = None class MyException(Exception): pass class CrashyEntry(DirectoryEntry): def __init__(self): pass def to_dict(self): raise MyException() directory = sample_data.directory3 entries = directory.entries directory = attr.evolve(directory, entries=entries + (CrashyEntry(),)) with pytest.raises(MyException): swh_storage.directory_add([directory]) # This should have written some of the entries to the database: entry_rows = swh_storage._cql_runner.directory_entry_get([directory.id]) assert {row.name for row in entry_rows} == {entry.name for entry in entries} # BUT, because not all the entries were written, the directory should # be considered not written. assert swh_storage.directory_missing([directory.id]) == [directory.id] assert list(swh_storage.directory_ls(directory.id)) == [] assert swh_storage.directory_get_entries(directory.id) is None def test_snapshot_add_atomic(self, swh_storage, sample_data, mocker): """Checks that a crash occurring after some snapshot branches were written does not cause the snapshot to be (partially) visible. ie. checks snapshots are added somewhat atomically.""" # Disable the journal writer, it would detect the CrashyBranch exception too # early for this test to be relevant swh_storage.journal_writer.journal = None class MyException(Exception): pass class CrashyBranch(SnapshotBranch): def __getattribute__(self, name): if name == "target" and should_raise: raise MyException() else: return super().__getattribute__(name) snapshot = sample_data.complete_snapshot branches = snapshot.branches should_raise = False # just so that we can construct the object crashy_branch = CrashyBranch.from_dict(branches[b"directory"].to_dict()) should_raise = True snapshot = attr.evolve( snapshot, branches={**branches, b"crashy": crashy_branch,}, ) with pytest.raises(MyException): swh_storage.snapshot_add([snapshot]) # This should have written some of the branches to the database: branch_rows = swh_storage._cql_runner.snapshot_branch_get(snapshot.id, b"", 10) assert {row.name for row in branch_rows} == set(branches) # BUT, because not all the branches were written, the snapshot should # be considered not written. assert swh_storage.snapshot_missing([snapshot.id]) == [snapshot.id] assert swh_storage.snapshot_get(snapshot.id) is None assert swh_storage.snapshot_count_branches(snapshot.id) is None assert swh_storage.snapshot_get_branches(snapshot.id) is None @pytest.mark.skip( 'The "person" table of the pgsql is a legacy thing, and not ' "supported by the cassandra backend." ) def test_person_fullname_unicity(self): pass @pytest.mark.skip( 'The "person" table of the pgsql is a legacy thing, and not ' "supported by the cassandra backend." ) def test_person_get(self): pass @pytest.mark.skip("Not supported by Cassandra") def test_origin_count(self): pass @pytest.mark.cassandra class TestCassandraStorageGeneratedData(_TestStorageGeneratedData): @pytest.mark.skip("Not supported by Cassandra") def test_origin_count(self): pass @pytest.mark.skip("Not supported by Cassandra") def test_origin_count_with_visit_no_visits(self): pass @pytest.mark.skip("Not supported by Cassandra") def test_origin_count_with_visit_with_visits_and_snapshot(self): pass @pytest.mark.skip("Not supported by Cassandra") def test_origin_count_with_visit_with_visits_no_snapshot(self): pass @pytest.mark.parametrize( "allow_overwrite,object_type", itertools.product( [False, True], # Note the absence of "content", it's tested above. ["directory", "revision", "release", "snapshot", "origin", "extid"], ), ) def test_allow_overwrite( allow_overwrite: bool, object_type: str, swh_storage_backend_config ): if object_type in ("origin", "extid"): pytest.skip( f"test_disallow_overwrite not implemented for {object_type} objects, " f"because all their columns are in the primary key." ) swh_storage = get_storage( allow_overwrite=allow_overwrite, **swh_storage_backend_config ) # directory_ls joins with content and directory table, and needs those to return # non-None entries: if object_type == "directory": swh_storage.directory_add([StorageData.directory5]) swh_storage.content_add([StorageData.content, StorageData.content2]) obj1: Any obj2: Any # Get two test objects if object_type == "directory": (obj1, obj2, *_) = StorageData.directories elif object_type == "snapshot": # StorageData.snapshots[1] is the empty snapshot, which is the corner case # that makes this test succeed for the wrong reasons obj1 = StorageData.snapshot obj2 = StorageData.complete_snapshot else: (obj1, obj2, *_) = getattr(StorageData, (object_type + "s")) # Let's make both objects have the same hash, but different content obj1 = attr.evolve(obj1, id=obj2.id) # Get the methods used to add and get these objects add = getattr(swh_storage, object_type + "_add") if object_type == "directory": def get(ids): return [ Directory( id=ids[0], entries=tuple( map( lambda entry: DirectoryEntry( name=entry["name"], type=entry["type"], target=entry["sha1_git"], perms=entry["perms"], ), swh_storage.directory_ls(ids[0]), ) ), ) ] elif object_type == "snapshot": def get(ids): return [ Snapshot.from_dict( remove_keys(swh_storage.snapshot_get(ids[0]), ("next_branch",)) ) ] else: get = getattr(swh_storage, object_type + "_get") # Add the first object add([obj1]) # It should be returned as-is assert get([obj1.id]) == [obj1] # Add the second object add([obj2]) if allow_overwrite: # obj1 was overwritten by obj2 expected = obj2 else: # obj2 was not written, because obj1 already exists and has the same hash expected = obj1 if allow_overwrite and object_type in ("directory", "snapshot"): # TODO pytest.xfail( "directory entries and snapshot branches are concatenated " "instead of being replaced" ) assert get([obj1.id]) == [expected]