diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -7,6 +7,7 @@ import dataclasses import datetime import functools +import itertools import logging import random from typing import ( @@ -17,6 +18,7 @@ Iterator, List, Optional, + Sequence, Tuple, Type, TypeVar, @@ -25,6 +27,7 @@ from cassandra import ConsistencyLevel, CoordinationFailure from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet +from cassandra.concurrent import execute_concurrent_with_args from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import BoundStatement, PreparedStatement, dict_factory from mypy_extensions import NamedArg @@ -85,6 +88,8 @@ See for details and rationale. """ +BATCH_INSERT_MAX_SIZE = 1000 + logger = logging.getLogger(__name__) @@ -166,6 +171,14 @@ TSelf = TypeVar("TSelf") +def _insert_query(row_class): + columns = row_class.cols() + return ( + f"INSERT INTO {row_class.TABLE} ({', '.join(columns)}) " + f"VALUES ({', '.join('?' for _ in columns)})" + ) + + def _prepared_insert_statement( row_class: Type[BaseRow], ) -> Callable[ @@ -174,11 +187,7 @@ ]: """Shorthand for using `_prepared_statement` for `INSERT INTO` statements.""" - columns = row_class.cols() - return _prepared_statement( - "INSERT INTO %s (%s) VALUES (%s)" - % (row_class.TABLE, ", ".join(columns), ", ".join("?" 
for _ in columns),) + ) + return _prepared_statement(_insert_query(row_class)) def _prepared_exists_statement( @@ -280,9 +289,19 @@ stop=stop_after_attempt(MAX_RETRIES), retry=retry_if_exception_type(CoordinationFailure), ) - def _execute_with_retries(self, statement, args) -> ResultSet: + def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet: return self._session.execute(statement, args, timeout=1000.0) + @retry( + wait=wait_random_exponential(multiplier=1, max=10), + stop=stop_after_attempt(MAX_RETRIES), + retry=retry_if_exception_type(CoordinationFailure), + ) + def _execute_many_with_retries( + self, statement, args_list: List[Tuple] + ) -> ResultSet: + return execute_concurrent_with_args(self._session, statement, args_list) + @_prepared_statement( "UPDATE object_count SET count = count + ? " "WHERE partition_key = 0 AND object_type = ?" @@ -296,6 +315,13 @@ self._increment_counter(obj.TABLE, 1) self._execute_with_retries(statement, dataclasses.astuple(obj)) + def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: + tables = {obj.TABLE for obj in objs} + assert len(tables) == 1, f"Cannot insert to multiple tables: {tables}" + (table,) = tables + self._increment_counter(table, len(objs)) + self._execute_many_with_retries(statement, list(map(dataclasses.astuple, objs))) + _T = TypeVar("_T", bound=BaseRow) def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa @@ -677,6 +703,56 @@ def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None: self._add_one(statement, entry) + @_prepared_insert_statement(DirectoryEntryRow) + def directory_entry_add_concurrent( + self, entries: List[DirectoryEntryRow], *, statement + ) -> None: + if len(entries) == 0: + # nothing to do + return + assert ( + len({entry.directory_id for entry in entries}) == 1 + ), "directory_entry_add_concurrent must be called with entries for a single dir" + self._add_many(statement, entries) + + def 
directory_entry_add_batch(self, entries: List[DirectoryEntryRow],) -> None: + if len(entries) == 0: + # nothing to do + return + assert ( + len({entry.directory_id for entry in entries}) == 1 + ), "directory_entry_add_batch must be called with entries for a single dir" + + # query to INSERT one row + insert_query = _insert_query(DirectoryEntryRow) + ";\n" + + # In "steady state", we insert batches of the maximum allowed size. + # Then, the last one has however many entries remain. + last_batch_size = len(entries) % BATCH_INSERT_MAX_SIZE + if len(entries) >= BATCH_INSERT_MAX_SIZE: + # TODO: the main_statement's size is statically known, so we could avoid + # re-preparing it on every call + main_statement = self._session.prepare( + "BEGIN UNLOGGED BATCH\n" + + insert_query * BATCH_INSERT_MAX_SIZE + + "APPLY BATCH" + ) + last_statement = self._session.prepare( + "BEGIN UNLOGGED BATCH\n" + insert_query * last_batch_size + "APPLY BATCH" + ) + + for entry_group in grouper(entries, BATCH_INSERT_MAX_SIZE): + entry_group = list(map(dataclasses.astuple, entry_group)) + if len(entry_group) == BATCH_INSERT_MAX_SIZE: + self._execute_with_retries( + main_statement, list(itertools.chain.from_iterable(entry_group)) + ) + else: + assert len(entry_group) == last_batch_size + self._execute_with_retries( + last_statement, list(itertools.chain.from_iterable(entry_group)) + ) + @_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?") def directory_entry_get( self, directory_ids, *, statement diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -100,6 +100,7 @@ journal_writer=None, allow_overwrite=False, consistency_level="ONE", + directory_entries_insert_algo="one-by-one", ): """ A backend of swh-storage backed by Cassandra @@ -120,6 +121,10 @@ Note that a ``False`` value does not guarantee there won't be any overwrite. 
consistency_level: The default read/write consistency to use + directory_entries_insert_algo: Must be one of: + * one-by-one: naive, one INSERT per directory entry, serialized + * concurrent: one INSERT per directory entry, concurrent + * batch: using UNLOGGED BATCH to insert many entries in a few statements """ self._hosts = hosts self._keyspace = keyspace @@ -129,6 +134,7 @@ self.journal_writer: JournalWriter = JournalWriter(journal_writer) self.objstorage: ObjStorage = ObjStorage(objstorage) self._allow_overwrite = allow_overwrite + self._directory_entries_insert_algo = directory_entries_insert_algo def _set_cql_runner(self): """Used by tests when they need to reset the CqlRunner""" @@ -438,9 +444,21 @@ for directory in directories: # Add directory entries to the 'directory_entry' table - for entry in directory.entries: - self._cql_runner.directory_entry_add_one( - DirectoryEntryRow(directory_id=directory.id, **entry.to_dict()) + rows = [ + DirectoryEntryRow(directory_id=directory.id, **entry.to_dict()) + for entry in directory.entries + ] + if self._directory_entries_insert_algo == "one-by-one": + for row in rows: + self._cql_runner.directory_entry_add_one(row) + elif self._directory_entries_insert_algo == "concurrent": + self._cql_runner.directory_entry_add_concurrent(rows) + elif self._directory_entries_insert_algo == "batch": + self._cql_runner.directory_entry_add_batch(rows) + else: + raise ValueError( + f"Unexpected value for directory_entries_insert_algo: " + f"{self._directory_entries_insert_algo}" ) # Add the directory *after* adding all the entries, so someone diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -729,6 +729,7 @@ self.reset() self.journal_writer = JournalWriter(journal_writer) self._allow_overwrite = False + self._directory_entries_insert_algo = "one-by-one" def reset(self): self._cql_runner = InMemoryCqlRunner()