diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py --- a/swh/storage/tests/test_cassandra.py +++ b/swh/storage/tests/test_cassandra.py @@ -4,9 +4,11 @@ # See top-level LICENSE file for more information import os +import queue import signal import socket import subprocess +import multiprocessing import time import pytest @@ -41,6 +43,12 @@ listen_address: 127.0.0.1 enable_user_defined_functions: true + +# speed-up by disabling period saving to disk +key_cache_save_period: 0 +row_cache_save_period: 0 +trickle_fsync: false +commitlog_sync_period_in_ms: 100000 ''' @@ -127,16 +135,63 @@ print(rf.message.query) +@pytest.fixture(scope='session') +def keyspace_queue(cassandra_cluster): + """Yields a `multiprocessing.Queue` object containing keyspace names. + These are names of keyspaces created and fully initialized with the + schema. + + They are created concurrently to running tests, in order to avoid + waiting on keyspace initialization (which takes about twice the actual + runtime of a test), in worker processes (not threads, because + cassandra-driver isn't thread-safe). + """ + (hosts, port) = cassandra_cluster + + # Prepare at most 5 keyspaces in advance + keyspaces = multiprocessing.Queue(5) + stop_workers = multiprocessing.Event() + + def worker(): + while not stop_workers.is_set(): + keyspace = os.urandom(10).hex() + create_keyspace(hosts, keyspace, port) + while not stop_workers.is_set(): + try: + keyspaces.put(keyspace, timeout=1) + except queue.Full: + # timeout exceeded; check stop_workers and try again + pass + else: + # successfully added to cassandra + break + + threads = [] + # It takes about twice as much time to create and initialize + # a keyspace than to run the average test; plus an extra worker + # just in case, because it doesn't hurt. + for _ in range(3): + threads.append(multiprocessing.Process(target=worker)) + threads[-1].start() + + yield keyspaces + + # Signal all workers to stop + stop_workers.set() + + for thread in threads: + thread.join() + + # tests are executed using imported classes (TestStorage and # TestStorageGeneratedData) using overloaded swh_storage fixture # below @pytest.fixture -def swh_storage(cassandra_cluster): +def swh_storage(cassandra_cluster, keyspace_queue): (hosts, port) = cassandra_cluster - keyspace = os.urandom(10).hex() - create_keyspace(hosts, keyspace, port) + keyspace = keyspace_queue.get(timeout=10) storage = get_storage( 'cassandra', diff --git a/tox.ini b/tox.ini --- a/tox.ini +++ b/tox.ini @@ -7,8 +7,9 @@ deps = pytest-cov dev: ipdb -passenv = - LOG_CASSANDRA +#passenv = +# LOG_CASSANDRA +setenv = LOG_CASSANDRA = 1 commands = pytest \ !slow: --hypothesis-profile=fast \