Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/conftest.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | import os | ||||
import pytest | import pytest | ||||
import logging | import logging | ||||
import random | import random | ||||
import string | import string | ||||
from kafka import KafkaConsumer | from confluent_kafka import Consumer | ||||
from subprocess import Popen | from subprocess import Popen | ||||
from typing import Tuple, Dict | from typing import Tuple, Dict | ||||
from pathlib import Path | from pathlib import Path | ||||
from pytest_kafka import ( | from pytest_kafka import ( | ||||
make_zookeeper_process, make_kafka_server, constants | make_zookeeper_process, make_kafka_server | ||||
) | ) | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.journal.serializers import kafka_to_key, kafka_to_value | |||||
logger = logging.getLogger(__name__) | |||||
CONTENTS = [ | CONTENTS = [ | ||||
{ | { | ||||
'length': 3, | 'length': 3, | ||||
'sha1': hash_to_bytes( | 'sha1': hash_to_bytes( | ||||
'34973274ccef6ab4dfaaf86599792fa9c3fe4689'), | '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), | ||||
'sha1_git': b'foo', | 'sha1_git': b'foo', | ||||
▲ Show 20 Lines • Show All 139 Lines • ▼ Show 20 Lines | |||||
# These define fixtures | # These define fixtures | ||||
zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session') | zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session') | ||||
os.environ['KAFKA_LOG4J_OPTS'] = \ | os.environ['KAFKA_LOG4J_OPTS'] = \ | ||||
'-Dlog4j.configuration=file:%s/log4j.properties' % \ | '-Dlog4j.configuration=file:%s/log4j.properties' % \ | ||||
os.path.dirname(__file__) | os.path.dirname(__file__) | ||||
kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') | kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') | ||||
logger = logging.getLogger('kafka') | kafka_logger = logging.getLogger('kafka') | ||||
logger.setLevel(logging.WARN) | kafka_logger.setLevel(logging.WARN) | ||||
@pytest.fixture(scope='function') | @pytest.fixture(scope='function') | ||||
def kafka_prefix(): | def kafka_prefix(): | ||||
return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) | return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) | ||||
TEST_CONFIG = { | TEST_CONFIG = { | ||||
'consumer_id': 'swh.journal.consumer', | 'consumer_id': 'swh.journal.consumer', | ||||
'object_types': OBJECT_TYPE_KEYS.keys(), | 'object_types': OBJECT_TYPE_KEYS.keys(), | ||||
'max_messages': 1, # will read 1 message and stops | 'max_messages': 1, # will read 1 message and stops | ||||
'storage': {'cls': 'memory', 'args': {}}, | 'storage': {'cls': 'memory', 'args': {}}, | ||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def test_config(kafka_server: Tuple[Popen, int], | def test_config(kafka_server: Tuple[Popen, int], | ||||
kafka_prefix: str): | kafka_prefix: str): | ||||
"""Test configuration needed for producer/consumer | """Test configuration needed for producer/consumer | ||||
""" | """ | ||||
_, port = kafka_server | _, port = kafka_server | ||||
return { | return { | ||||
**TEST_CONFIG, | **TEST_CONFIG, | ||||
'brokers': ['localhost:{}'.format(port)], | 'brokers': ['127.0.0.1:{}'.format(port)], | ||||
'prefix': kafka_prefix + '.swh.journal.objects', | 'prefix': kafka_prefix + '.swh.journal.objects', | ||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def consumer( | def consumer( | ||||
kafka_server: Tuple[Popen, int], test_config: Dict) -> KafkaConsumer: | kafka_server: Tuple[Popen, int], | ||||
test_config: Dict, | |||||
kafka_prefix: str, | |||||
) -> Consumer: | |||||
"""Get a connected Kafka consumer. | """Get a connected Kafka consumer. | ||||
""" | """ | ||||
_, kafka_port = kafka_server | |||||
consumer = Consumer({ | |||||
'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), | |||||
'auto.offset.reset': 'earliest', | |||||
'enable.auto.commit': True, | |||||
'group.id': "test-consumer-%s" % kafka_prefix, | |||||
}) | |||||
kafka_topics = [ | kafka_topics = [ | ||||
'%s.%s' % (test_config['prefix'], object_type) | '%s.%s' % (test_config['prefix'], object_type) | ||||
for object_type in test_config['object_types']] | for object_type in test_config['object_types'] | ||||
vlorentz: This comment should be moved a few lines below | |||||
Done · Inline Actions · olasd: Turns out this isn't needed anymore, so I scrapped it. | |||||
_, kafka_port = kafka_server | ] | ||||
consumer = KafkaConsumer( | |||||
*kafka_topics, | |||||
bootstrap_servers='localhost:{}'.format(kafka_port), | |||||
consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS, | |||||
key_deserializer=kafka_to_key, | |||||
value_deserializer=kafka_to_value, | |||||
auto_offset_reset='earliest', | |||||
enable_auto_commit=True, | |||||
group_id="test-consumer" | |||||
) | |||||
# Enforce auto_offset_reset=earliest even if the consumer was created | consumer.subscribe(kafka_topics) | ||||
# too soon wrt the server. | |||||
while len(consumer.assignment()) == 0: | |||||
consumer.poll(timeout_ms=20) | |||||
consumer.seek_to_beginning() | |||||
return consumer | return consumer |
This comment should be moved a few lines below