Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show First 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | |||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.utils import map_optional, now | from swh.storage.utils import map_optional, now | ||||
from swh.storage.writer import JournalWriter | from swh.storage.writer import JournalWriter | ||||
from . import converters | from . import converters | ||||
from ..exc import HashCollision, StorageArgumentException | from ..exc import HashCollision, StorageArgumentException | ||||
from ..utils import remove_keys | from ..utils import remove_keys | ||||
from .common import TOKEN_BEGIN, TOKEN_END, hash_url | from .common import TOKEN_BEGIN, TOKEN_END, hash_url | ||||
from .cql import CqlRunner | from .cql import SELECT_MISSING_ALGOS, CqlRunner | ||||
from .model import ( | from .model import ( | ||||
ContentRow, | ContentRow, | ||||
DirectoryEntryRow, | DirectoryEntryRow, | ||||
DirectoryRow, | DirectoryRow, | ||||
ExtIDByTargetRow, | ExtIDByTargetRow, | ||||
ExtIDRow, | ExtIDRow, | ||||
MetadataAuthorityRow, | MetadataAuthorityRow, | ||||
MetadataFetcherRow, | MetadataFetcherRow, | ||||
def __init__(
    self,
    hosts,
    keyspace,
    objstorage,
    port=9042,
    journal_writer=None,
    allow_overwrite=False,
    consistency_level="ONE",
    directory_entries_insert_algo="one-by-one",
    select_missing_algo="grouped-naive",
):
    """
    A backend of swh-storage backed by Cassandra

    Args:
        hosts: Seed Cassandra nodes, to start connecting to the cluster
        keyspace: Name of the Cassandra database to use
        objstorage: Passed as argument to :class:`ObjStorage`
        port: Cassandra port
        journal_writer: Passed as argument to :class:`JournalWriter`
        allow_overwrite: Whether ``*_add`` functions will check if an object
            already exists in the database before sending it in an INSERT.
            ``False`` is the default as it is more efficient when there is
            a moderately high probability the object is already known,
            but ``True`` can be useful to overwrite existing objects
            (eg. when applying a schema update),
            or when the database is known to be mostly empty.
            Note that a ``False`` value does not guarantee there won't be
            any overwrite.
        consistency_level: The default read/write consistency to use
        directory_entries_insert_algo: Must be one of:

            * one-by-one: naive, one INSERT per directory entry, serialized
            * concurrent: one INSERT per directory entry, concurrent
            * batch: using UNLOGGED BATCH to insert many entries in a few
              statements
        select_missing_algo: Must be one of:

            * concurrent: one SELECT per key, concurrent
            * grouped-naive: group keys, run SELECT on each server, serially
            * grouped-pk-serial: group keys per server, run SELECT on each
              server, server-by-server
            * grouped-pk-concurrent: same as before, but send all server
              queries in parallel

    Raises:
        ValueError: if ``directory_entries_insert_algo`` is not in
            ``DIRECTORY_ENTRIES_INSERT_ALGOS`` or ``select_missing_algo``
            is not in ``SELECT_MISSING_ALGOS``.
    """
    self._hosts = hosts
    self._keyspace = keyspace
    self._port = port
    self._consistency_level = consistency_level
    self._allow_overwrite = allow_overwrite

    if directory_entries_insert_algo not in DIRECTORY_ENTRIES_INSERT_ALGOS:
        raise ValueError(
            "directory_entries_insert_algo must be one of: "
            f"{', '.join(DIRECTORY_ENTRIES_INSERT_ALGOS)}"
        )
    self._directory_entries_insert_algo = directory_entries_insert_algo

    if select_missing_algo not in SELECT_MISSING_ALGOS:
        # Format the *argument*, not ``self._select_missing_algo``: the
        # attribute is only assigned below, so reading it here would raise
        # AttributeError and hide the real configuration error.
        raise ValueError(
            f"Configuration error: select_missing_algo has unknown value: "
            f"{select_missing_algo}, expected one of: "
            f"{', '.join(SELECT_MISSING_ALGOS)}"
        )
    self._select_missing_algo = select_missing_algo

    # The runner is created last because _set_cql_runner() reads the
    # connection settings and self._select_missing_algo assigned above.
    self._set_cql_runner()
    self.journal_writer: JournalWriter = JournalWriter(journal_writer)
    self.objstorage: ObjStorage = ObjStorage(objstorage)
def _set_cql_runner(self):
    """(Re)create the :class:`CqlRunner` stored in ``self._cql_runner``.

    Used by tests when they need to reset the CqlRunner.

    Reads ``_hosts``, ``_keyspace``, ``_port``, ``_consistency_level`` and
    ``_select_missing_algo`` from the instance, so those attributes must
    already be set when this is called.
    """
    connection_args = (
        self._hosts,
        self._keyspace,
        self._port,
        self._consistency_level,
    )
    self._cql_runner: CqlRunner = CqlRunner(
        *connection_args, select_missing_algo=self._select_missing_algo
    )
@timed
def check_config(self, *, check_write: bool) -> bool:
    """Run the CQL runner's read check and report success.

    Args:
        check_write: accepted as a keyword-only argument but not used by
            this implementation.

    Returns:
        ``True`` once ``check_read()`` has returned; any exception raised
        by that call propagates to the caller.
    """
    runner = self._cql_runner
    runner.check_read()
    return True
▲ Show 20 Lines • Show All 1,546 Lines • Show Last 20 Lines |