Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/conftest.py
Show First 20 Lines • Show All 178 Lines • ▼ Show 20 Lines | |||||
kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') | kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') | ||||
kafka_logger = logging.getLogger('kafka') | kafka_logger = logging.getLogger('kafka') | ||||
kafka_logger.setLevel(logging.WARN) | kafka_logger.setLevel(logging.WARN) | ||||
@pytest.fixture(scope='function') | @pytest.fixture(scope='function') | ||||
def kafka_prefix(): | def kafka_prefix(): | ||||
"""Pick a random prefix for kafka topics on each call""" | |||||
return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) | return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) | ||||
@pytest.fixture(scope='function') | |||||
def kafka_consumer_group(kafka_prefix: str): | |||||
"""Pick a random consumer group for kafka consumers on each call""" | |||||
return "test-consumer-%s" % kafka_prefix | |||||
TEST_CONFIG = { | TEST_CONFIG = { | ||||
'consumer_id': 'swh.journal.consumer', | 'consumer_id': 'swh.journal.consumer', | ||||
'object_types': OBJECT_TYPE_KEYS.keys(), | 'object_types': OBJECT_TYPE_KEYS.keys(), | ||||
'max_messages': 1, # will read 1 message and stops | 'max_messages': 1, # will read 1 message and stops | ||||
'storage': {'cls': 'memory', 'args': {}}, | 'storage': {'cls': 'memory', 'args': {}}, | ||||
} | } | ||||
Show All 10 Lines | return { | ||||
'prefix': kafka_prefix + '.swh.journal.objects', | 'prefix': kafka_prefix + '.swh.journal.objects', | ||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def consumer( | def consumer( | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
test_config: Dict, | test_config: Dict, | ||||
kafka_prefix: str, | kafka_consumer_group: str, | ||||
) -> Consumer: | ) -> Consumer: | ||||
"""Get a connected Kafka consumer. | """Get a connected Kafka consumer. | ||||
""" | """ | ||||
_, kafka_port = kafka_server | _, kafka_port = kafka_server | ||||
consumer = Consumer({ | consumer = Consumer({ | ||||
'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), | 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), | ||||
'auto.offset.reset': 'earliest', | 'auto.offset.reset': 'earliest', | ||||
'enable.auto.commit': True, | 'enable.auto.commit': True, | ||||
'group.id': "test-consumer-%s" % kafka_prefix, | 'group.id': kafka_consumer_group, | ||||
}) | }) | ||||
kafka_topics = [ | kafka_topics = [ | ||||
'%s.%s' % (test_config['prefix'], object_type) | '%s.%s' % (test_config['prefix'], object_type) | ||||
for object_type in test_config['object_types'] | for object_type in test_config['object_types'] | ||||
] | ] | ||||
consumer.subscribe(kafka_topics) | consumer.subscribe(kafka_topics) | ||||
yield consumer | yield consumer | ||||
consumer.close() | consumer.close() |