D6139.id22479.diff

diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -7,6 +7,7 @@
import dataclasses
import datetime
import functools
+import itertools
import logging
import random
from typing import (
@@ -17,6 +18,7 @@
Iterator,
List,
Optional,
+ Sequence,
Tuple,
Type,
TypeVar,
@@ -25,6 +27,7 @@
from cassandra import ConsistencyLevel, CoordinationFailure
from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet
+from cassandra.concurrent import execute_concurrent_with_args
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
from cassandra.query import BoundStatement, PreparedStatement, dict_factory
from mypy_extensions import NamedArg
@@ -85,6 +88,8 @@
See <https://github.com/scylladb/scylla/pull/4797> for details and rationale.
"""
+BATCH_INSERT_MAX_SIZE = 1000
+
logger = logging.getLogger(__name__)
@@ -166,6 +171,14 @@
TSelf = TypeVar("TSelf")
+def _insert_query(row_class):
+ columns = row_class.cols()
+ return (
+ f"INSERT INTO {row_class.TABLE} ({', '.join(columns)}) "
+ f"VALUES ({', '.join('?' for _ in columns)})"
+ )
+
+
def _prepared_insert_statement(
row_class: Type[BaseRow],
) -> Callable[
@@ -174,11 +187,7 @@
]:
"""Shorthand for using `_prepared_statement` for `INSERT INTO`
statements."""
- columns = row_class.cols()
- return _prepared_statement(
- "INSERT INTO %s (%s) VALUES (%s)"
- % (row_class.TABLE, ", ".join(columns), ", ".join("?" for _ in columns),)
- )
+ return _prepared_statement(_insert_query(row_class))
def _prepared_exists_statement(
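
For reference, a minimal self-contained sketch of the CQL text the new `_insert_query` helper builds. The row class below is a hypothetical stand-in (the real DirectoryEntryRow has more columns); only the `TABLE` attribute and `cols()` classmethod are assumed, mirroring how the helper uses them:

    import dataclasses
    from typing import List

    @dataclasses.dataclass
    class FakeEntryRow:
        # Hypothetical stand-in for a swh.storage.cassandra.model row class.
        # TABLE is a plain class attribute, so it is not a dataclass field.
        TABLE = "directory_entry"
        directory_id: bytes
        name: bytes

        @classmethod
        def cols(cls) -> List[str]:
            return [field.name for field in dataclasses.fields(cls)]

    def _insert_query(row_class) -> str:
        columns = row_class.cols()
        return (
            f"INSERT INTO {row_class.TABLE} ({', '.join(columns)}) "
            f"VALUES ({', '.join('?' for _ in columns)})"
        )

    print(_insert_query(FakeEntryRow))
    # INSERT INTO directory_entry (directory_id, name) VALUES (?, ?)

directory_entry_add_batch (further down in this file) repeats that INSERT text BATCH_INSERT_MAX_SIZE times inside a BEGIN UNLOGGED BATCH ... APPLY BATCH wrapper, so a single prepared statement can bind the arguments of many rows at once.
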
@@ -280,12 +289,28 @@
stop=stop_after_attempt(MAX_RETRIES),
retry=retry_if_exception_type(CoordinationFailure),
)
- def _execute_with_retries(self, statement, args) -> ResultSet:
+ def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet:
return self._session.execute(statement, args, timeout=1000.0)
+ @retry(
+ wait=wait_random_exponential(multiplier=1, max=10),
+ stop=stop_after_attempt(MAX_RETRIES),
+ retry=retry_if_exception_type(CoordinationFailure),
+ )
+ def _execute_many_with_retries(
+ self, statement, args_list: List[Tuple]
+ ) -> ResultSet:
+ return execute_concurrent_with_args(self._session, statement, args_list)
+
def _add_one(self, statement, obj: BaseRow) -> None:
self._execute_with_retries(statement, dataclasses.astuple(obj))
+ def _add_many(self, statement, objs: Sequence[BaseRow]) -> None:
+ tables = {obj.TABLE for obj in objs}
+        assert len(tables) == 1, f"Cannot insert into multiple tables: {tables}"
+ (table,) = tables
+ self._execute_many_with_retries(statement, list(map(dataclasses.astuple, objs)))
+
_T = TypeVar("_T", bound=BaseRow)
def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa
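
The new "concurrent" path above relies on the driver's execute_concurrent_with_args, which fans one prepared statement out over many argument tuples. A minimal sketch of that call outside of CqlRunner follows; the contact point, keyspace and values are placeholders, and the retry handling done by _execute_many_with_retries is omitted:

    from cassandra.cluster import Cluster
    from cassandra.concurrent import execute_concurrent_with_args

    # Placeholder cluster/keyspace; the column list assumes the
    # directory_entry layout from swh.storage.cassandra.schema.
    session = Cluster(["127.0.0.1"]).connect("swh_test")
    statement = session.prepare(
        "INSERT INTO directory_entry (directory_id, name, target, perms, type) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    args_list = [
        (b"\x01" * 20, b"file1", b"\x02" * 20, 0o100644, "file"),
        (b"\x01" * 20, b"file2", b"\x02" * 20, 0o100644, "file"),
    ]
    # One INSERT per tuple, executed concurrently by the driver; the result
    # is a list of (success, result_or_exception) pairs.
    results = execute_concurrent_with_args(session, statement, args_list)
    assert all(success for success, _ in results)
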
@@ -665,6 +690,55 @@
def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None:
self._add_one(statement, entry)
+ @_prepared_insert_statement(DirectoryEntryRow)
+ def directory_entry_add_concurrent(
+ self, entries: List[DirectoryEntryRow], *, statement
+ ) -> None:
+ if len(entries) == 0:
+ # nothing to do
+ return
+ assert (
+ len({entry.directory_id for entry in entries}) == 1
+        ), "directory_entry_add_concurrent must be called with entries for a single directory"
+ self._add_many(statement, entries)
+
+ @_prepared_statement(
+ "BEGIN UNLOGGED BATCH\n"
+        + (_insert_query(DirectoryEntryRow) + ";\n") * BATCH_INSERT_MAX_SIZE
+        + "APPLY BATCH"
+ )
+ def directory_entry_add_batch(
+ self, entries: List[DirectoryEntryRow], *, statement
+ ) -> None:
+ if len(entries) == 0:
+ # nothing to do
+ return
+ assert (
+ len({entry.directory_id for entry in entries}) == 1
+        ), "directory_entry_add_batch must be called with entries for a single directory"
+
+ # query to INSERT one row
+ insert_query = _insert_query(DirectoryEntryRow) + ";\n"
+
+ # In "steady state", we insert batches of the maximum allowed size.
+ # Then, the last one has however many entries remain.
+ last_batch_size = len(entries) % BATCH_INSERT_MAX_SIZE
+ last_statement = self._session.prepare(
+ "BEGIN UNLOGGED BATCH\n" + insert_query * last_batch_size + "APPLY BATCH"
+ )
+
+ for entry_group in grouper(entries, BATCH_INSERT_MAX_SIZE):
+ entry_group = list(map(dataclasses.astuple, entry_group))
+ if len(entry_group) == BATCH_INSERT_MAX_SIZE:
+ self._execute_with_retries(
+ statement, list(itertools.chain.from_iterable(entry_group))
+ )
+ else:
+ assert len(entry_group) == last_batch_size
+ self._execute_with_retries(
+ last_statement, list(itertools.chain.from_iterable(entry_group))
+ )
+
@_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?")
def directory_entry_get(
self, directory_ids, *, statement
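
The "steady state" comment in directory_entry_add_batch above is easiest to see with concrete numbers. A small worked example of the grouping arithmetic, using a hypothetical entry count and the BATCH_INSERT_MAX_SIZE defined at the top of this file:

    # Illustrative only: how 2,500 entries (hypothetical count) are split by
    # the batch path, with BATCH_INSERT_MAX_SIZE == 1000.
    BATCH_INSERT_MAX_SIZE = 1000
    nb_entries = 2500

    # Two executions of the full-size batch statement prepared by the
    # decorator...
    full_batches = nb_entries // BATCH_INSERT_MAX_SIZE
    # ...then one 500-row batch prepared on the fly for the remainder.
    last_batch_size = nb_entries % BATCH_INSERT_MAX_SIZE
    assert (full_batches, last_batch_size) == (2, 500)

    # When nb_entries is an exact multiple (e.g. 2000), last_batch_size is 0:
    # grouper() only yields full-size groups and the ad-hoc statement is
    # never executed.
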
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -90,6 +90,8 @@
# Max block size of contents to return
BULK_BLOCK_CONTENT_LEN_MAX = 10000
+DIRECTORY_ENTRIES_INSERT_ALGOS = ["one-by-one", "concurrent", "batch"]
+
class CassandraStorage:
def __init__(
@@ -101,6 +103,7 @@
journal_writer=None,
allow_overwrite=False,
consistency_level="ONE",
+ directory_entries_insert_algo="one-by-one",
):
"""
A backend of swh-storage backed by Cassandra
@@ -121,6 +124,10 @@
Note that a ``False`` value does not guarantee there won't be
any overwrite.
consistency_level: The default read/write consistency to use
+ directory_entries_insert_algo: Must be one of:
+ * one-by-one: naive, one INSERT per directory entry, serialized
+ * concurrent: one INSERT per directory entry, concurrent
+ * batch: using UNLOGGED BATCH to insert many entries in a few statements
"""
self._hosts = hosts
self._keyspace = keyspace
@@ -131,6 +138,13 @@
self.objstorage: ObjStorage = ObjStorage(objstorage)
self._allow_overwrite = allow_overwrite
+ if directory_entries_insert_algo not in DIRECTORY_ENTRIES_INSERT_ALGOS:
+ raise ValueError(
+ f"directory_entries_insert_algo must be one of: "
+ f"{', '.join(DIRECTORY_ENTRIES_INSERT_ALGOS)}"
+ )
+ self._directory_entries_insert_algo = directory_entries_insert_algo
+
def _set_cql_runner(self):
"""Used by tests when they need to reset the CqlRunner"""
self._cql_runner: CqlRunner = CqlRunner(
@@ -458,9 +472,21 @@
for directory in directories:
# Add directory entries to the 'directory_entry' table
- for entry in directory.entries:
- self._cql_runner.directory_entry_add_one(
- DirectoryEntryRow(directory_id=directory.id, **entry.to_dict())
+ rows = [
+ DirectoryEntryRow(directory_id=directory.id, **entry.to_dict())
+ for entry in directory.entries
+ ]
+ if self._directory_entries_insert_algo == "one-by-one":
+ for row in rows:
+ self._cql_runner.directory_entry_add_one(row)
+ elif self._directory_entries_insert_algo == "concurrent":
+ self._cql_runner.directory_entry_add_concurrent(rows)
+ elif self._directory_entries_insert_algo == "batch":
+ self._cql_runner.directory_entry_add_batch(rows)
+ else:
+ raise ValueError(
+ f"Unexpected value for directory_entries_insert_algo: "
+ f"{self._directory_entries_insert_algo}"
)
# Add the directory *after* adding all the entries, so someone
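
Since the new option is a plain constructor keyword, it can be selected like any other backend argument. A hypothetical example through swh.storage.get_storage, with placeholder connection settings (only directory_entries_insert_algo comes from this diff):

    from swh.storage import get_storage

    storage = get_storage(
        "cassandra",
        hosts=["127.0.0.1"],           # placeholder
        keyspace="swh",                # placeholder
        objstorage={"cls": "memory"},  # placeholder
        directory_entries_insert_algo="batch",  # or "one-by-one" / "concurrent"
    )
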
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -729,6 +729,7 @@
self.reset()
self.journal_writer = JournalWriter(journal_writer)
self._allow_overwrite = False
+ self._directory_entries_insert_algo = "one-by-one"
def reset(self):
self._cql_runner = InMemoryCqlRunner()
diff --git a/swh/storage/tests/storage_tests.py b/swh/storage/tests/storage_tests.py
--- a/swh/storage/tests/storage_tests.py
+++ b/swh/storage/tests/storage_tests.py
@@ -690,7 +690,7 @@
def test_directory_add(self, swh_storage, sample_data):
content = sample_data.content
- directory = sample_data.directories[1]
+ directory = sample_data.directory
assert directory.entries[0].target == content.sha1_git
swh_storage.content_add([content])
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -18,11 +18,14 @@
import pytest
from swh.core.api.classes import stream_results
+from swh.model import from_disk
from swh.model.model import Directory, DirectoryEntry, Snapshot, SnapshotBranch
from swh.storage import get_storage
from swh.storage.cassandra import create_keyspace
+from swh.storage.cassandra.cql import BATCH_INSERT_MAX_SIZE
from swh.storage.cassandra.model import ContentRow, ExtIDRow
from swh.storage.cassandra.schema import HASH_ALGORITHMS, TABLES
+from swh.storage.cassandra.storage import DIRECTORY_ENTRIES_INSERT_ALGOS
from swh.storage.tests.storage_data import StorageData
from swh.storage.tests.storage_tests import (
TestStorageGeneratedData as _TestStorageGeneratedData,
@@ -486,29 +489,67 @@
)
assert extids == [extid]
- def test_directory_add_atomic(self, swh_storage, sample_data, mocker):
+ def _directory_with_entries(self, sample_data, nb_entries):
+        """Returns a directory with ``nb_entries`` entries, all pointing
+        to the same content"""
+ return Directory(
+ entries=tuple(
+ DirectoryEntry(
+ name=f"file{i:10}".encode(),
+ type="file",
+ target=sample_data.content.sha1_git,
+ perms=from_disk.DentryPerms.directory,
+ )
+ for i in range(nb_entries)
+ )
+ )
+
+ @pytest.mark.parametrize(
+ "insert_algo,nb_entries",
+ [
+ ("one-by-one", 10),
+ ("concurrent", 10),
+ ("batch", 1),
+ ("batch", 2),
+ ("batch", BATCH_INSERT_MAX_SIZE - 1),
+ ("batch", BATCH_INSERT_MAX_SIZE),
+ ("batch", BATCH_INSERT_MAX_SIZE + 1),
+ ("batch", BATCH_INSERT_MAX_SIZE * 2),
+ ],
+ )
+ def test_directory_add_algos(
+ self, swh_storage, sample_data, mocker, insert_algo, nb_entries,
+ ):
+ mocker.patch.object(swh_storage, "_directory_entries_insert_algo", insert_algo)
+
+ class new_sample_data:
+ content = sample_data.content
+ directory = self._directory_with_entries(sample_data, nb_entries)
+
+ self.test_directory_add(swh_storage, new_sample_data)
+
+ @pytest.mark.parametrize("insert_algo", DIRECTORY_ENTRIES_INSERT_ALGOS)
+ def test_directory_add_atomic(self, swh_storage, sample_data, mocker, insert_algo):
"""Checks that a crash occurring after some directory entries were written
does not cause the directory to be (partially) visible.
ie. checks directories are added somewhat atomically."""
# Disable the journal writer, it would detect the CrashyEntry exception too
# early for this test to be relevant
swh_storage.journal_writer.journal = None
-
- class MyException(Exception):
- pass
+ mocker.patch.object(swh_storage, "_directory_entries_insert_algo", insert_algo)
class CrashyEntry(DirectoryEntry):
def __init__(self):
pass
def to_dict(self):
- raise MyException()
+ return {**directory.entries[0].to_dict(), "perms": "abcde"}
- directory = sample_data.directory3
+ directory = self._directory_with_entries(sample_data, BATCH_INSERT_MAX_SIZE)
entries = directory.entries
directory = attr.evolve(directory, entries=entries + (CrashyEntry(),))
- with pytest.raises(MyException):
+ with pytest.raises(TypeError):
swh_storage.directory_add([directory])
# This should have written some of the entries to the database:
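
The invariant behind this test is that the 'directory' row is only written after all of its entries, so partially written entries stay invisible to readers. A sketch of the kind of check that follows, using the existing directory_missing API (illustrative, not the verbatim continuation of the test):

    # Some directory_entry rows were written before the crash, but the
    # directory itself must still be reported as missing, because its
    # 'directory' row was never inserted.
    assert list(swh_storage.directory_missing([directory.id])) == [directory.id]
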
