# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import itertools
import os
import resource
import signal
import socket
import subprocess
import time
from typing import Any, Dict
import attr
from cassandra.cluster import NoHostAvailable
import pytest
from swh.core.api.classes import stream_results
from swh.model import from_disk
from swh.model.model import Directory, DirectoryEntry, Snapshot, SnapshotBranch
from swh.storage import get_storage
from swh.storage.cassandra import create_keyspace
from swh.storage.cassandra.cql import BATCH_INSERT_MAX_SIZE
from swh.storage.cassandra.model import ContentRow, ExtIDRow
from swh.storage.cassandra.schema import HASH_ALGORITHMS, TABLES
from swh.storage.cassandra.storage import DIRECTORY_ENTRIES_INSERT_ALGOS
from swh.storage.tests.storage_data import StorageData
from swh.storage.tests.storage_tests import (
TestStorageGeneratedData as _TestStorageGeneratedData,
)
from swh.storage.tests.storage_tests import TestStorage as _TestStorage
from swh.storage.utils import now, remove_keys

CONFIG_TEMPLATE = """
data_file_directories:
- {data_dir}/data
commitlog_directory: {data_dir}/commitlog
hints_directory: {data_dir}/hints
saved_caches_directory: {data_dir}/saved_caches
commitlog_sync: periodic
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
endpoint_snitch: SimpleSnitch
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- seeds: "127.0.0.1"
storage_port: {storage_port}
native_transport_port: {native_transport_port}
start_native_transport: true
listen_address: 127.0.0.1
enable_user_defined_functions: true
# speed-up by disabling periodic saving to disk
key_cache_save_period: 0
row_cache_save_period: 0
trickle_fsync: false
commitlog_sync_period_in_ms: 100000
"""

SCYLLA_EXTRA_CONFIG_TEMPLATE = """
experimental_features:
- udf
view_hints_directory: {data_dir}/view_hints
prometheus_port: 0 # disable prometheus server
start_rpc: false # disable thrift server
api_port: {api_port}
"""


def free_port():
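    """Returns a TCP port that was free on 127.0.0.1 at the time of the call.

    Sketch of the trick: bind to port 0 so the kernel picks an ephemeral
    port, record it, then close the socket. There is an inherent race (the
    port could be taken again before the server binds it), which is
    acceptable for tests.
    """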
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("127.0.0.1", 0))
port = sock.getsockname()[1]
sock.close()
return port


def wait_for_peer(addr, port):
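    """Polls ``(addr, port)`` for up to 60 seconds; returns True as soon as a
    TCP connection succeeds, False on timeout."""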
wait_until = time.time() + 60
while time.time() < wait_until:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((addr, port))
except ConnectionRefusedError:
time.sleep(0.1)
else:
sock.close()
return True
return False


@pytest.fixture(scope="session")
def cassandra_cluster(tmpdir_factory):
cassandra_conf = tmpdir_factory.mktemp("cassandra_conf")
cassandra_data = tmpdir_factory.mktemp("cassandra_data")
cassandra_log = tmpdir_factory.mktemp("cassandra_log")
native_transport_port = free_port()
storage_port = free_port()
jmx_port = free_port()
api_port = free_port()
use_scylla = bool(os.environ.get("SWH_USE_SCYLLADB", ""))
cassandra_bin = os.environ.get(
"SWH_CASSANDRA_BIN", "/usr/bin/scylla" if use_scylla else "/usr/sbin/cassandra"
)
if use_scylla:
os.makedirs(cassandra_conf.join("conf"))
config_path = cassandra_conf.join("conf/scylla.yaml")
config_template = CONFIG_TEMPLATE + SCYLLA_EXTRA_CONFIG_TEMPLATE
else:
config_path = cassandra_conf.join("cassandra.yaml")
config_template = CONFIG_TEMPLATE
with open(str(config_path), "w") as fd:
fd.write(
config_template.format(
data_dir=str(cassandra_data),
storage_port=storage_port,
native_transport_port=native_transport_port,
api_port=api_port,
)
)
if os.environ.get("SWH_CASSANDRA_LOG"):
stdout = stderr = None
else:
stdout = stderr = subprocess.DEVNULL
env = {
"MAX_HEAP_SIZE": "300M",
"HEAP_NEWSIZE": "50M",
"JVM_OPTS": "-Xlog:gc=error:file=%s/gc.log" % cassandra_log,
}
if "JAVA_HOME" in os.environ:
env["JAVA_HOME"] = os.environ["JAVA_HOME"]
if use_scylla:
env = {
**env,
"SCYLLA_HOME": cassandra_conf,
}
        # prevent "NOFILE rlimit too low (recommended setting 200000,
        # minimum setting 10000); refusing to start."
resource.setrlimit(resource.RLIMIT_NOFILE, (200000, 200000))
proc = subprocess.Popen(
[
cassandra_bin,
"--developer-mode=1",
],
start_new_session=True,
env=env,
stdout=stdout,
stderr=stderr,
)
else:
proc = subprocess.Popen(
[
cassandra_bin,
"-Dcassandra.config=file://%s/cassandra.yaml" % cassandra_conf,
"-Dcassandra.logdir=%s" % cassandra_log,
"-Dcassandra.jmx.local.port=%d" % jmx_port,
"-Dcassandra-foreground=yes",
],
start_new_session=True,
env=env,
stdout=stdout,
stderr=stderr,
)
listening = wait_for_peer("127.0.0.1", native_transport_port)
if listening:
yield (["127.0.0.1"], native_transport_port)
if not listening or os.environ.get("SWH_CASSANDRA_LOG"):
debug_log_path = str(cassandra_log.join("debug.log"))
if os.path.exists(debug_log_path):
with open(debug_log_path) as fd:
print(fd.read())
if not listening:
if proc.poll() is None:
raise Exception("cassandra process unexpectedly not listening.")
else:
raise Exception("cassandra process unexpectedly stopped.")
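
    # The server was started with start_new_session=True, so kill its whole
    # process group to also take down any child processes.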
pgrp = os.getpgid(proc.pid)
os.killpg(pgrp, signal.SIGKILL)


class RequestHandler:
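    """Debugging aid: prints every outgoing CQL query. Not used by default;
    an instance's ``on_request`` can be registered manually, e.g. with the
    cassandra driver's ``Session.add_request_init_listener``."""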
def on_request(self, rf):
if hasattr(rf.message, "query"):
print()
print(rf.message.query)


@pytest.fixture(scope="session")
def keyspace(cassandra_cluster):
(hosts, port) = cassandra_cluster
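    # Random keyspace name, presumably so that runs against a shared or
    # long-lived cluster cannot clash with each other.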
keyspace = os.urandom(10).hex()
create_keyspace(hosts, keyspace, port)
return keyspace


# Tests are executed using the imported classes (TestStorage and
# TestStorageGeneratedData), with the swh_storage fixture overridden below.


@pytest.fixture
def swh_storage_backend_config(cassandra_cluster, keyspace):
(hosts, port) = cassandra_cluster
storage_config = dict(
cls="cassandra",
hosts=hosts,
port=port,
keyspace=keyspace,
journal_writer={"cls": "memory"},
objstorage={"cls": "memory"},
)
yield storage_config
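
    # Teardown: truncate every table so the next test starts from an empty
    # keyspace, without having to drop and re-create it.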
storage = get_storage(**storage_config)
for table in TABLES:
storage._cql_runner._session.execute('TRUNCATE TABLE "%s"' % table)
storage._cql_runner._cluster.shutdown()


@pytest.mark.cassandra
class TestCassandraStorage(_TestStorage):
def test_config_wrong_consistency_should_raise(self):
storage_config = dict(
cls="cassandra",
hosts=["first"],
port=9999,
keyspace="any",
consistency_level="fake",
journal_writer={"cls": "memory"},
objstorage={"cls": "memory"},
)
with pytest.raises(ValueError, match="Unknown consistency"):
get_storage(**storage_config)
def test_config_consistency_used(self, swh_storage_backend_config):
config_with_consistency = dict(
swh_storage_backend_config, **{"consistency_level": "THREE"}
)
storage = get_storage(**config_with_consistency)
with pytest.raises(NoHostAvailable):
storage.content_get_random()
def test_content_add_murmur3_collision(self, swh_storage, mocker, sample_data):
"""The Murmur3 token is used as link from index tables to the main
table; and non-matching contents with colliding murmur3-hash
are filtered-out when reading the main table.
This test checks the content methods do filter out these collision.
"""
called = 0
cont, cont2 = sample_data.contents[:2]
# always return a token
def mock_cgtfsa(algo, hashes):
nonlocal called
called += 1
assert algo in ("sha1", "sha1_git")
return [123456]
mocker.patch.object(
swh_storage._cql_runner,
"content_get_tokens_from_single_algo",
mock_cgtfsa,
)
# For all tokens, always return cont
def mock_cgft(tokens):
nonlocal called
called += 1
return [
ContentRow(
length=10,
ctime=datetime.datetime.now(),
status="present",
**{algo: getattr(cont, algo) for algo in HASH_ALGORITHMS},
)
]
mocker.patch.object(
swh_storage._cql_runner, "content_get_from_tokens", mock_cgft
)
actual_result = swh_storage.content_add([cont2])
assert called == 4
assert actual_result == {
"content:add": 1,
"content:add:bytes": cont2.length,
}
def test_content_get_metadata_murmur3_collision(
self, swh_storage, mocker, sample_data
):
"""The Murmur3 token is used as link from index tables to the main
table; and non-matching contents with colliding murmur3-hash
are filtered-out when reading the main table.
This test checks the content methods do filter out these collisions.
"""
called = 0
cont, cont2 = [attr.evolve(c, ctime=now()) for c in sample_data.contents[:2]]
# always return a token
def mock_cgtfsa(algo, hashes):
nonlocal called
called += 1
assert algo in ("sha1", "sha1_git")
return [123456]
mocker.patch.object(
swh_storage._cql_runner,
"content_get_tokens_from_single_algo",
mock_cgtfsa,
)
# For all tokens, always return cont and cont2
cols = list(set(cont.to_dict()) - {"data"})
def mock_cgft(tokens):
nonlocal called
called += 1
return [
ContentRow(
**{col: getattr(cont, col) for col in cols},
)
for cont in [cont, cont2]
]
mocker.patch.object(
swh_storage._cql_runner, "content_get_from_tokens", mock_cgft
)
actual_result = swh_storage.content_get([cont.sha1])
assert called == 2
        # the `data` column is not returned by content_get, so drop it from
        # the expected value
expected_cont = attr.evolve(cont, data=None)
# but cont2 should be filtered out
assert actual_result == [expected_cont]
def test_content_find_murmur3_collision(self, swh_storage, mocker, sample_data):
"""The Murmur3 token is used as link from index tables to the main
table; and non-matching contents with colliding murmur3-hash
are filtered-out when reading the main table.
This test checks the content methods do filter out these collisions.
"""
called = 0
cont, cont2 = [attr.evolve(c, ctime=now()) for c in sample_data.contents[:2]]
# always return a token
def mock_cgtfsa(algo, hashes):
nonlocal called
called += 1
assert algo in ("sha1", "sha1_git")
return [123456]
mocker.patch.object(
swh_storage._cql_runner,
"content_get_tokens_from_single_algo",
mock_cgtfsa,
)
# For all tokens, always return cont and cont2
cols = list(set(cont.to_dict()) - {"data"})
def mock_cgft(tokens):
nonlocal called
called += 1
return [
ContentRow(**{col: getattr(cont, col) for col in cols})
for cont in [cont, cont2]
]
mocker.patch.object(
swh_storage._cql_runner, "content_get_from_tokens", mock_cgft
)
expected_content = attr.evolve(cont, data=None)
actual_result = swh_storage.content_find({"sha1": cont.sha1})
assert called == 2
# but cont2 should be filtered out
assert actual_result == [expected_content]
def test_content_get_partition_murmur3_collision(
self, swh_storage, mocker, sample_data
):
"""The Murmur3 token is used as link from index tables to the main table; and
non-matching contents with colliding murmur3-hash are filtered-out when reading
the main table.
This test checks the content_get_partition endpoints return all contents, even
the collisions.
"""
called = 0
rows: Dict[int, Dict] = {}
for tok, content in enumerate(sample_data.contents):
cont = attr.evolve(content, data=None, ctime=now())
row_d = {**cont.to_dict(), "tok": tok}
rows[tok] = row_d
# For all tokens, always return cont
def mock_content_get_token_range(range_start, range_end, limit):
nonlocal called
called += 1
            for tok in list(rows.keys()) * 3:  # yield the same tok multiple times
row_d = dict(rows[tok].items())
row_d.pop("tok")
yield (tok, ContentRow(**row_d))
mocker.patch.object(
swh_storage._cql_runner,
"content_get_token_range",
mock_content_get_token_range,
)
actual_results = list(
stream_results(
swh_storage.content_get_partition, partition_id=0, nb_partitions=1
)
)
assert called > 0
# everything is listed, even collisions
assert len(actual_results) == 3 * len(sample_data.contents)
        # as we duplicated the returned results, dropping duplicates should
        # yield the original length
assert len(set(actual_results)) == len(sample_data.contents)
@pytest.mark.skip("content_update is not yet implemented for Cassandra")
def test_content_update(self):
pass
def test_extid_murmur3_collision(self, swh_storage, mocker, sample_data):
"""The Murmur3 token is used as link from index table to the main
table; and non-matching extid with colliding murmur3-hash
are filtered-out when reading the main table.
This test checks the extid methods do filter out these collision.
"""
swh_storage.extid_add(sample_data.extids)
        # For any token, always return all extids, i.e. pretend the tokens
        # of all extid entries collide
def mock_egft(token):
return [
ExtIDRow(
extid_type=extid.extid_type,
extid=extid.extid,
extid_version=extid.extid_version,
target_type=extid.target.object_type.value,
target=extid.target.object_id,
)
for extid in sample_data.extids
]
mocker.patch.object(
swh_storage._cql_runner,
"extid_get_from_token",
mock_egft,
)
for extid in sample_data.extids:
extids = swh_storage.extid_get_from_target(
target_type=extid.target.object_type, ids=[extid.target.object_id]
)
assert extids == [extid]
def _directory_with_entries(self, sample_data, nb_entries):
"""Returns a dir with ``nb_entries``, all pointing to
the same content"""
return Directory(
entries=tuple(
DirectoryEntry(
name=f"file{i:10}".encode(),
type="file",
target=sample_data.content.sha1_git,
perms=from_disk.DentryPerms.directory,
)
for i in range(nb_entries)
)
)
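
    # The "batch" algorithm is exercised around the BATCH_INSERT_MAX_SIZE
    # boundary (just below, at, just above, and at a multiple), presumably
    # to catch off-by-one errors in chunking.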
@pytest.mark.parametrize(
"insert_algo,nb_entries",
[
("one-by-one", 10),
("concurrent", 10),
("batch", 1),
("batch", 2),
("batch", BATCH_INSERT_MAX_SIZE - 1),
("batch", BATCH_INSERT_MAX_SIZE),
("batch", BATCH_INSERT_MAX_SIZE + 1),
("batch", BATCH_INSERT_MAX_SIZE * 2),
],
)
def test_directory_add_algos(
self,
swh_storage,
sample_data,
mocker,
insert_algo,
nb_entries,
):
mocker.patch.object(swh_storage, "_directory_entries_insert_algo", insert_algo)
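
        # Minimal stand-in for the sample_data fixture, carrying a directory
        # sized for this parametrization: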
class new_sample_data:
content = sample_data.content
directory = self._directory_with_entries(sample_data, nb_entries)
self.test_directory_add(swh_storage, new_sample_data)
@pytest.mark.parametrize("insert_algo", DIRECTORY_ENTRIES_INSERT_ALGOS)
def test_directory_add_atomic(self, swh_storage, sample_data, mocker, insert_algo):
"""Checks that a crash occurring after some directory entries were written
does not cause the directory to be (partially) visible.
ie. checks directories are added somewhat atomically."""
# Disable the journal writer, it would detect the CrashyEntry exception too
# early for this test to be relevant
swh_storage.journal_writer.journal = None
mocker.patch.object(swh_storage, "_directory_entries_insert_algo", insert_algo)
class CrashyEntry(DirectoryEntry):
def __init__(self):
super().__init__(**{**directory.entries[0].to_dict(), "name": b"crash"})
def to_dict(self):
return {**super().to_dict(), "perms": "abcde"}
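
        # The bogus string "perms" value should make the insertion fail
        # partway through (the test below expects a TypeError), presumably
        # when binding it to the integer column.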
directory = self._directory_with_entries(sample_data, BATCH_INSERT_MAX_SIZE)
entries = directory.entries
directory = attr.evolve(directory, entries=entries + (CrashyEntry(),))
with pytest.raises(TypeError):
swh_storage.directory_add([directory])
# This should have written some of the entries to the database:
entry_rows = swh_storage._cql_runner.directory_entry_get([directory.id])
assert {row.name for row in entry_rows} == {entry.name for entry in entries}
# BUT, because not all the entries were written, the directory should
# be considered not written.
assert swh_storage.directory_missing([directory.id]) == [directory.id]
assert list(swh_storage.directory_ls(directory.id)) == []
assert swh_storage.directory_get_entries(directory.id) is None
def test_directory_add_raw_manifest__different_entries__allow_overwrite(
self, swh_storage
):
"""This test demonstrates a shortcoming of the Cassandra storage backend's
design:
1. add a directory with an entry named "name1" and raw_manifest="abc"
2. add a directory with an entry named "name2" and the same raw_manifest
3. the directories' id is computed only from the raw_manifest, so both
directories have the same id, which causes their entries to be
"additive" in the database; so directory_ls returns both entries
However, by default, the Cassandra storage has allow_overwrite=False,
which "accidentally" avoids this issue most of the time, by skipping insertion
if an object with the same id is already in the database.
This can still be an issue when either allow_overwrite=True or when inserting
both directories at about the same time (because of the lack of
transactionality); but the likelihood of two clients inserting two different
objects with the same manifest at the same time is very low, it could only
happen if loaders running in parallel used different (or nondeterministic)
parsers on corrupt objects.
"""
assert (
swh_storage._allow_overwrite is False
), "Unexpected default _allow_overwrite value"
swh_storage._allow_overwrite = True
# Run the other test, but skip its last assertion
dir_id = self.test_directory_add_raw_manifest__different_entries(
swh_storage, check_ls=False
)
assert [entry["name"] for entry in swh_storage.directory_ls(dir_id)] == [
b"name1",
b"name2",
]
def test_snapshot_add_atomic(self, swh_storage, sample_data, mocker):
"""Checks that a crash occurring after some snapshot branches were written
does not cause the snapshot to be (partially) visible.
ie. checks snapshots are added somewhat atomically."""
# Disable the journal writer, it would detect the CrashyBranch exception too
# early for this test to be relevant
swh_storage.journal_writer.journal = None
class MyException(Exception):
pass
class CrashyBranch(SnapshotBranch):
def __getattribute__(self, name):
if name == "target" and should_raise:
raise MyException()
else:
return super().__getattribute__(name)
snapshot = sample_data.complete_snapshot
branches = snapshot.branches
should_raise = False # just so that we can construct the object
crashy_branch = CrashyBranch.from_dict(branches[b"directory"].to_dict())
should_raise = True
snapshot = attr.evolve(
snapshot,
branches={
**branches,
b"crashy": crashy_branch,
},
)
with pytest.raises(MyException):
swh_storage.snapshot_add([snapshot])
# This should have written some of the branches to the database:
branch_rows = swh_storage._cql_runner.snapshot_branch_get(snapshot.id, b"", 10)
assert {row.name for row in branch_rows} == set(branches)
# BUT, because not all the branches were written, the snapshot should
# be considered not written.
assert swh_storage.snapshot_missing([snapshot.id]) == [snapshot.id]
assert swh_storage.snapshot_get(snapshot.id) is None
assert swh_storage.snapshot_count_branches(snapshot.id) is None
assert swh_storage.snapshot_get_branches(snapshot.id) is None
@pytest.mark.skip(
'The "person" table of the pgsql is a legacy thing, and not '
"supported by the cassandra backend."
)
def test_person_fullname_unicity(self):
pass
@pytest.mark.skip(
'The "person" table of the pgsql is a legacy thing, and not '
"supported by the cassandra backend."
)
def test_person_get(self):
pass
@pytest.mark.skip("Not supported by Cassandra")
def test_origin_count(self):
pass


@pytest.mark.cassandra
class TestCassandraStorageGeneratedData(_TestStorageGeneratedData):
@pytest.mark.skip("Not supported by Cassandra")
def test_origin_count(self):
pass
@pytest.mark.skip("Not supported by Cassandra")
def test_origin_count_with_visit_no_visits(self):
pass
@pytest.mark.skip("Not supported by Cassandra")
def test_origin_count_with_visit_with_visits_and_snapshot(self):
pass
@pytest.mark.skip("Not supported by Cassandra")
def test_origin_count_with_visit_with_visits_no_snapshot(self):
pass


@pytest.mark.parametrize(
"allow_overwrite,object_type",
itertools.product(
[False, True],
# Note the absence of "content", it's tested above.
["directory", "revision", "release", "snapshot", "origin", "extid"],
),
)
def test_allow_overwrite(
allow_overwrite: bool, object_type: str, swh_storage_backend_config
):
if object_type in ("origin", "extid"):
        pytest.skip(
            f"test_allow_overwrite is not implemented for {object_type} "
            "objects, because all their columns are in the primary key."
        )
swh_storage = get_storage(
allow_overwrite=allow_overwrite, **swh_storage_backend_config
)
    # directory_ls joins with the content and directory tables, and needs
    # those to return non-None entries:
if object_type == "directory":
swh_storage.directory_add([StorageData.directory5])
swh_storage.content_add([StorageData.content, StorageData.content2])
obj1: Any
obj2: Any
# Get two test objects
if object_type == "directory":
(obj1, obj2, *_) = StorageData.directories
elif object_type == "snapshot":
# StorageData.snapshots[1] is the empty snapshot, which is the corner case
# that makes this test succeed for the wrong reasons
obj1 = StorageData.snapshot
obj2 = StorageData.complete_snapshot
else:
(obj1, obj2, *_) = getattr(StorageData, (object_type + "s"))
# Let's make both objects have the same hash, but different content
obj1 = attr.evolve(obj1, id=obj2.id)
# Get the methods used to add and get these objects
add = getattr(swh_storage, object_type + "_add")
if object_type == "directory":
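        # There is no directory_get returning Directory objects, so rebuild
        # one from directory_ls output for comparison with the original.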
def get(ids):
return [
Directory(
id=ids[0],
entries=tuple(
map(
lambda entry: DirectoryEntry(
name=entry["name"],
type=entry["type"],
target=entry["sha1_git"],
perms=entry["perms"],
),
swh_storage.directory_ls(ids[0]),
)
),
)
]
elif object_type == "snapshot":
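        # snapshot_get returns a dict that includes a "next_branch"
        # pagination key; strip it and rebuild a Snapshot for comparison.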
def get(ids):
return [
Snapshot.from_dict(
remove_keys(swh_storage.snapshot_get(ids[0]), ("next_branch",))
)
]
else:
get = getattr(swh_storage, object_type + "_get")
# Add the first object
add([obj1])
# It should be returned as-is
assert get([obj1.id]) == [obj1]
# Add the second object
add([obj2])
if allow_overwrite:
# obj1 was overwritten by obj2
expected = obj2
else:
# obj2 was not written, because obj1 already exists and has the same hash
expected = obj1
if allow_overwrite and object_type in ("directory", "snapshot"):
# TODO
pytest.xfail(
"directory entries and snapshot branches are concatenated "
"instead of being replaced"
)
assert get([obj1.id]) == [expected]
