F7123951: D6139.id22479.diff (12 KB)
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -7,6 +7,7 @@
import dataclasses
import datetime
import functools
+import itertools
import logging
import random
from typing import (
@@ -17,6 +18,7 @@
Iterator,
List,
Optional,
+ Sequence,
Tuple,
Type,
TypeVar,
@@ -25,6 +27,7 @@
from cassandra import ConsistencyLevel, CoordinationFailure
from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet
+from cassandra.concurrent import execute_concurrent_with_args
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
from cassandra.query import BoundStatement, PreparedStatement, dict_factory
from mypy_extensions import NamedArg
@@ -85,6 +88,8 @@
See <https://github.com/scylladb/scylla/pull/4797> for details and rationale.
"""
+BATCH_INSERT_MAX_SIZE = 1000
+
logger = logging.getLogger(__name__)
@@ -166,6 +171,14 @@
TSelf = TypeVar("TSelf")
+def _insert_query(row_class):
+ columns = row_class.cols()
+ return (
+ f"INSERT INTO {row_class.TABLE} ({', '.join(columns)}) "
+ f"VALUES ({', '.join('?' for _ in columns)})"
+ )
+
+
def _prepared_insert_statement(
row_class: Type[BaseRow],
) -> Callable[
@@ -174,11 +187,7 @@
]:
"""Shorthand for using `_prepared_statement` for `INSERT INTO`
statements."""
- columns = row_class.cols()
- return _prepared_statement(
- "INSERT INTO %s (%s) VALUES (%s)"
- % (row_class.TABLE, ", ".join(columns), ", ".join("?" for _ in columns),)
- )
+ return _prepared_statement(_insert_query(row_class))
def _prepared_exists_statement(
@@ -280,12 +289,28 @@
stop=stop_after_attempt(MAX_RETRIES),
retry=retry_if_exception_type(CoordinationFailure),
)
- def _execute_with_retries(self, statement, args) -> ResultSet:
+ def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet:
return self._session.execute(statement, args, timeout=1000.0)
+ @retry(
+ wait=wait_random_exponential(multiplier=1, max=10),
+ stop=stop_after_attempt(MAX_RETRIES),
+ retry=retry_if_exception_type(CoordinationFailure),
+ )
+ def _execute_many_with_retries(
+ self, statement, args_list: List[Tuple]
+ ) -> ResultSet:
+ return execute_concurrent_with_args(self._session, statement, args_list)
+
def _add_one(self, statement, obj: BaseRow) -> None:
self._execute_with_retries(statement, dataclasses.astuple(obj))
+ def _add_many(self, statement, objs: Sequence[BaseRow]) -> None:
+ tables = {obj.TABLE for obj in objs}
+ assert len(tables) == 1, f"Cannot insert to multiple tables: {tables}"
+ (table,) = tables
+ self._execute_many_with_retries(statement, list(map(dataclasses.astuple, objs)))
+
_T = TypeVar("_T", bound=BaseRow)
def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa
@@ -665,6 +690,55 @@
def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None:
self._add_one(statement, entry)
+ @_prepared_insert_statement(DirectoryEntryRow)
+ def directory_entry_add_concurrent(
+ self, entries: List[DirectoryEntryRow], *, statement
+ ) -> None:
+ if len(entries) == 0:
+ # nothing to do
+ return
+ assert (
+ len({entry.directory_id for entry in entries}) == 1
+ ), "directory_entry_add_many must be called with entries for a single dir"
+ self._add_many(statement, entries)
+
+ @_prepared_statement(
+ "BEGIN UNLOGGED BATCH\n"
+ + (_insert_query(DirectoryEntryRow) + ";\n") * BATCH_INSERT_MAX_SIZE
+ + "APPLY BATCH"
+ )
+ def directory_entry_add_batch(
+ self, entries: List[DirectoryEntryRow], *, statement
+ ) -> None:
+ if len(entries) == 0:
+ # nothing to do
+ return
+ assert (
+ len({entry.directory_id for entry in entries}) == 1
+ ), "directory_entry_add_many must be called with entries for a single dir"
+
+ # query to INSERT one row
+ insert_query = _insert_query(DirectoryEntryRow) + ";\n"
+
+ # In "steady state", we insert batches of the maximum allowed size.
+ # Then, the last one has however many entries remain.
+ last_batch_size = len(entries) % BATCH_INSERT_MAX_SIZE
+ last_statement = self._session.prepare(
+ "BEGIN UNLOGGED BATCH\n" + insert_query * last_batch_size + "APPLY BATCH"
+ )
+
+ for entry_group in grouper(entries, BATCH_INSERT_MAX_SIZE):
+ entry_group = list(map(dataclasses.astuple, entry_group))
+ if len(entry_group) == BATCH_INSERT_MAX_SIZE:
+ self._execute_with_retries(
+ statement, list(itertools.chain.from_iterable(entry_group))
+ )
+ else:
+ assert len(entry_group) == last_batch_size
+ self._execute_with_retries(
+ last_statement, list(itertools.chain.from_iterable(entry_group))
+ )
+
@_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?")
def directory_entry_get(
self, directory_ids, *, statement
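
The cql.py hunks above add two new insertion paths: directory_entry_add_concurrent, which hands every row of a directory to the driver's execute_concurrent_with_args, and directory_entry_add_batch, which packs the rows into UNLOGGED BATCH statements of up to BATCH_INSERT_MAX_SIZE repeated INSERTs, binding the flattened values of all rows in a chunk to a single statement. The following standalone sketch illustrates that chunking idea; it is not part of the patch, the names session, insert_query and rows are hypothetical placeholders, and a local _chunks helper stands in for swh.core.utils.grouper to keep it self-contained.

import itertools

BATCH_INSERT_MAX_SIZE = 1000  # same limit as in the patch


def _chunks(seq, size):
    # Local stand-in for swh.core.utils.grouper, for self-containedness.
    for i in range(0, len(seq), size):
        yield seq[i : i + size]


def insert_rows_in_batches(session, insert_query, rows):
    # `session` is a cassandra-driver Session; `rows` is a list of tuples
    # matching the columns of `insert_query` (e.g. produced by
    # dataclasses.astuple). One prepared statement holds BATCH_INSERT_MAX_SIZE
    # repeated INSERTs and is reused for every full chunk.
    full_batch = session.prepare(
        "BEGIN UNLOGGED BATCH\n"
        + (insert_query + ";\n") * BATCH_INSERT_MAX_SIZE
        + "APPLY BATCH"
    )
    for chunk in _chunks(rows, BATCH_INSERT_MAX_SIZE):
        if len(chunk) == BATCH_INSERT_MAX_SIZE:
            statement = full_batch
        else:
            # The trailing chunk needs a batch with exactly len(chunk) INSERTs.
            statement = session.prepare(
                "BEGIN UNLOGGED BATCH\n"
                + (insert_query + ";\n") * len(chunk)
                + "APPLY BATCH"
            )
        # The bound values of every row in the chunk are concatenated into one
        # flat argument list, one value per "?" in the batched statement.
        session.execute(statement, list(itertools.chain.from_iterable(chunk)))

Because all entries of one directory share the same partition key (directory_id), each batch targets a single partition, which appears to be why both new methods assert that the entries belong to a single directory.
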
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -90,6 +90,8 @@
# Max block size of contents to return
BULK_BLOCK_CONTENT_LEN_MAX = 10000
+DIRECTORY_ENTRIES_INSERT_ALGOS = ["one-by-one", "concurrent", "batch"]
+
class CassandraStorage:
def __init__(
@@ -101,6 +103,7 @@
journal_writer=None,
allow_overwrite=False,
consistency_level="ONE",
+ directory_entries_insert_algo="one-by-one",
):
"""
A backend of swh-storage backed by Cassandra
@@ -121,6 +124,10 @@
Note that a ``False`` value does not guarantee there won't be
any overwrite.
consistency_level: The default read/write consistency to use
+ directory_entries_insert_algo: Must be one of:
+ * one-by-one: naive, one INSERT per directory entry, serialized
+ * concurrent: one INSERT per directory entry, concurrent
+ * batch: using UNLOGGED BATCH to insert many entries in a few statements
"""
self._hosts = hosts
self._keyspace = keyspace
@@ -131,6 +138,13 @@
self.objstorage: ObjStorage = ObjStorage(objstorage)
self._allow_overwrite = allow_overwrite
+ if directory_entries_insert_algo not in DIRECTORY_ENTRIES_INSERT_ALGOS:
+ raise ValueError(
+ f"directory_entries_insert_algo must be one of: "
+ f"{', '.join(DIRECTORY_ENTRIES_INSERT_ALGOS)}"
+ )
+ self._directory_entries_insert_algo = directory_entries_insert_algo
+
def _set_cql_runner(self):
"""Used by tests when they need to reset the CqlRunner"""
self._cql_runner: CqlRunner = CqlRunner(
@@ -458,9 +472,21 @@
for directory in directories:
# Add directory entries to the 'directory_entry' table
- for entry in directory.entries:
- self._cql_runner.directory_entry_add_one(
- DirectoryEntryRow(directory_id=directory.id, **entry.to_dict())
+ rows = [
+ DirectoryEntryRow(directory_id=directory.id, **entry.to_dict())
+ for entry in directory.entries
+ ]
+ if self._directory_entries_insert_algo == "one-by-one":
+ for row in rows:
+ self._cql_runner.directory_entry_add_one(row)
+ elif self._directory_entries_insert_algo == "concurrent":
+ self._cql_runner.directory_entry_add_concurrent(rows)
+ elif self._directory_entries_insert_algo == "batch":
+ self._cql_runner.directory_entry_add_batch(rows)
+ else:
+ raise ValueError(
+ f"Unexpected value for directory_entries_insert_algo: "
+ f"{self._directory_entries_insert_algo}"
)
# Add the directory *after* adding all the entries, so someone
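
The docstring above lists the accepted values for the new directory_entries_insert_algo argument, which the constructor validates against DIRECTORY_ENTRIES_INSERT_ALGOS and which defaults to "one-by-one". As a hedged usage sketch, the option could be selected through swh.storage.get_storage as below; the connection settings are placeholders, and only directory_entries_insert_algo comes from this patch.

from swh.storage import get_storage

# Hypothetical connection settings, for illustration only; the new option is
# simply forwarded to CassandraStorage.__init__.
storage = get_storage(
    cls="cassandra",
    hosts=["cassandra1.internal.example.org"],
    keyspace="swh",
    objstorage={"cls": "memory"},
    directory_entries_insert_algo="batch",  # or "one-by-one" / "concurrent"
)
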
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -729,6 +729,7 @@
self.reset()
self.journal_writer = JournalWriter(journal_writer)
self._allow_overwrite = False
+ self._directory_entries_insert_algo = "one-by-one"
def reset(self):
self._cql_runner = InMemoryCqlRunner()
diff --git a/swh/storage/tests/storage_tests.py b/swh/storage/tests/storage_tests.py
--- a/swh/storage/tests/storage_tests.py
+++ b/swh/storage/tests/storage_tests.py
@@ -690,7 +690,7 @@
def test_directory_add(self, swh_storage, sample_data):
content = sample_data.content
- directory = sample_data.directories[1]
+ directory = sample_data.directory
assert directory.entries[0].target == content.sha1_git
swh_storage.content_add([content])
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -18,11 +18,14 @@
import pytest
from swh.core.api.classes import stream_results
+from swh.model import from_disk
from swh.model.model import Directory, DirectoryEntry, Snapshot, SnapshotBranch
from swh.storage import get_storage
from swh.storage.cassandra import create_keyspace
+from swh.storage.cassandra.cql import BATCH_INSERT_MAX_SIZE
from swh.storage.cassandra.model import ContentRow, ExtIDRow
from swh.storage.cassandra.schema import HASH_ALGORITHMS, TABLES
+from swh.storage.cassandra.storage import DIRECTORY_ENTRIES_INSERT_ALGOS
from swh.storage.tests.storage_data import StorageData
from swh.storage.tests.storage_tests import (
TestStorageGeneratedData as _TestStorageGeneratedData,
@@ -486,29 +489,67 @@
)
assert extids == [extid]
- def test_directory_add_atomic(self, swh_storage, sample_data, mocker):
+ def _directory_with_entries(self, sample_data, nb_entries):
+ """Returns a dir with ``nb_entries``, all pointing to
+ the same content"""
+ return Directory(
+ entries=tuple(
+ DirectoryEntry(
+ name=f"file{i:10}".encode(),
+ type="file",
+ target=sample_data.content.sha1_git,
+ perms=from_disk.DentryPerms.directory,
+ )
+ for i in range(nb_entries)
+ )
+ )
+
+ @pytest.mark.parametrize(
+ "insert_algo,nb_entries",
+ [
+ ("one-by-one", 10),
+ ("concurrent", 10),
+ ("batch", 1),
+ ("batch", 2),
+ ("batch", BATCH_INSERT_MAX_SIZE - 1),
+ ("batch", BATCH_INSERT_MAX_SIZE),
+ ("batch", BATCH_INSERT_MAX_SIZE + 1),
+ ("batch", BATCH_INSERT_MAX_SIZE * 2),
+ ],
+ )
+ def test_directory_add_algos(
+ self, swh_storage, sample_data, mocker, insert_algo, nb_entries,
+ ):
+ mocker.patch.object(swh_storage, "_directory_entries_insert_algo", insert_algo)
+
+ class new_sample_data:
+ content = sample_data.content
+ directory = self._directory_with_entries(sample_data, nb_entries)
+
+ self.test_directory_add(swh_storage, new_sample_data)
+
+ @pytest.mark.parametrize("insert_algo", DIRECTORY_ENTRIES_INSERT_ALGOS)
+ def test_directory_add_atomic(self, swh_storage, sample_data, mocker, insert_algo):
"""Checks that a crash occurring after some directory entries were written
does not cause the directory to be (partially) visible.
ie. checks directories are added somewhat atomically."""
# Disable the journal writer, it would detect the CrashyEntry exception too
# early for this test to be relevant
swh_storage.journal_writer.journal = None
-
- class MyException(Exception):
- pass
+ mocker.patch.object(swh_storage, "_directory_entries_insert_algo", insert_algo)
class CrashyEntry(DirectoryEntry):
def __init__(self):
pass
def to_dict(self):
- raise MyException()
+ return {**directory.entries[0].to_dict(), "perms": "abcde"}
- directory = sample_data.directory3
+ directory = self._directory_with_entries(sample_data, BATCH_INSERT_MAX_SIZE)
entries = directory.entries
directory = attr.evolve(directory, entries=entries + (CrashyEntry(),))
- with pytest.raises(MyException):
+ with pytest.raises(TypeError):
swh_storage.directory_add([directory])
# This should have written some of the entries to the database:
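
For reference, the "concurrent" algorithm exercised by these tests delegates to the cassandra-driver helper imported at the top of cql.py. A minimal hedged sketch of that call pattern, with hypothetical session, statement and rows names:

from cassandra.concurrent import execute_concurrent_with_args


def insert_rows_concurrently(session, statement, rows):
    # `statement` is a single prepared INSERT and `rows` is a list of argument
    # tuples, one per row; the driver pipelines the executions concurrently and
    # returns one result per row (raising on the first error by default).
    return execute_concurrent_with_args(session, statement, rows)
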
Attached To: D6139: cassandra: Add option to select (hopefully) more efficient batch insertion algos