diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -184,7 +184,6 @@ @pytest.fixture def producer_to_publisher( - request: 'SubRequest', # noqa F821 kafka_server: Tuple[Popen, int], test_config: Dict, ) -> KafkaProducer: # noqa @@ -202,35 +201,23 @@ @pytest.fixture -def kafka_consumer( - request: 'SubRequest', # noqa F821 - test_config: Dict, -) -> KafkaConsumer: +def consumer_from_publisher(kafka_server: Tuple[Popen, int], + test_config: Dict) -> KafkaConsumer: """Get a connected Kafka consumer. """ kafka_topics = ['%s.%s' % (test_config['final_prefix'], object_type) for object_type in test_config['object_types']] - test_config['topics'] = kafka_topics - - consumer_kwargs = dict( + _, kafka_port = kafka_server + consumer = KafkaConsumer( + *kafka_topics, + bootstrap_servers='localhost:{}'.format(kafka_port), + consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS, key_deserializer=kafka_to_key, value_deserializer=kafka_to_value, auto_offset_reset='earliest', enable_auto_commit=True, - group_id="test-consumer") - - _, kafka_port = request.getfixturevalue('kafka_server') - - used_consumer_kwargs = consumer_kwargs.copy() - used_consumer_kwargs.setdefault('consumer_timeout_ms', - constants.DEFAULT_CONSUMER_TIMEOUT_MS) - used_consumer_kwargs.setdefault( - 'bootstrap_servers', 'localhost:{}'.format(kafka_port)) - - consumer = KafkaConsumer( - *kafka_topics, - **used_consumer_kwargs, + group_id="test-consumer" ) # Enforce auto_offset_reset=earliest even if the consumer was created diff --git a/swh/journal/tests/test_publisher_kafka.py b/swh/journal/tests/test_publisher_kafka.py --- a/swh/journal/tests/test_publisher_kafka.py +++ b/swh/journal/tests/test_publisher_kafka.py @@ -64,15 +64,15 @@ def test_publish( publisher: JournalPublisher, kafka_server: Tuple[Popen, int], - kafka_consumer: KafkaConsumer, + consumer_from_publisher: KafkaConsumer, producer_to_publisher: KafkaProducer): """ Reading from and writing to the journal publisher should work (contents) Args: journal_publisher (JournalPublisher): publisher to read and write data - kafka_consumer (KafkaConsumer): To read data from the publisher - producer_to_publisher (KafkaProducer): To send data to the publisher + consumer_from_publisher (KafkaConsumer): To read data from publisher + producer_to_publisher (KafkaProducer): To send data to publisher """ # retrieve the object types we want to test @@ -82,4 +82,5 @@ # output topics for object_type in object_types: assert_publish_ok( - publisher, kafka_consumer, producer_to_publisher, object_type) + publisher, consumer_from_publisher, producer_to_publisher, + object_type)