Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show First 20 Lines • Show All 84 Lines • ▼ Show 20 Lines | from .model import ( | ||||
SnapshotBranchRow, | SnapshotBranchRow, | ||||
SnapshotRow, | SnapshotRow, | ||||
) | ) | ||||
from .schema import HASH_ALGORITHMS | from .schema import HASH_ALGORITHMS | ||||
# Max block size of contents to return
BULK_BLOCK_CONTENT_LEN_MAX = 10000

# Supported strategies for inserting directory entries, selected via the
# ``directory_entries_insert_algo`` constructor argument of CassandraStorage:
#   * "one-by-one": one INSERT per entry, serialized
#   * "concurrent": one INSERT per entry, issued concurrently
#   * "batch": many entries per statement using UNLOGGED BATCH
DIRECTORY_ENTRIES_INSERT_ALGOS = ["one-by-one", "concurrent", "batch"]
class CassandraStorage: | class CassandraStorage: | ||||
def __init__(
    self,
    hosts,
    keyspace,
    objstorage,
    port=9042,
    journal_writer=None,
    allow_overwrite=False,
    consistency_level="ONE",
    directory_entries_insert_algo="one-by-one",
):
    """
    A backend of swh-storage backed by Cassandra

    Args:
        hosts: Seed Cassandra nodes, to start connecting to the cluster
        keyspace: Name of the Cassandra database to use
        objstorage: Passed as argument to :class:`ObjStorage`
        port: Cassandra port
        journal_writer: Passed as argument to :class:`JournalWriter`
        allow_overwrite: Whether ``*_add`` functions will check if an object
            already exists in the database before sending it in an INSERT.
            ``False`` is the default as it is more efficient when there is
            a moderately high probability the object is already known,
            but ``True`` can be useful to overwrite existing objects
            (eg. when applying a schema update),
            or when the database is known to be mostly empty.
            Note that a ``False`` value does not guarantee there won't be
            any overwrite.
        consistency_level: The default read/write consistency to use
        directory_entries_insert_algo: Must be one of:
            * one-by-one: naive, one INSERT per directory entry, serialized
            * concurrent: one INSERT per directory entry, concurrent
            * batch: using UNLOGGED BATCH to insert many entries in a few statements

    Raises:
        ValueError: if ``directory_entries_insert_algo`` is not one of the
            supported algorithms.
    """
    # Validate configuration *before* opening any connection, so that a bad
    # setting fails fast instead of failing after cluster setup and leaving
    # a half-initialized object behind.
    if directory_entries_insert_algo not in DIRECTORY_ENTRIES_INSERT_ALGOS:
        raise ValueError(
            f"directory_entries_insert_algo must be one of: "
            f"{', '.join(DIRECTORY_ENTRIES_INSERT_ALGOS)}; "
            f"got {directory_entries_insert_algo!r}"
        )
    self._directory_entries_insert_algo = directory_entries_insert_algo

    self._hosts = hosts
    self._keyspace = keyspace
    self._port = port
    self._consistency_level = consistency_level
    # Opens the connection to the Cassandra cluster (factored out so tests
    # can reset the CqlRunner).
    self._set_cql_runner()
    self.journal_writer: JournalWriter = JournalWriter(journal_writer)
    self.objstorage: ObjStorage = ObjStorage(objstorage)
    self._allow_overwrite = allow_overwrite
def _set_cql_runner(self):
    """(Re)create this storage's CqlRunner from the stored connection
    parameters.

    Used by tests when they need to reset the CqlRunner.
    """
    runner = CqlRunner(
        self._hosts,
        self._keyspace,
        self._port,
        self._consistency_level,
    )
    self._cql_runner: CqlRunner = runner
@timed | @timed | ||||
def check_config(self, *, check_write: bool) -> bool: | def check_config(self, *, check_write: bool) -> bool: | ||||
▲ Show 20 Lines • Show All 311 Lines • ▼ Show 20 Lines | def directory_add(self, directories: List[Directory]) -> Dict[str, int]: | ||||
# Filter out directories that are already inserted. | # Filter out directories that are already inserted. | ||||
missing = self.directory_missing([dir_.id for dir_ in to_add]) | missing = self.directory_missing([dir_.id for dir_ in to_add]) | ||||
directories = [dir_ for dir_ in directories if dir_.id in missing] | directories = [dir_ for dir_ in directories if dir_.id in missing] | ||||
self.journal_writer.directory_add(directories) | self.journal_writer.directory_add(directories) | ||||
for directory in directories: | for directory in directories: | ||||
# Add directory entries to the 'directory_entry' table | # Add directory entries to the 'directory_entry' table | ||||
for entry in directory.entries: | rows = [ | ||||
self._cql_runner.directory_entry_add_one( | |||||
DirectoryEntryRow(directory_id=directory.id, **entry.to_dict()) | DirectoryEntryRow(directory_id=directory.id, **entry.to_dict()) | ||||
for entry in directory.entries | |||||
] | |||||
if self._directory_entries_insert_algo == "one-by-one": | |||||
for row in rows: | |||||
self._cql_runner.directory_entry_add_one(row) | |||||
elif self._directory_entries_insert_algo == "concurrent": | |||||
self._cql_runner.directory_entry_add_concurrent(rows) | |||||
elif self._directory_entries_insert_algo == "batch": | |||||
self._cql_runner.directory_entry_add_batch(rows) | |||||
else: | |||||
raise ValueError( | |||||
f"Unexpected value for directory_entries_insert_algo: " | |||||
f"{self._directory_entries_insert_algo}" | |||||
) | ) | ||||
# Add the directory *after* adding all the entries, so someone | # Add the directory *after* adding all the entries, so someone | ||||
# calling snapshot_get_branch in the meantime won't end up | # calling snapshot_get_branch in the meantime won't end up | ||||
# with half the entries. | # with half the entries. | ||||
self._cql_runner.directory_add_one(DirectoryRow(id=directory.id)) | self._cql_runner.directory_add_one(DirectoryRow(id=directory.id)) | ||||
return {"directory:add": len(directories)} | return {"directory:add": len(directories)} | ||||
▲ Show 20 Lines • Show All 1,165 Lines • Show Last 20 Lines |