diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,4 @@
 swh.core[db,http] >= 0.14.0
+swh.counters >= v0.8.0
 swh.model >= 2.1.0
 swh.objstorage >= 0.2.2
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -19,11 +19,12 @@
     # deprecated
     "local": ".postgresql.storage.Storage",
     # proxy storages
-    "filter": ".proxies.filter.FilteringProxyStorage",
     "buffer": ".proxies.buffer.BufferingProxyStorage",
+    "counter": ".proxies.counter.CountingProxyStorage",
+    "filter": ".proxies.filter.FilteringProxyStorage",
     "retry": ".proxies.retry.RetryingProxyStorage",
-    "validate": ".proxies.validate.ValidatingProxyStorage",
     "tenacious": ".proxies.tenacious.TenaciousProxyStorage",
+    "validate": ".proxies.validate.ValidatingProxyStorage",
 }
 
 
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -7,6 +7,7 @@
 import dataclasses
 import datetime
 import functools
+import itertools
 import logging
 import random
 from typing import (
@@ -17,6 +18,7 @@
     Iterator,
     List,
     Optional,
+    Sequence,
     Tuple,
     Type,
     TypeVar,
@@ -25,6 +27,7 @@
 
 from cassandra import ConsistencyLevel, CoordinationFailure
 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet
+from cassandra.concurrent import execute_concurrent_with_args
 from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
 from cassandra.query import BoundStatement, PreparedStatement, dict_factory
 from mypy_extensions import NamedArg
@@ -85,6 +88,8 @@
 See for details and rationale.
 """
 
+BATCH_INSERT_MAX_SIZE = 1000
+
 logger = logging.getLogger(__name__)
 
 
@@ -166,6 +171,14 @@
 TSelf = TypeVar("TSelf")
 
 
+def _insert_query(row_class):
+    columns = row_class.cols()
+    return (
+        f"INSERT INTO {row_class.TABLE} ({', '.join(columns)}) "
+        f"VALUES ({', '.join('?' for _ in columns)})"
+    )
+
+
 def _prepared_insert_statement(
     row_class: Type[BaseRow],
 ) -> Callable[
@@ -174,11 +187,7 @@
 ]:
     """Shorthand for using `_prepared_statement` for `INSERT INTO`
     statements."""
-    columns = row_class.cols()
-    return _prepared_statement(
-        "INSERT INTO %s (%s) VALUES (%s)"
-        % (row_class.TABLE, ", ".join(columns), ", ".join("?" for _ in columns),)
-    )
+    return _prepared_statement(_insert_query(row_class))
 
 
 def _prepared_exists_statement(
@@ -280,22 +289,28 @@
         stop=stop_after_attempt(MAX_RETRIES),
         retry=retry_if_exception_type(CoordinationFailure),
     )
-    def _execute_with_retries(self, statement, args) -> ResultSet:
+    def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet:
         return self._session.execute(statement, args, timeout=1000.0)
 
-    @_prepared_statement(
-        "UPDATE object_count SET count = count + ? "
-        "WHERE partition_key = 0 AND object_type = ?"
+    @retry(
+        wait=wait_random_exponential(multiplier=1, max=10),
+        stop=stop_after_attempt(MAX_RETRIES),
+        retry=retry_if_exception_type(CoordinationFailure),
     )
-    def _increment_counter(
-        self, object_type: str, nb: int, *, statement: PreparedStatement
-    ) -> None:
-        self._execute_with_retries(statement, [nb, object_type])
+    def _execute_many_with_retries(
+        self, statement, args_list: List[Tuple]
+    ) -> ResultSet:
+        return execute_concurrent_with_args(self._session, statement, args_list)
 
     def _add_one(self, statement, obj: BaseRow) -> None:
-        self._increment_counter(obj.TABLE, 1)
         self._execute_with_retries(statement, dataclasses.astuple(obj))
 
+    def _add_many(self, statement, objs: Sequence[BaseRow]) -> None:
+        tables = {obj.TABLE for obj in objs}
+        assert len(tables) == 1, f"Cannot insert to multiple tables: {tables}"
+        (table,) = tables
+        self._execute_many_with_retries(statement, list(map(dataclasses.astuple, objs)))
+
     _T = TypeVar("_T", bound=BaseRow)
 
     def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]:  # noqa
@@ -328,7 +343,6 @@
         """Returned currified by content_add_prepare, to be called when the
         content row should be added to the primary table."""
         self._execute_with_retries(statement, None)
-        self._increment_counter("content", 1)
 
     @_prepared_insert_statement(ContentRow)
     def content_add_prepare(
@@ -482,7 +496,6 @@
         """Returned currified by skipped_content_add_prepare, to be called when the
         content row should be added to the primary table."""
         self._execute_with_retries(statement, None)
-        self._increment_counter("skipped_content", 1)
 
     @_prepared_insert_statement(SkippedContentRow)
     def skipped_content_add_prepare(
@@ -677,6 +690,56 @@
     def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None:
         self._add_one(statement, entry)
 
+    @_prepared_insert_statement(DirectoryEntryRow)
+    def directory_entry_add_concurrent(
+        self, entries: List[DirectoryEntryRow], *, statement
+    ) -> None:
+        if len(entries) == 0:
+            # nothing to do
+            return
+        assert (
+            len({entry.directory_id for entry in entries}) == 1
+        ), "directory_entry_add_many must be called with entries for a single dir"
+        self._add_many(statement, entries)
+
+    def directory_entry_add_batch(self, entries: List[DirectoryEntryRow],) -> None:
+        if len(entries) == 0:
+            # nothing to do
+            return
+        assert (
+            len({entry.directory_id for entry in entries}) == 1
+        ), "directory_entry_add_many must be called with entries for a single dir"
+
+        # query to INSERT one row
+        insert_query = _insert_query(DirectoryEntryRow) + ";\n"
+
+        # In "steady state", we insert batches of the maximum allowed size.
+        # Then, the last one has however many entries remain.
+        last_batch_size = len(entries) % BATCH_INSERT_MAX_SIZE
+        if len(entries) >= BATCH_INSERT_MAX_SIZE:
+            # TODO: the main_statement's size is statically known, so we could avoid
+            # re-preparing it on every call
+            main_statement = self._session.prepare(
+                "BEGIN UNLOGGED BATCH\n"
+                + insert_query * BATCH_INSERT_MAX_SIZE
+                + "APPLY BATCH"
+            )
+        last_statement = self._session.prepare(
+            "BEGIN UNLOGGED BATCH\n" + insert_query * last_batch_size + "APPLY BATCH"
+        )
+
+        for entry_group in grouper(entries, BATCH_INSERT_MAX_SIZE):
+            entry_group = list(map(dataclasses.astuple, entry_group))
+            if len(entry_group) == BATCH_INSERT_MAX_SIZE:
+                self._execute_with_retries(
+                    main_statement, list(itertools.chain.from_iterable(entry_group))
+                )
+            else:
+                assert len(entry_group) == last_batch_size
+                self._execute_with_retries(
+                    last_statement, list(itertools.chain.from_iterable(entry_group))
+                )
+
     @_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?")
     def directory_entry_get(
         self, directory_ids, *, statement
@@ -1219,7 +1282,6 @@
         """Returned currified by extid_add_prepare, to be called when the
         extid row should be added to the primary table."""
         self._execute_with_retries(statement, None)
-        self._increment_counter("extid", 1)
 
     @_prepared_insert_statement(ExtIDRow)
     def extid_add_prepare(
@@ -1328,10 +1390,11 @@
     # Miscellaneous
     ##########################
 
+    def stat_counters(self) -> Iterable[ObjectCountRow]:
+        raise NotImplementedError(
+            "stat_counters is not implemented by the Cassandra backend"
+        )
+
     @_prepared_statement("SELECT uuid() FROM revision LIMIT 1;")
     def check_read(self, *, statement):
         self._execute_with_retries(statement, [])
-
-    @_prepared_select_statement(ObjectCountRow, "WHERE partition_key=0")
-    def stat_counters(self, *, statement) -> Iterable[ObjectCountRow]:
-        return map(ObjectCountRow.from_dict, self._execute_with_retries(statement, []))
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -267,13 +267,6 @@
     PRIMARY KEY ((id))
 );""",
     """
-CREATE TABLE IF NOT EXISTS object_count (
-    partition_key  smallint,  -- Constant, must always be 0
-    object_type    ascii,
-    count          counter,
-    PRIMARY KEY ((partition_key), object_type)
-);""",
-    """
 CREATE TABLE IF NOT EXISTS extid (
     extid_type  ascii,
     extid       blob,
@@ -319,7 +312,6 @@
     "origin_visit",
     "origin",
     "raw_extrinsic_metadata",
-    "object_count",
     "origin_visit_status",
     "metadata_authority",
     "metadata_fetcher",
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -100,6 +100,7 @@
         journal_writer=None,
         allow_overwrite=False,
         consistency_level="ONE",
+        directory_entries_insert_algo="one-by-one",
     ):
         """
         A backend of swh-storage backed by Cassandra
@@ -120,6 +121,10 @@
                 Note that a ``False`` value does not guarantee there
                 won't be any overwrite.
             consistency_level: The default read/write consistency to use
+            directory_entries_insert_algo: Must be one of:
+                * one-by-one: naive, one INSERT per directory entry, serialized
+                * concurrent: one INSERT per directory entry, concurrent
+                * batch: using UNLOGGED BATCH to insert many entries in a few statements
         """
         self._hosts = hosts
         self._keyspace = keyspace
@@ -129,6 +134,7 @@
         self.journal_writer: JournalWriter = JournalWriter(journal_writer)
         self.objstorage: ObjStorage = ObjStorage(objstorage)
         self._allow_overwrite = allow_overwrite
+        self._directory_entries_insert_algo = directory_entries_insert_algo
 
     def _set_cql_runner(self):
         """Used by tests when they need to reset the CqlRunner"""
@@ -438,9 +444,21 @@
 
         for directory in directories:
             # Add directory entries to the 'directory_entry' table
-            for entry in directory.entries:
-                self._cql_runner.directory_entry_add_one(
-                    DirectoryEntryRow(directory_id=directory.id, **entry.to_dict())
+            rows = [
+                DirectoryEntryRow(directory_id=directory.id, **entry.to_dict())
+                for entry in directory.entries
+            ]
+            if self._directory_entries_insert_algo == "one-by-one":
+                for row in rows:
+                    self._cql_runner.directory_entry_add_one(row)
+            elif self._directory_entries_insert_algo == "concurrent":
+                self._cql_runner.directory_entry_add_concurrent(rows)
+            elif self._directory_entries_insert_algo == "batch":
+                self._cql_runner.directory_entry_add_batch(rows)
+            else:
+                raise ValueError(
+                    f"Unexpected value for directory_entries_insert_algo: "
+                    f"{self._directory_entries_insert_algo}"
                 )
 
             # Add the directory *after* adding all the entries, so someone
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -729,6 +729,7 @@
         self.reset()
         self.journal_writer = JournalWriter(journal_writer)
         self._allow_overwrite = False
+        self._directory_entries_insert_algo = "one-by-one"
 
     def reset(self):
         self._cql_runner = InMemoryCqlRunner()
diff --git a/swh/storage/proxies/counter.py b/swh/storage/proxies/counter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/proxies/counter.py
@@ -0,0 +1,66 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from typing import Callable
+
+from swh.counters import get_counters
+from swh.counters.interface import CountersInterface
+from swh.storage import get_storage
+from swh.storage.interface import StorageInterface
+
+OBJECT_TYPES = [
+    "content",
+    "directory",
+    "snapshot",
+    "origin_visit_status",
+    "origin_visit",
+    "origin",
+]
+
+
+class CountingProxyStorage:
+    """Counting Storage Proxy.
+
+    This is in charge of adding objects directly to swh-counters, without
+    going through Kafka/swh-journal.
+    This is meant as a simple way to setup counters for experiments; production
+    should use swh-journal to reduce load/latency of the storage server.
+
+    Additionally, unlike the journal-based counting, it does not count persons
+    or the number of origins per netloc.
+
+    Sample configuration use case for counting storage:
+
+    .. code-block:: yaml
+
+        storage:
+            cls: counter
+            counters:
+                cls: remote
+                url: http://counters.internal.staging.swh.network:5011/
+            storage:
+                cls: remote
+                url: http://storage.internal.staging.swh.network:5002/
+
+    """
+
+    def __init__(self, counters, storage):
+        self.counters: CountersInterface = get_counters(**counters)
+        self.storage: StorageInterface = get_storage(**storage)
+
+    def __getattr__(self, key):
+        if key == "storage":
+            raise AttributeError(key)
+        if key.endswith("_add"):
+            return self._adder(key[0:-4], getattr(self.storage, key))
+        return getattr(self.storage, key)
+
+    def _adder(self, collection: str, backend_function: Callable):
+        def f(objs):
+            self.counters.add(collection, [obj.unique_key() for obj in objs])
+            return backend_function(objs)
+
+        return f
diff --git a/swh/storage/tests/storage_tests.py b/swh/storage/tests/storage_tests.py
--- a/swh/storage/tests/storage_tests.py
+++ b/swh/storage/tests/storage_tests.py
@@ -38,8 +38,10 @@
     TargetType,
 )
 from swh.storage import get_storage
+from swh.storage.cassandra.storage import CassandraStorage
 from swh.storage.common import origin_url_to_sha1 as sha1
 from swh.storage.exc import HashCollision, StorageArgumentException
+from swh.storage.in_memory import InMemoryStorage
 from swh.storage.interface import ListOrder, PagedResult, StorageInterface
 from swh.storage.tests.conftest import function_scoped_fixture_check
 from swh.storage.utils import (
@@ -187,8 +189,11 @@
         assert obj.ctime <= insertion_end_time
         assert obj == expected_cont
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["content"] == 1
+        if isinstance(swh_storage, InMemoryStorage) or not isinstance(
+            swh_storage, CassandraStorage
+        ):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["content"] == 1
 
     def test_content_add_from_lazy_content(self, swh_storage, sample_data):
         cont = sample_data.content
@@ -221,8 +226,11 @@
         assert obj.ctime <= insertion_end_time
         assert attr.evolve(obj, ctime=None).to_dict() == expected_cont.to_dict()
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["content"] == 1
+        if isinstance(swh_storage, InMemoryStorage) or not isinstance(
+            swh_storage, CassandraStorage
+        ):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["content"] == 1
 
     def test_content_get_data_missing(self, swh_storage, sample_data):
         cont, cont2 = sample_data.contents[:2]
@@ -705,8 +713,11 @@
         after_missing = list(swh_storage.directory_missing([directory.id]))
         assert after_missing == []
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["directory"] == 1
+        if isinstance(swh_storage, InMemoryStorage) or not isinstance(
+            swh_storage, CassandraStorage
+        ):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["directory"] == 1
 
     def test_directory_add_twice(self, swh_storage, sample_data):
         directory = sample_data.directories[1]
@@ -975,8 +986,11 @@
         actual_result = swh_storage.revision_add([revision])
         assert actual_result == {"revision:add": 0}
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["revision"] == 1
+        if isinstance(swh_storage, InMemoryStorage) or not isinstance(
+            swh_storage, CassandraStorage
+        ):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["revision"] == 1
 
     def test_revision_add_twice(self, swh_storage, sample_data):
         revision, revision2 = sample_data.revisions[:2]
@@ -1376,8 +1390,11 @@
         actual_result = swh_storage.release_add([release, release2])
         assert actual_result == {"release:add": 0}
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["release"] == 2
+        if isinstance(swh_storage, InMemoryStorage) or not isinstance(
+            swh_storage, CassandraStorage
+        ):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["release"] == 2
 
     def test_release_add_no_author_date(self, swh_storage, sample_data):
         full_release = sample_data.release
@@ -1482,8 +1499,11 @@
             [("origin", origin) for origin in origins]
         )
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["origin"] == len(origins)
+        if isinstance(swh_storage, InMemoryStorage) or not isinstance(
+            swh_storage, CassandraStorage
+        ):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["origin"] == len(origins)
 
     def test_origin_add_twice(self, swh_storage, sample_data):
         origin, origin2 = sample_data.origins[:2]
@@ -1921,11 +1941,13 @@
             ]
         )
 
-        swh_storage.refresh_stat_counters()
-
-        stats = swh_storage.stat_counters()
-        assert stats["origin"] == len(origins)
-        assert stats["origin_visit"] == len(origins) * len(visits)
+        if isinstance(swh_storage, InMemoryStorage) or not isinstance(
+            swh_storage, CassandraStorage
+        ):
+            swh_storage.refresh_stat_counters()
+            stats = swh_storage.stat_counters()
+            assert stats["origin"] == len(origins)
+            assert stats["origin_visit"] == len(origins) * len(visits)
 
         random_ovs = swh_storage.origin_visit_status_get_random(visit_type)
         assert random_ovs
@@ -3122,8 +3144,11 @@
             "next_branch": None,
         }
 
-        swh_storage.refresh_stat_counters()
-        assert swh_storage.stat_counters()["snapshot"] == 2
+        if isinstance(swh_storage, InMemoryStorage) or not isinstance(
+            swh_storage, CassandraStorage
+        ):
+            swh_storage.refresh_stat_counters()
+            assert swh_storage.stat_counters()["snapshot"] == 2
 
     def test_snapshot_add_many_incremental(self, swh_storage, sample_data):
         snapshot, _, complete_snapshot = sample_data.snapshots[:3]
@@ -3623,6 +3648,10 @@
         assert list(missing_snapshots) == [missing_snapshot.id]
 
     def test_stat_counters(self, swh_storage, sample_data):
+        if isinstance(swh_storage, CassandraStorage) and not isinstance(
+            swh_storage, InMemoryStorage
+        ):
+            pytest.skip("Cassandra backend does not support stat counters")
         origin = sample_data.origin
         snapshot = sample_data.snapshot
         revision = sample_data.revision
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -494,21 +494,18 @@
         # early for this test to be relevant
         swh_storage.journal_writer.journal = None
 
-        class MyException(Exception):
-            pass
-
         class CrashyEntry(DirectoryEntry):
             def __init__(self):
                 pass
 
             def to_dict(self):
-                raise MyException()
+                return {**directory.entries[0].to_dict(), "perms": "abcde"}
 
         directory = sample_data.directory3
         entries = directory.entries
         directory = attr.evolve(directory, entries=entries + (CrashyEntry(),))
-        with pytest.raises(MyException):
+        with pytest.raises(TypeError):
             swh_storage.directory_add([directory])
 
         # This should have written some of the entries to the database:
diff --git a/swh/storage/tests/test_counter.py b/swh/storage/tests/test_counter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_counter.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import attr
+import pytest
+
+from swh.storage import get_storage
+
+
+@pytest.fixture
+def swh_storage():
+    storage_config = {
+        "cls": "pipeline",
+        "steps": [
+            {"cls": "counter", "counters": {"cls": "memory"}},
+            {"cls": "memory"},
+        ],
+    }
+
+    return get_storage(**storage_config)
+
+
+def test_counting_proxy_storage_content(swh_storage, sample_data):
+    assert swh_storage.counters.counters["content"] == set()
+
+    swh_storage.content_add([sample_data.content])
+
+    assert swh_storage.counters.counters["content"] == {sample_data.content.sha1}
+
+    swh_storage.content_add([sample_data.content2, sample_data.content3])
+
+    assert swh_storage.counters.counters["content"] == {
+        sample_data.content.sha1,
+        sample_data.content2.sha1,
+        sample_data.content3.sha1,
+    }
+
+    assert [
+        attr.evolve(cnt, ctime=None)
+        for cnt in swh_storage.content_find({"sha256": sample_data.content2.sha256})
+    ] == [attr.evolve(sample_data.content2, data=None)]
+
+
+def test_counting_proxy_storage_revision(swh_storage, sample_data):
+    assert swh_storage.counters.counters["revision"] == set()
+
+    swh_storage.revision_add([sample_data.revision])
+
+    assert swh_storage.counters.counters["revision"] == {sample_data.revision.id}
+
+    swh_storage.revision_add([sample_data.revision2, sample_data.revision3])
+
+    assert swh_storage.counters.counters["revision"] == {
+        sample_data.revision.id,
+        sample_data.revision2.id,
+        sample_data.revision3.id,
+    }
+
+    assert swh_storage.revision_get([sample_data.revision2.id]) == [
+        sample_data.revision2
+    ]
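
Usage note (not part of the change above): the sketch below wires the new `counter` proxy in front of an in-memory storage through the `pipeline` configuration, mirroring the fixture added in swh/storage/tests/test_counter.py. The content payload is made up for illustration, and `cls: memory` refers to the in-memory counters backend shipped with swh-counters.

```python
from swh.model.model import Content
from swh.storage import get_storage

# Counting proxy (backed by in-memory swh-counters) in front of an in-memory
# storage; this is the same pipeline used by the new test fixture.
storage = get_storage(
    cls="pipeline",
    steps=[
        {"cls": "counter", "counters": {"cls": "memory"}},
        {"cls": "memory"},
    ],
)

cnt = Content.from_data(b"illustrative payload")
storage.content_add([cnt])

# Every *_add call is intercepted by CountingProxyStorage.__getattr__, which
# records each object's unique_key() in the counters before delegating to the
# wrapped storage.
assert storage.counters.counters["content"] == {cnt.sha1}
```

Against a Cassandra backend, the new `directory_entries_insert_algo` option ("one-by-one", "concurrent" or "batch") is selected the same way, as an extra key of the `cls: cassandra` storage configuration.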