Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_cassandra.py
Show All 17 Lines | |||||
import pytest | import pytest | ||||
from swh.core.api.classes import stream_results | from swh.core.api.classes import stream_results | ||||
from swh.model.model import Directory, DirectoryEntry, Snapshot, SnapshotBranch | from swh.model.model import Directory, DirectoryEntry, Snapshot, SnapshotBranch | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.cassandra import create_keyspace | from swh.storage.cassandra import create_keyspace | ||||
from swh.storage.cassandra.model import ContentRow, ExtIDRow | from swh.storage.cassandra.model import ContentRow, ExtIDRow | ||||
from swh.storage.cassandra.schema import HASH_ALGORITHMS, TABLES | from swh.storage.cassandra.schema import HASH_ALGORITHMS, TABLES | ||||
from swh.storage.cassandra.storage import DIRECTORY_ENTRIES_INSERT_ALGOS | |||||
from swh.storage.tests.storage_data import StorageData | from swh.storage.tests.storage_data import StorageData | ||||
from swh.storage.tests.storage_tests import ( | from swh.storage.tests.storage_tests import ( | ||||
TestStorageGeneratedData as _TestStorageGeneratedData, | TestStorageGeneratedData as _TestStorageGeneratedData, | ||||
) | ) | ||||
from swh.storage.tests.storage_tests import TestStorage as _TestStorage | from swh.storage.tests.storage_tests import TestStorage as _TestStorage | ||||
from swh.storage.utils import now, remove_keys | from swh.storage.utils import now, remove_keys | ||||
CONFIG_TEMPLATE = """ | CONFIG_TEMPLATE = """ | ||||
▲ Show 20 Lines • Show All 447 Lines • ▼ Show 20 Lines | def test_extid_murmur3_collision(self, swh_storage, mocker, sample_data): | ||||
) | ) | ||||
for extid in sample_data.extids: | for extid in sample_data.extids: | ||||
extids = swh_storage.extid_get_from_target( | extids = swh_storage.extid_get_from_target( | ||||
target_type=extid.target.object_type, ids=[extid.target.object_id] | target_type=extid.target.object_type, ids=[extid.target.object_id] | ||||
) | ) | ||||
assert extids == [extid] | assert extids == [extid] | ||||
def test_directory_add_atomic(self, swh_storage, sample_data, mocker): | @pytest.mark.parametrize( | ||||
"insert_algo,batch_size", | |||||
[ | |||||
("one-by-one", None), | |||||
("concurrent", None), | |||||
("batch", 1), | |||||
("batch", 2), | |||||
("batch", 10), | |||||
("batch", 100), | |||||
], | |||||
) | |||||
def test_directory_add_algos( | |||||
self, swh_storage, sample_data, mocker, insert_algo, batch_size, | |||||
): | |||||
mocker.patch.object(swh_storage, "_directory_entries_insert_algo", insert_algo) | |||||
mocker.patch("swh.storage.cassandra.cql.BATCH_INSERT_MAX_SIZE", batch_size) | |||||
self.test_directory_add(swh_storage, sample_data) | |||||
@pytest.mark.parametrize("insert_algo", DIRECTORY_ENTRIES_INSERT_ALGOS) | |||||
def test_directory_add_atomic(self, swh_storage, sample_data, mocker, insert_algo): | |||||
"""Checks that a crash occurring after some directory entries were written | """Checks that a crash occurring after some directory entries were written | ||||
does not cause the directory to be (partially) visible. | does not cause the directory to be (partially) visible. | ||||
ie. checks directories are added somewhat atomically.""" | ie. checks directories are added somewhat atomically.""" | ||||
# Disable the journal writer, it would detect the CrashyEntry exception too | # Disable the journal writer, it would detect the CrashyEntry exception too | ||||
# early for this test to be relevant | # early for this test to be relevant | ||||
swh_storage.journal_writer.journal = None | swh_storage.journal_writer.journal = None | ||||
mocker.patch.object(swh_storage, "_directory_entries_insert_algo", insert_algo) | |||||
class MyException(Exception): | mocker.patch("swh.storage.cassandra.cql.BATCH_INSERT_MAX_SIZE", 1) | ||||
pass | |||||
class CrashyEntry(DirectoryEntry): | class CrashyEntry(DirectoryEntry): | ||||
def __init__(self): | def __init__(self): | ||||
pass | pass | ||||
def to_dict(self): | def to_dict(self): | ||||
raise MyException() | return {**directory.entries[0].to_dict(), "perms": "abcde"} | ||||
directory = sample_data.directory3 | directory = sample_data.directory3 | ||||
entries = directory.entries | entries = directory.entries | ||||
directory = attr.evolve(directory, entries=entries + (CrashyEntry(),)) | directory = attr.evolve(directory, entries=entries + (CrashyEntry(),)) | ||||
with pytest.raises(MyException): | with pytest.raises(TypeError): | ||||
swh_storage.directory_add([directory]) | swh_storage.directory_add([directory]) | ||||
# This should have written some of the entries to the database: | # This should have written some of the entries to the database: | ||||
entry_rows = swh_storage._cql_runner.directory_entry_get([directory.id]) | entry_rows = swh_storage._cql_runner.directory_entry_get([directory.id]) | ||||
assert {row.name for row in entry_rows} == {entry.name for entry in entries} | assert {row.name for row in entry_rows} == {entry.name for entry in entries} | ||||
# BUT, because not all the entries were written, the directory should | # BUT, because not all the entries were written, the directory should | ||||
# be considered not written. | # be considered not written. | ||||
▲ Show 20 Lines • Show All 185 Lines • Show Last 20 Lines |