diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -6,6 +6,8 @@ import os import pytest import logging +import random +import string from kafka import KafkaProducer, KafkaConsumer from subprocess import Popen @@ -177,16 +179,21 @@ # Those defines fixtures -zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN) +zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session') os.environ['KAFKA_LOG4J_OPTS'] = \ '-Dlog4j.configuration=file:%s/log4j.properties' % \ os.path.dirname(__file__) -kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc') +kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') logger = logging.getLogger('kafka') logger.setLevel(logging.WARN) +@pytest.fixture(scope='function') +def kafka_prefix(): + return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) + + @pytest.fixture def test_config(): """Test configuration needed for publisher/producer/consumer @@ -215,12 +222,14 @@ @pytest.fixture def consumer_from_publisher(kafka_server: Tuple[Popen, int], + kafka_prefix: str, test_config: Dict) -> KafkaConsumer: """Get a connected Kafka consumer. """ - kafka_topics = ['%s.%s' % (test_config['final_prefix'], object_type) - for object_type in test_config['object_types']] + kafka_topics = [ + '%s.%s.%s' % (kafka_prefix, test_config['final_prefix'], object_type) + for object_type in test_config['object_types']] _, kafka_port = kafka_server consumer = KafkaConsumer( *kafka_topics, diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py --- a/swh/journal/tests/test_direct_writer.py +++ b/swh/journal/tests/test_direct_writer.py @@ -18,7 +18,7 @@ from .conftest import OBJECT_TYPE_KEYS -def assert_written(consumer): +def assert_written(consumer, kafka_prefix): time.sleep(0.1) # Without this, some messages are missing consumed_objects = defaultdict(list) @@ -26,7 +26,7 @@ consumed_objects[msg.topic].append((msg.key, msg.value)) for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): - topic = 'swh.journal.objects.%s' % object_type + topic = kafka_prefix + '.' + object_type (keys, values) = zip(*consumed_objects[topic]) if key_name: assert list(keys) == [object_[key_name] for object_ in objects] @@ -42,13 +42,15 @@ def test_direct_writer( + kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer_from_publisher: KafkaConsumer): + kafka_prefix += '.swh.journal.objects' config = { 'brokers': 'localhost:%d' % kafka_server[1], 'client_id': 'direct_writer', - 'prefix': 'swh.journal.objects', + 'prefix': kafka_prefix, } writer = DirectKafkaWriter(**config) @@ -59,17 +61,19 @@ object_ = {**object_, 'visit': num} writer.write_addition(object_type, object_) - assert_written(consumer_from_publisher) + assert_written(consumer_from_publisher, kafka_prefix) def test_storage_direct_writer( + kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer_from_publisher: KafkaConsumer): + kafka_prefix += '.swh.journal.objects' config = { 'brokers': 'localhost:%d' % kafka_server[1], 'client_id': 'direct_writer', - 'prefix': 'swh.journal.objects', + 'prefix': kafka_prefix, } storage = get_storage('memory', {'journal_writer': { @@ -92,4 +96,4 @@ else: assert False, object_type - assert_written(consumer_from_publisher) + assert_written(consumer_from_publisher, kafka_prefix) diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -19,8 +19,10 @@ def test_storage_play( + kafka_prefix: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server + kafka_prefix += '.swh.journal.objects' storage = get_storage('memory', {}) @@ -34,7 +36,7 @@ # Fill Kafka nb_sent = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): - topic = 'swh.journal.objects.' + object_type + topic = kafka_prefix + '.' + object_type for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) producer.send(topic, key=key, value=object_) @@ -44,7 +46,7 @@ config = { 'brokers': 'localhost:%d' % kafka_server[1], 'consumer_id': 'replayer', - 'prefix': 'swh.journal.objects', + 'prefix': kafka_prefix, } replayer = StorageReplayer(**config) nb_inserted = replayer.fill(storage, max_messages=nb_sent)