Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/conftest.py
Show First 20 Lines • Show All 189 Lines • ▼ Show 20 Lines | TEST_CONFIG = { | ||||
'publisher_id': 'swh.journal.publisher', | 'publisher_id': 'swh.journal.publisher', | ||||
'object_types': OBJECT_TYPE_KEYS.keys(), | 'object_types': OBJECT_TYPE_KEYS.keys(), | ||||
'max_messages': 1, # will read 1 message and stops | 'max_messages': 1, # will read 1 message and stops | ||||
'storage': {'cls': 'memory', 'args': {}}, | 'storage': {'cls': 'memory', 'args': {}}, | ||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def test_config(kafka_server: Tuple[Popen, int], | def test_publisher_config(kafka_server: Tuple[Popen, int], | ||||
kafka_prefix: str): | kafka_prefix: str): | ||||
"""Test configuration needed for publisher/producer/consumer | """Test configuration needed for publisher/producer/consumer | ||||
""" | """ | ||||
_, port = kafka_server | _, port = kafka_server | ||||
return { | return { | ||||
**TEST_CONFIG, | **TEST_CONFIG, | ||||
'brokers': ['localhost:{}'.format(port)], | 'brokers': ['localhost:{}'.format(port)], | ||||
'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new', | 'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new', | ||||
'final_prefix': kafka_prefix + '.swh.journal.objects', | 'final_prefix': kafka_prefix + '.swh.journal.objects', | ||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def producer_to_publisher( | def producer_to_publisher( | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
test_config: Dict, | test_publisher_config: Dict, | ||||
) -> KafkaProducer: # noqa | ) -> KafkaProducer: # noqa | ||||
"""Producer to send message to the publisher's consumer. | """Producer to send message to the publisher's consumer. | ||||
""" | """ | ||||
_, port = kafka_server | _, port = kafka_server | ||||
producer = KafkaProducer( | producer = KafkaProducer( | ||||
bootstrap_servers='localhost:{}'.format(port), | bootstrap_servers='localhost:{}'.format(port), | ||||
key_serializer=key_to_kafka, | key_serializer=key_to_kafka, | ||||
value_serializer=key_to_kafka, | value_serializer=key_to_kafka, | ||||
client_id=test_config['consumer_id'], | client_id=test_publisher_config['consumer_id'], | ||||
) | ) | ||||
return producer | return producer | ||||
@pytest.fixture | @pytest.fixture | ||||
def consumer_from_publisher(kafka_server: Tuple[Popen, int], | def consumer_from_publisher(kafka_server: Tuple[Popen, int], | ||||
test_config: Dict) -> KafkaConsumer: | test_publisher_config: Dict) -> KafkaConsumer: | ||||
"""Get a connected Kafka consumer. | """Get a connected Kafka consumer. | ||||
""" | """ | ||||
kafka_topics = [ | kafka_topics = ['%s.%s' % (test_publisher_config['final_prefix'], | ||||
'%s.%s' % (test_config['final_prefix'], object_type) | object_type) | ||||
for object_type in test_config['object_types']] | for object_type in test_publisher_config['object_types']] | ||||
_, kafka_port = kafka_server | _, kafka_port = kafka_server | ||||
consumer = KafkaConsumer( | consumer = KafkaConsumer( | ||||
*kafka_topics, | *kafka_topics, | ||||
bootstrap_servers='localhost:{}'.format(kafka_port), | bootstrap_servers='localhost:{}'.format(kafka_port), | ||||
consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS, | consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS, | ||||
key_deserializer=kafka_to_key, | key_deserializer=kafka_to_key, | ||||
value_deserializer=kafka_to_value, | value_deserializer=kafka_to_value, | ||||
auto_offset_reset='earliest', | auto_offset_reset='earliest', | ||||
enable_auto_commit=True, | enable_auto_commit=True, | ||||
group_id="test-consumer" | group_id="test-consumer" | ||||
) | ) | ||||
# Enforce auto_offset_reset=earliest even if the consumer was created | # Enforce auto_offset_reset=earliest even if the consumer was created | ||||
# too soon wrt the server. | # too soon wrt the server. | ||||
while len(consumer.assignment()) == 0: | while len(consumer.assignment()) == 0: | ||||
consumer.poll(timeout_ms=20) | consumer.poll(timeout_ms=20) | ||||
consumer.seek_to_beginning() | consumer.seek_to_beginning() | ||||
return consumer | return consumer | ||||
@pytest.fixture | @pytest.fixture | ||||
def publisher(kafka_server: Tuple[Popen, int], | def publisher(kafka_server: Tuple[Popen, int], | ||||
test_config: Dict) -> JournalPublisher: | test_publisher_config: Dict) -> JournalPublisher: | ||||
"""Test Publisher factory. We cannot use a fixture here as we need to | """Test Publisher factory. We cannot use a fixture here as we need to | ||||
modify the sample. | modify the sample. | ||||
""" | """ | ||||
# consumer and producer of the publisher needs to discuss with the | # consumer and producer of the publisher needs to discuss with the | ||||
# right instance | # right instance | ||||
publisher = JournalPublisherTest(test_config) | _, port = kafka_server | ||||
test_publisher_config['brokers'] = ['localhost:{}'.format(port)] | |||||
publisher = JournalPublisherTest(test_publisher_config) | |||||
return publisher | return publisher |