Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/conftest.py
Show First 20 Lines • Show All 157 Lines • ▼ Show 20 Lines | |||||
@pytest.fixture(scope='function') | @pytest.fixture(scope='function') | ||||
def kafka_prefix(): | def kafka_prefix(): | ||||
return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) | return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) | ||||
TEST_CONFIG = { | TEST_CONFIG = { | ||||
'temporary_prefix': 'swh.tmp_journal.new', | |||||
'final_prefix': 'swh.journal.objects', | |||||
'consumer_id': 'swh.journal.consumer', | 'consumer_id': 'swh.journal.consumer', | ||||
'object_types': OBJECT_TYPE_KEYS.keys(), | 'object_types': OBJECT_TYPE_KEYS.keys(), | ||||
'max_messages': 1, # will read 1 message and stops | 'max_messages': 1, # will read 1 message and stops | ||||
'storage': {'cls': 'memory', 'args': {}}, | 'storage': {'cls': 'memory', 'args': {}}, | ||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def test_config(kafka_server: Tuple[Popen, int], | def test_config(kafka_server: Tuple[Popen, int], | ||||
kafka_prefix: str): | kafka_prefix: str): | ||||
"""Test configuration needed for producer/consumer | """Test configuration needed for producer/consumer | ||||
""" | """ | ||||
_, port = kafka_server | _, port = kafka_server | ||||
return { | return { | ||||
**TEST_CONFIG, | **TEST_CONFIG, | ||||
'brokers': ['localhost:{}'.format(port)], | 'brokers': ['localhost:{}'.format(port)], | ||||
'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new', | 'prefix': kafka_prefix + '.swh.journal.objects', | ||||
'final_prefix': kafka_prefix + '.swh.journal.objects', | |||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def consumer( | def consumer( | ||||
kafka_server: Tuple[Popen, int], test_config: Dict) -> KafkaConsumer: | kafka_server: Tuple[Popen, int], test_config: Dict) -> KafkaConsumer: | ||||
"""Get a connected Kafka consumer. | """Get a connected Kafka consumer. | ||||
""" | """ | ||||
kafka_topics = [ | kafka_topics = [ | ||||
'%s.%s' % (test_config['final_prefix'], object_type) | '%s.%s' % (test_config['prefix'], object_type) | ||||
for object_type in test_config['object_types']] | for object_type in test_config['object_types']] | ||||
_, kafka_port = kafka_server | _, kafka_port = kafka_server | ||||
consumer = KafkaConsumer( | consumer = KafkaConsumer( | ||||
*kafka_topics, | *kafka_topics, | ||||
bootstrap_servers='localhost:{}'.format(kafka_port), | bootstrap_servers='localhost:{}'.format(kafka_port), | ||||
consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS, | consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS, | ||||
key_deserializer=kafka_to_key, | key_deserializer=kafka_to_key, | ||||
value_deserializer=kafka_to_value, | value_deserializer=kafka_to_value, | ||||
Show All 12 Lines |