Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_cassandra.py
# Copyright (C) 2018-2021 The Software Heritage developers | # Copyright (C) 2018-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import os | import os | ||||
import resource | |||||
import signal | import signal | ||||
import socket | import socket | ||||
import subprocess | import subprocess | ||||
import time | import time | ||||
from typing import Any, Dict | from typing import Any, Dict | ||||
import attr | import attr | ||||
import pytest | import pytest | ||||
Show All 36 Lines | |||||
# speed-up by disabling period saving to disk | # speed-up by disabling period saving to disk | ||||
key_cache_save_period: 0 | key_cache_save_period: 0 | ||||
row_cache_save_period: 0 | row_cache_save_period: 0 | ||||
trickle_fsync: false | trickle_fsync: false | ||||
commitlog_sync_period_in_ms: 100000 | commitlog_sync_period_in_ms: 100000 | ||||
""" | """ | ||||
SCYLLA_EXTRA_CONFIG_TEMPLATE = """ | |||||
experimental_features: | |||||
- udf | |||||
view_hints_directory: {data_dir}/view_hints | |||||
prometheus_port: 0 # disable prometheus server | |||||
start_rpc: false # disable thrift server | |||||
api_port: {api_port} | |||||
""" | |||||
def free_port(): | def free_port(): | ||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
sock.bind(("127.0.0.1", 0)) | sock.bind(("127.0.0.1", 0)) | ||||
port = sock.getsockname()[1] | port = sock.getsockname()[1] | ||||
sock.close() | sock.close() | ||||
return port | return port | ||||
def wait_for_peer(addr, port): | def wait_for_peer(addr, port): | ||||
wait_until = time.time() + 20 | wait_until = time.time() + 60 | ||||
while time.time() < wait_until: | while time.time() < wait_until: | ||||
try: | try: | ||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
sock.connect((addr, port)) | sock.connect((addr, port)) | ||||
except ConnectionRefusedError: | except ConnectionRefusedError: | ||||
time.sleep(0.1) | time.sleep(0.1) | ||||
else: | else: | ||||
sock.close() | sock.close() | ||||
return True | return True | ||||
return False | return False | ||||
@pytest.fixture(scope="session") | @pytest.fixture(scope="session") | ||||
def cassandra_cluster(tmpdir_factory): | def cassandra_cluster(tmpdir_factory): | ||||
cassandra_conf = tmpdir_factory.mktemp("cassandra_conf") | cassandra_conf = tmpdir_factory.mktemp("cassandra_conf") | ||||
cassandra_data = tmpdir_factory.mktemp("cassandra_data") | cassandra_data = tmpdir_factory.mktemp("cassandra_data") | ||||
cassandra_log = tmpdir_factory.mktemp("cassandra_log") | cassandra_log = tmpdir_factory.mktemp("cassandra_log") | ||||
native_transport_port = free_port() | native_transport_port = free_port() | ||||
storage_port = free_port() | storage_port = free_port() | ||||
jmx_port = free_port() | jmx_port = free_port() | ||||
api_port = free_port() | |||||
cassandra_bin = os.environ.get("SWH_CASSANDRA_BIN", "/usr/sbin/cassandra") | |||||
scylla = "scylla" in cassandra_bin.lower() | |||||
with open(str(cassandra_conf.join("cassandra.yaml")), "w") as fd: | if scylla: | ||||
os.makedirs(cassandra_conf.join("conf")) | |||||
config_path = cassandra_conf.join("conf/scylla.yaml") | |||||
config_template = CONFIG_TEMPLATE + SCYLLA_EXTRA_CONFIG_TEMPLATE | |||||
else: | |||||
config_path = cassandra_conf.join("cassandra.yaml") | |||||
config_template = CONFIG_TEMPLATE | |||||
with open(str(config_path), "w") as fd: | |||||
fd.write( | fd.write( | ||||
CONFIG_TEMPLATE.format( | config_template.format( | ||||
data_dir=str(cassandra_data), | data_dir=str(cassandra_data), | ||||
storage_port=storage_port, | storage_port=storage_port, | ||||
native_transport_port=native_transport_port, | native_transport_port=native_transport_port, | ||||
api_port=api_port, | |||||
) | ) | ||||
) | ) | ||||
if os.environ.get("SWH_CASSANDRA_LOG"): | if os.environ.get("SWH_CASSANDRA_LOG"): | ||||
stdout = stderr = None | stdout = stderr = None | ||||
else: | else: | ||||
stdout = stderr = subprocess.DEVNULL | stdout = stderr = subprocess.DEVNULL | ||||
cassandra_bin = os.environ.get("SWH_CASSANDRA_BIN", "/usr/sbin/cassandra") | |||||
env = { | env = { | ||||
"MAX_HEAP_SIZE": "300M", | "MAX_HEAP_SIZE": "300M", | ||||
"HEAP_NEWSIZE": "50M", | "HEAP_NEWSIZE": "50M", | ||||
"JVM_OPTS": "-Xlog:gc=error:file=%s/gc.log" % cassandra_log, | "JVM_OPTS": "-Xlog:gc=error:file=%s/gc.log" % cassandra_log, | ||||
} | } | ||||
if "JAVA_HOME" in os.environ: | if "JAVA_HOME" in os.environ: | ||||
env["JAVA_HOME"] = os.environ["JAVA_HOME"] | env["JAVA_HOME"] = os.environ["JAVA_HOME"] | ||||
if scylla: | |||||
env = { | |||||
**env, | |||||
"SCYLLA_HOME": cassandra_conf, | |||||
} | |||||
# prevent "NOFILE rlimit too low (recommended setting 200000, | |||||
# minimum setting 10000; refusing to start." | |||||
resource.setrlimit(resource.RLIMIT_NOFILE, (200000, 200000)) | |||||
proc = subprocess.Popen( | |||||
[cassandra_bin, "--developer-mode=1",], | |||||
start_new_session=True, | |||||
env=env, | |||||
stdout=stdout, | |||||
stderr=stderr, | |||||
) | |||||
else: | |||||
proc = subprocess.Popen( | proc = subprocess.Popen( | ||||
[ | [ | ||||
cassandra_bin, | cassandra_bin, | ||||
"-Dcassandra.config=file://%s/cassandra.yaml" % cassandra_conf, | "-Dcassandra.config=file://%s/cassandra.yaml" % cassandra_conf, | ||||
"-Dcassandra.logdir=%s" % cassandra_log, | "-Dcassandra.logdir=%s" % cassandra_log, | ||||
"-Dcassandra.jmx.local.port=%d" % jmx_port, | "-Dcassandra.jmx.local.port=%d" % jmx_port, | ||||
"-Dcassandra-foreground=yes", | "-Dcassandra-foreground=yes", | ||||
], | ], | ||||
start_new_session=True, | start_new_session=True, | ||||
env=env, | env=env, | ||||
stdout=stdout, | stdout=stdout, | ||||
stderr=stderr, | stderr=stderr, | ||||
) | ) | ||||
listening = wait_for_peer("127.0.0.1", native_transport_port) | listening = wait_for_peer("127.0.0.1", native_transport_port) | ||||
if listening: | if listening: | ||||
yield (["127.0.0.1"], native_transport_port) | yield (["127.0.0.1"], native_transport_port) | ||||
if not listening or os.environ.get("SWH_CASSANDRA_LOG"): | if not listening or os.environ.get("SWH_CASSANDRA_LOG"): | ||||
debug_log_path = str(cassandra_log.join("debug.log")) | debug_log_path = str(cassandra_log.join("debug.log")) | ||||
▲ Show 20 Lines • Show All 500 Lines • Show Last 20 Lines |