diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index aceb50d..8078eec 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,241 +1,250 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 import pytest
 import logging

 from kafka import KafkaProducer, KafkaConsumer
 from subprocess import Popen
 from typing import Tuple, Dict
 from pathlib import Path
 from pytest_kafka import (
     make_zookeeper_process, make_kafka_server, constants
 )

 from swh.journal.publisher import JournalPublisher
 from swh.model.hashutil import hash_to_bytes

 from swh.journal.serializers import kafka_to_key, key_to_kafka, kafka_to_value

-TEST_CONFIG = {
-    'brokers': ['something'],  # this will be overriden in publisher setup
-    'temporary_prefix': 'swh.tmp_journal.new',
-    'final_prefix': 'swh.journal.objects',
-    'consumer_id': 'swh.journal.publisher',
-    'publisher_id': 'swh.journal.publisher',
-    'object_types': ['something'],  # this will be overriden in publisher setup
-    'max_messages': 1,  # will read 1 message and stops
-    'storage': {'cls': 'memory', 'args': {}}
-}
-
 CONTENTS = [
     {
         'length': 3,
         'sha1': hash_to_bytes(
             '34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
         'sha1_git': b'foo',
         'blake2s256': b'bar',
         'sha256': b'baz',
         'status': 'visible',
     },
 ]

 COMMITTER = [
     {
         'id': 1,
         'fullname': 'foo',
     },
     {
         'id': 2,
         'fullname': 'bar',
     }
 ]

 REVISIONS = [
     {
         'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
         'message': b'hello',
         'date': {
             'timestamp': {
                 'seconds': 1234567891,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[0],
         'author': COMMITTER[0],
         'committer_date': None,
     },
     {
         'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
         'message': b'hello again',
         'date': {
             'timestamp': {
                 'seconds': 1234567892,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[1],
         'author': COMMITTER[1],
         'committer_date': None,
     },
 ]

 RELEASES = [
     {
         'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
         'name': b'v0.0.1',
         'date': {
             'timestamp': {
                 'seconds': 1234567890,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'author': COMMITTER[0],
     },
 ]

 ORIGINS = [
     {
         'url': 'https://somewhere.org/den/fox',
         'type': 'git',
     },
     {
         'url': 'https://overtherainbow.org/fox/den',
         'type': 'svn',
     }
 ]

 ORIGIN_VISITS = [
     {
         'date': '2013-05-07T04:20:39.369271+00:00',
     },
     {
         'date': '2018-11-27T17:20:39.000000+00:00',
     }
 ]

+# Map from object type to a tuple (key name, sample objects)
+OBJECT_TYPE_KEYS = {
+    'content': (b'sha1', CONTENTS),
+    'revision': (b'id', REVISIONS),
+    'release': (b'id', RELEASES),
+}
+
+
+TEST_CONFIG = {
+    'temporary_prefix': 'swh.tmp_journal.new',
+    'final_prefix': 'swh.journal.objects',
+    'consumer_id': 'swh.journal.publisher',
+    'publisher_id': 'swh.journal.publisher',
+    'object_types': OBJECT_TYPE_KEYS.keys(),
+    'max_messages': 1,  # will read 1 message and stop
+    'storage': {'cls': 'memory', 'args': {}}
+}
+

 class JournalPublisherTest(JournalPublisher):
     """A journal publisher which overrides the default configuration
        parsing setup.
""" def _prepare_storage(self, config): super()._prepare_storage(config) self.storage.content_add({'data': b'42', **c} for c in CONTENTS) self.storage.revision_add(REVISIONS) self.storage.release_add(RELEASES) origins = self.storage.origin_add(ORIGINS) origin_visits = [] for i, ov in enumerate(ORIGIN_VISITS): origin_id = origins[i]['id'] ov = self.storage.origin_visit_add(origin_id, ov['date']) origin_visits.append(ov) self.origins = origins self.origin_visits = origin_visits KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' if not os.path.exists(KAFKA_ROOT): msg = ('Development error: %s must exist and target an ' 'existing kafka installation' % KAFKA_ROOT) raise ValueError(msg) KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin' KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') # Those defines fixtures zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN) kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc') logger = logging.getLogger('kafka') logger.setLevel(logging.WARN) @pytest.fixture def test_config(): """Test configuration needed for publisher/producer/consumer """ return TEST_CONFIG @pytest.fixture def producer_to_publisher( request: 'SubRequest', # noqa F821 kafka_server: Tuple[Popen, int], test_config: Dict, ) -> KafkaProducer: # noqa """Producer to send message to the publisher's consumer. """ _, port = kafka_server producer = KafkaProducer( bootstrap_servers='localhost:{}'.format(port), key_serializer=key_to_kafka, value_serializer=key_to_kafka, client_id=test_config['consumer_id'], ) return producer @pytest.fixture def kafka_consumer( request: 'SubRequest', # noqa F821 test_config: Dict, ) -> KafkaConsumer: """Get a connected Kafka consumer. """ kafka_topics = ['%s.%s' % (test_config['final_prefix'], object_type) for object_type in test_config['object_types']] test_config['topics'] = kafka_topics consumer_kwargs = dict( key_deserializer=kafka_to_key, value_deserializer=kafka_to_value, auto_offset_reset='earliest', enable_auto_commit=True, client_id=test_config['publisher_id']) _, kafka_port = request.getfixturevalue('kafka_server') used_consumer_kwargs = consumer_kwargs.copy() used_consumer_kwargs.setdefault('consumer_timeout_ms', constants.DEFAULT_CONSUMER_TIMEOUT_MS) used_consumer_kwargs.setdefault( 'bootstrap_servers', 'localhost:{}'.format(kafka_port)) consumer = KafkaConsumer( *kafka_topics, **used_consumer_kwargs, ) return consumer +@pytest.fixture def publisher(kafka_server: Tuple[Popen, int], - config: Dict) -> JournalPublisher: + test_config: Dict) -> JournalPublisher: """Test Publisher factory. We cannot use a fixture here as we need to modify the sample. 
""" # consumer and producer of the publisher needs to discuss with the # right instance _, port = kafka_server - config['brokers'] = ['localhost:{}'.format(port)] - return JournalPublisherTest(config) + test_config['brokers'] = ['localhost:{}'.format(port)] + publisher = JournalPublisherTest(test_config) + return publisher diff --git a/swh/journal/tests/test_publisher_kafka.py b/swh/journal/tests/test_publisher_kafka.py index 97eb736..2fa4c6b 100644 --- a/swh/journal/tests/test_publisher_kafka.py +++ b/swh/journal/tests/test_publisher_kafka.py @@ -1,105 +1,88 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from kafka import KafkaConsumer, KafkaProducer from subprocess import Popen -from typing import Tuple, Text, Dict +from typing import Tuple, Text from swh.journal.serializers import value_to_kafka, kafka_to_value - from swh.journal.publisher import JournalPublisher -from .conftest import ( - TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, publisher -) - +from .conftest import TEST_CONFIG, OBJECT_TYPE_KEYS -OBJECT_TYPE_KEYS = { - 'content': (b'sha1', CONTENTS), - 'revision': (b'id', REVISIONS), - 'release': (b'id', RELEASES), -} - -def assert_publish(publisher: JournalPublisher, - consumer_from_publisher: KafkaConsumer, - producer_to_publisher: KafkaProducer, - object_type: Text): +def assert_publish_ok(publisher: JournalPublisher, + consumer_from_publisher: KafkaConsumer, + producer_to_publisher: KafkaProducer, + object_type: Text): """Assert that publishing object in the publisher is reified and - published in topics. + published in output topics. Args: - journal_publisher (JournalPublisher): publisher to read and write data - kafka_consumer (KafkaConsumer): To read data from the publisher - kafka_producer (KafkaProducer): To send data to the publisher + publisher (JournalPublisher): publisher to read and write data + consumer_from_publisher (KafkaConsumer): To read data from the + publisher + producer_to_publisher (KafkaProducer): To send data to the publisher + object_type (str): Object type to look for (e.g content, revision, + etc...) 
""" # object type's id label key object_key_id, expected_objects = OBJECT_TYPE_KEYS[object_type] # objects to send to the publisher objects = [{object_key_id: c[object_key_id.decode()]} for c in expected_objects] # send message to the publisher for obj in objects: producer_to_publisher.send( '%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type), obj ) nb_messages = len(objects) # publisher should poll 1 message and send 1 reified object publisher.poll(max_messages=nb_messages) # then (client reads from the messages from output topic) msgs = [] for num, msg in enumerate(consumer_from_publisher): msgs.append((msg.topic, msg.key, msg.value)) expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type) assert expected_topic == msg.topic expected_key = objects[num][object_key_id] assert expected_key == msg.key # Transformation is needed due to our back and forth # serialization to kafka expected_value = kafka_to_value(value_to_kafka(expected_objects[num])) assert expected_value == msg.value def test_publish( + publisher: JournalPublisher, kafka_server: Tuple[Popen, int], kafka_consumer: KafkaConsumer, - producer_to_publisher: KafkaProducer, - test_config: Dict): + producer_to_publisher: KafkaProducer): """ Reading from and writing to the journal publisher should work (contents) Args: journal_publisher (JournalPublisher): publisher to read and write data kafka_consumer (KafkaConsumer): To read data from the publisher - kafka_producer (KafkaProducer): To send data to the publisher + producer_to_publisher (KafkaProducer): To send data to the publisher """ # retrieve the object types we want to test object_types = OBJECT_TYPE_KEYS.keys() - # synchronize the publisher's config with the test - conf = test_config.copy() - conf['object_types'] = object_types - # instantiate the publisher (not a fixture due to initialization) - p = publisher(kafka_server, config=conf) - - # Subscribe to the publisher's output topics - kafka_consumer.subscribe( - topics=['%s.%s' % (conf['final_prefix'], object_type) - for object_type in object_types]) - # Now for each object type, we'll send data to the publisher and # check that data is indeed fetched and reified in the publisher's # output topics for object_type in object_types: - assert_publish(p, kafka_consumer, producer_to_publisher, object_type) + assert_publish_ok( + publisher, kafka_consumer, producer_to_publisher, object_type)