diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -7,6 +7,7 @@
 import dataclasses
 import datetime
 import functools
+import itertools
 import logging
 import random
 from typing import (
@@ -17,6 +18,7 @@
     Iterator,
     List,
     Optional,
+    Sequence,
     Tuple,
     Type,
     TypeVar,
@@ -25,6 +27,7 @@
 
 from cassandra import ConsistencyLevel, CoordinationFailure
 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet
+from cassandra.concurrent import execute_concurrent_with_args
 from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
 from cassandra.query import BoundStatement, PreparedStatement, dict_factory
 from mypy_extensions import NamedArg
@@ -85,6 +88,8 @@
 See for details and rationale.
 """
 
+BATCH_INSERT_MAX_SIZE = 1000
+
 logger = logging.getLogger(__name__)
 
 
@@ -166,6 +171,14 @@
 TSelf = TypeVar("TSelf")
 
 
+def _insert_query(row_class):
+    columns = row_class.cols()
+    return (
+        f"INSERT INTO {row_class.TABLE} ({', '.join(columns)}) "
+        f"VALUES ({', '.join('?' for _ in columns)})"
+    )
+
+
 def _prepared_insert_statement(
     row_class: Type[BaseRow],
 ) -> Callable[
@@ -174,11 +187,7 @@
 ]:
     """Shorthand for using `_prepared_statement` for `INSERT INTO` statements."""
 
-    columns = row_class.cols()
-    return _prepared_statement(
-        "INSERT INTO %s (%s) VALUES (%s)"
-        % (row_class.TABLE, ", ".join(columns), ", ".join("?" for _ in columns),)
-    )
+    return _prepared_statement(_insert_query(row_class))
 
 
 def _prepared_exists_statement(
@@ -280,12 +289,28 @@
         stop=stop_after_attempt(MAX_RETRIES),
         retry=retry_if_exception_type(CoordinationFailure),
     )
-    def _execute_with_retries(self, statement, args) -> ResultSet:
+    def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet:
         return self._session.execute(statement, args, timeout=1000.0)
 
+    @retry(
+        wait=wait_random_exponential(multiplier=1, max=10),
+        stop=stop_after_attempt(MAX_RETRIES),
+        retry=retry_if_exception_type(CoordinationFailure),
+    )
+    def _execute_many_with_retries(
+        self, statement, args_list: List[Tuple]
+    ) -> List[Tuple]:
+        return execute_concurrent_with_args(self._session, statement, args_list)
+
     def _add_one(self, statement, obj: BaseRow) -> None:
         self._execute_with_retries(statement, dataclasses.astuple(obj))
 
+    def _add_many(self, statement, objs: Sequence[BaseRow]) -> None:
+        tables = {obj.TABLE for obj in objs}
+        assert len(tables) == 1, f"Cannot insert to multiple tables: {tables}"
+        (table,) = tables
+        self._execute_many_with_retries(statement, list(map(dataclasses.astuple, objs)))
+
     _T = TypeVar("_T", bound=BaseRow)
 
     def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]:  # noqa
@@ -665,6 +690,65 @@
     def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None:
         self._add_one(statement, entry)
 
+    @_prepared_insert_statement(DirectoryEntryRow)
+    def directory_entry_add_concurrent(
+        self, entries: List[DirectoryEntryRow], *, statement
+    ) -> None:
+        """Inserts entries of a single directory, with one INSERT per entry,
+        sent concurrently."""
+        if len(entries) == 0:
+            # nothing to do
+            return
+        assert (
+            len({entry.directory_id for entry in entries}) == 1
+        ), "directory_entry_add_concurrent must be called with entries for a single dir"
+        self._add_many(statement, entries)
+
+    def directory_entry_add_batch(self, entries: List[DirectoryEntryRow],) -> None:
+        """Inserts entries of a single directory using UNLOGGED BATCH statements
+        of at most ``BATCH_INSERT_MAX_SIZE`` entries each."""
+        if len(entries) == 0:
+            # nothing to do
+            return
+        assert (
+            len({entry.directory_id for entry in entries}) == 1
+        ), "directory_entry_add_batch must be called with entries for a single dir"
+
+        # query to INSERT one row
+        insert_query = _insert_query(DirectoryEntryRow) + ";\n"
+
+        # In "steady state", we insert batches of the maximum allowed size.
+        # Then, the last one has however many entries remain.
+        last_batch_size = len(entries) % BATCH_INSERT_MAX_SIZE
+        if len(entries) >= BATCH_INSERT_MAX_SIZE:
+            # TODO: the main_statement's size is statically known, so we could avoid
+            # re-preparing it on every call
+            main_statement = self._session.prepare(
+                "BEGIN UNLOGGED BATCH\n"
+                + insert_query * BATCH_INSERT_MAX_SIZE
+                + "APPLY BATCH"
+            )
+        if last_batch_size > 0:
+            # Only needed when the entries do not split evenly into full batches;
+            # otherwise this would prepare an empty batch that is never executed.
+            last_statement = self._session.prepare(
+                "BEGIN UNLOGGED BATCH\n"
+                + insert_query * last_batch_size
+                + "APPLY BATCH"
+            )
+
+        for entry_group in grouper(entries, BATCH_INSERT_MAX_SIZE):
+            entry_group = list(map(dataclasses.astuple, entry_group))
+            if len(entry_group) == BATCH_INSERT_MAX_SIZE:
+                self._execute_with_retries(
+                    main_statement, list(itertools.chain.from_iterable(entry_group))
+                )
+            else:
+                assert len(entry_group) == last_batch_size
+                self._execute_with_retries(
+                    last_statement, list(itertools.chain.from_iterable(entry_group))
+                )
+
     @_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?")
     def directory_entry_get(
         self, directory_ids, *, statement
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -90,6 +90,8 @@
 # Max block size of contents to return
 BULK_BLOCK_CONTENT_LEN_MAX = 10000
 
+DIRECTORY_ENTRIES_INSERT_ALGOS = ["one-by-one", "concurrent", "batch"]
+
 
 class CassandraStorage:
     def __init__(
@@ -101,6 +103,7 @@
         journal_writer=None,
         allow_overwrite=False,
         consistency_level="ONE",
+        directory_entries_insert_algo="one-by-one",
     ):
         """
         A backend of swh-storage backed by Cassandra
@@ -121,6 +124,10 @@
                 Note that a ``False`` value does not guarantee there won't be
                 any overwrite.
             consistency_level: The default read/write consistency to use
+            directory_entries_insert_algo: Must be one of:
+                * one-by-one: naive, one INSERT per directory entry, serialized
+                * concurrent: one INSERT per directory entry, concurrent
+                * batch: using UNLOGGED BATCH to insert many entries in a few statements
         """
         self._hosts = hosts
         self._keyspace = keyspace
@@ -131,6 +138,13 @@
         self.objstorage: ObjStorage = ObjStorage(objstorage)
         self._allow_overwrite = allow_overwrite
 
+        if directory_entries_insert_algo not in DIRECTORY_ENTRIES_INSERT_ALGOS:
+            raise ValueError(
+                f"directory_entries_insert_algo must be one of: "
+                f"{', '.join(DIRECTORY_ENTRIES_INSERT_ALGOS)}"
+            )
+        self._directory_entries_insert_algo = directory_entries_insert_algo
+
     def _set_cql_runner(self):
         """Used by tests when they need to reset the CqlRunner"""
         self._cql_runner: CqlRunner = CqlRunner(
@@ -458,9 +472,21 @@
 
         for directory in directories:
             # Add directory entries to the 'directory_entry' table
-            for entry in directory.entries:
-                self._cql_runner.directory_entry_add_one(
-                    DirectoryEntryRow(directory_id=directory.id, **entry.to_dict())
+            rows = [
+                DirectoryEntryRow(directory_id=directory.id, **entry.to_dict())
+                for entry in directory.entries
+            ]
+            if self._directory_entries_insert_algo == "one-by-one":
+                for row in rows:
+                    self._cql_runner.directory_entry_add_one(row)
+            elif self._directory_entries_insert_algo == "concurrent":
+                self._cql_runner.directory_entry_add_concurrent(rows)
+            elif self._directory_entries_insert_algo == "batch":
+                self._cql_runner.directory_entry_add_batch(rows)
+            else:
+                raise ValueError(
+                    f"Unexpected value for directory_entries_insert_algo: "
+                    f"{self._directory_entries_insert_algo}"
                 )
 
             # Add the directory *after* adding all the entries, so someone
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -729,6 +729,7 @@
         self.reset()
         self.journal_writer = JournalWriter(journal_writer)
         self._allow_overwrite = False
+        self._directory_entries_insert_algo = "one-by-one"
 
     def reset(self):
         self._cql_runner = InMemoryCqlRunner()
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -23,6 +23,7 @@
 from swh.storage.cassandra import create_keyspace
 from swh.storage.cassandra.model import ContentRow, ExtIDRow
 from swh.storage.cassandra.schema import HASH_ALGORITHMS, TABLES
+from swh.storage.cassandra.storage import DIRECTORY_ENTRIES_INSERT_ALGOS
 from swh.storage.tests.storage_data import StorageData
 from swh.storage.tests.storage_tests import (
     TestStorageGeneratedData as _TestStorageGeneratedData,
@@ -486,29 +487,47 @@
         )
         assert extids == [extid]
 
-    def test_directory_add_atomic(self, swh_storage, sample_data, mocker):
+    @pytest.mark.parametrize(
+        "insert_algo,batch_size",
+        [
+            ("one-by-one", None),
+            ("concurrent", None),
+            ("batch", 1),
+            ("batch", 2),
+            ("batch", 10),
+            ("batch", 100),
+        ],
+    )
+    def test_directory_add_algos(
+        self, swh_storage, sample_data, mocker, insert_algo, batch_size,
+    ):
+        mocker.patch.object(swh_storage, "_directory_entries_insert_algo", insert_algo)
+        mocker.patch("swh.storage.cassandra.cql.BATCH_INSERT_MAX_SIZE", batch_size)
+        self.test_directory_add(swh_storage, sample_data)
+
+    @pytest.mark.parametrize("insert_algo", DIRECTORY_ENTRIES_INSERT_ALGOS)
+    def test_directory_add_atomic(self, swh_storage, sample_data, mocker, insert_algo):
         """Checks that a crash occurring after some directory entries were written
         does not cause the directory to be (partially) visible.
         ie. checks directories are added somewhat atomically."""
         # Disable the journal writer, it would detect the CrashyEntry exception too
         # early for this test to be relevant
         swh_storage.journal_writer.journal = None
-
-        class MyException(Exception):
-            pass
+        mocker.patch.object(swh_storage, "_directory_entries_insert_algo", insert_algo)
+        mocker.patch("swh.storage.cassandra.cql.BATCH_INSERT_MAX_SIZE", 1)
 
         class CrashyEntry(DirectoryEntry):
             def __init__(self):
                 pass
 
             def to_dict(self):
-                raise MyException()
+                return {**directory.entries[0].to_dict(), "perms": "abcde"}
 
         directory = sample_data.directory3
         entries = directory.entries
         directory = attr.evolve(directory, entries=entries + (CrashyEntry(),))
 
-        with pytest.raises(MyException):
+        with pytest.raises(TypeError):
             swh_storage.directory_add([directory])
 
         # This should have written some of the entries to the database: