diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -6,6 +6,8 @@ import os import pytest import logging +import random +import string from kafka import KafkaProducer, KafkaConsumer from subprocess import Popen @@ -132,17 +134,6 @@ } -TEST_CONFIG = { - 'temporary_prefix': 'swh.tmp_journal.new', - 'final_prefix': 'swh.journal.objects', - 'consumer_id': 'swh.journal.publisher', - 'publisher_id': 'swh.journal.publisher', - 'object_types': OBJECT_TYPE_KEYS.keys(), - 'max_messages': 1, # will read 1 message and stops - 'storage': {'cls': 'memory', 'args': {}} -} - - class JournalPublisherTest(JournalPublisher): """A journal publisher which override the default configuration parsing setup. @@ -177,22 +168,45 @@ # Those defines fixtures -zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN) +zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session') os.environ['KAFKA_LOG4J_OPTS'] = \ '-Dlog4j.configuration=file:%s/log4j.properties' % \ os.path.dirname(__file__) -kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc') +kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') logger = logging.getLogger('kafka') logger.setLevel(logging.WARN) +@pytest.fixture(scope='function') +def kafka_prefix(): + return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) + + +TEST_CONFIG = { + 'temporary_prefix': 'swh.tmp_journal.new', + 'final_prefix': 'swh.journal.objects', + 'consumer_id': 'swh.journal.publisher', + 'publisher_id': 'swh.journal.publisher', + 'object_types': OBJECT_TYPE_KEYS.keys(), + 'max_messages': 1, # will read 1 message and stops + 'storage': {'cls': 'memory', 'args': {}}, +} + + @pytest.fixture -def test_config(): +def test_config(kafka_server: Tuple[Popen, int], + kafka_prefix: str): """Test configuration needed for publisher/producer/consumer """ - return TEST_CONFIG + _, port = kafka_server + return { + **TEST_CONFIG, + 'brokers': ['localhost:{}'.format(port)], + 'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new', + 'final_prefix': kafka_prefix + '.swh.journal.objects', + } @pytest.fixture @@ -219,8 +233,9 @@ """Get a connected Kafka consumer. """ - kafka_topics = ['%s.%s' % (test_config['final_prefix'], object_type) - for object_type in test_config['object_types']] + kafka_topics = [ + '%s.%s' % (test_config['final_prefix'], object_type) + for object_type in test_config['object_types']] _, kafka_port = kafka_server consumer = KafkaConsumer( *kafka_topics, @@ -251,7 +266,5 @@ """ # consumer and producer of the publisher needs to discuss with the # right instance - _, port = kafka_server - test_config['brokers'] = ['localhost:{}'.format(port)] publisher = JournalPublisherTest(test_config) return publisher diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py --- a/swh/journal/tests/test_direct_writer.py +++ b/swh/journal/tests/test_direct_writer.py @@ -18,7 +18,7 @@ from .conftest import OBJECT_TYPE_KEYS -def assert_written(consumer): +def assert_written(consumer, kafka_prefix): time.sleep(0.1) # Without this, some messages are missing consumed_objects = defaultdict(list) @@ -26,7 +26,7 @@ consumed_objects[msg.topic].append((msg.key, msg.value)) for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): - topic = 'swh.journal.objects.%s' % object_type + topic = kafka_prefix + '.' + object_type (keys, values) = zip(*consumed_objects[topic]) if key_name: assert list(keys) == [object_[key_name] for object_ in objects] @@ -42,13 +42,15 @@ def test_direct_writer( + kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer_from_publisher: KafkaConsumer): + kafka_prefix += '.swh.journal.objects' config = { 'brokers': 'localhost:%d' % kafka_server[1], 'client_id': 'direct_writer', - 'prefix': 'swh.journal.objects', + 'prefix': kafka_prefix, } writer = DirectKafkaWriter(**config) @@ -59,17 +61,19 @@ object_ = {**object_, 'visit': num} writer.write_addition(object_type, object_) - assert_written(consumer_from_publisher) + assert_written(consumer_from_publisher, kafka_prefix) def test_storage_direct_writer( + kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer_from_publisher: KafkaConsumer): + kafka_prefix += '.swh.journal.objects' config = { 'brokers': 'localhost:%d' % kafka_server[1], 'client_id': 'direct_writer', - 'prefix': 'swh.journal.objects', + 'prefix': kafka_prefix, } storage = get_storage('memory', {'journal_writer': { @@ -92,4 +96,4 @@ else: assert False, object_type - assert_written(consumer_from_publisher) + assert_written(consumer_from_publisher, kafka_prefix) diff --git a/swh/journal/tests/test_publisher_kafka.py b/swh/journal/tests/test_publisher_kafka.py --- a/swh/journal/tests/test_publisher_kafka.py +++ b/swh/journal/tests/test_publisher_kafka.py @@ -11,12 +11,13 @@ from swh.journal.serializers import value_to_kafka, kafka_to_value from swh.journal.publisher import JournalPublisher -from .conftest import TEST_CONFIG, OBJECT_TYPE_KEYS +from .conftest import OBJECT_TYPE_KEYS def assert_publish_ok(publisher: JournalPublisher, consumer_from_publisher: KafkaConsumer, producer_to_publisher: KafkaProducer, + test_config: dict, object_type: str): """Assert that publishing object in the publisher is reified and published in output topics. @@ -43,7 +44,7 @@ # send message to the publisher for obj in objects: producer_to_publisher.send( - '%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type), + '%s.%s' % (test_config['temporary_prefix'], object_type), obj ) @@ -53,7 +54,7 @@ publisher.poll(max_messages=1) # then (client reads from the messages from output topic) - expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type) + expected_topic = '%s.%s' % (test_config['final_prefix'], object_type) expected_msgs = [ ( object_[object_key_id], @@ -69,6 +70,7 @@ def test_publish( publisher: JournalPublisher, kafka_server: Tuple[Popen, int], + test_config: dict, consumer_from_publisher: KafkaConsumer, producer_to_publisher: KafkaProducer): """ @@ -88,4 +90,4 @@ for object_type in object_types: assert_publish_ok( publisher, consumer_from_publisher, producer_to_publisher, - object_type) + test_config, object_type) diff --git a/swh/journal/tests/test_publisher_no_kafka.py b/swh/journal/tests/test_publisher_no_kafka.py --- a/swh/journal/tests/test_publisher_no_kafka.py +++ b/swh/journal/tests/test_publisher_no_kafka.py @@ -128,20 +128,20 @@ pass -def test_check_config_ok(): +def test_check_config_ok(test_config): """Instantiate a publisher with the right config is fine """ - publisher = JournalPublisherCheckTest(TEST_CONFIG) + publisher = JournalPublisherCheckTest(test_config) assert publisher is not None -def test_check_config_ko(): +def test_check_config_ko(test_config): """Instantiate a publisher with the wrong config should raise """ for k in MANDATORY_KEYS: - conf = TEST_CONFIG.copy() + conf = test_config.copy() conf.pop(k) with pytest.raises(ValueError) as e: JournalPublisherCheckTest(conf) diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -19,8 +19,10 @@ def test_storage_play( + kafka_prefix: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server + kafka_prefix += '.swh.journal.objects' storage = get_storage('memory', {}) @@ -34,7 +36,7 @@ # Fill Kafka nb_sent = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): - topic = 'swh.journal.objects.' + object_type + topic = kafka_prefix + '.' + object_type for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) producer.send(topic, key=key, value=object_) @@ -44,7 +46,7 @@ config = { 'brokers': 'localhost:%d' % kafka_server[1], 'consumer_id': 'replayer', - 'prefix': 'swh.journal.objects', + 'prefix': kafka_prefix, } replayer = StorageReplayer(**config) nb_inserted = replayer.fill(storage, max_messages=nb_sent)