diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index 00ae826..9094faa 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,201 +1,199 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import pytest
 import logging
 
 from kafka import KafkaProducer
 from subprocess import Popen
-from typing import Tuple
+from typing import Tuple, Dict
 from pathlib import Path
 from pytest_kafka import (
     make_zookeeper_process, make_kafka_server, make_kafka_consumer
 )
 
 from swh.journal.publisher import JournalPublisher
 from swh.model.hashutil import hash_to_bytes
 
 from swh.journal.serializers import kafka_to_key, key_to_kafka, kafka_to_value
 
 TEST_CONFIG = {
     'brokers': ['something'],  # this will be overridden in publisher setup
     'temporary_prefix': 'swh.tmp_journal.new',
     'final_prefix': 'swh.journal.objects',
     'consumer_id': 'swh.journal.publisher',
     'publisher_id': 'swh.journal.publisher',
     'object_types': ['something'],  # this will be overridden in publisher setup
     'max_messages': 1,  # will read 1 message and stop
     'storage': {'cls': 'memory', 'args': {}}
 }
 
 CONTENTS = [
     {
         'length': 3,
         'sha1': hash_to_bytes(
             '34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
         'sha1_git': b'foo',
         'blake2s256': b'bar',
         'sha256': b'baz',
         'status': 'visible',
     },
 ]
 
 COMMITTER = [
     {
         'id': 1,
         'fullname': 'foo',
     },
     {
         'id': 2,
         'fullname': 'bar',
     }
 ]
 
 REVISIONS = [
     {
         'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
         'message': b'hello',
         'date': {
             'timestamp': {
                 'seconds': 1234567891,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[0],
         'author': COMMITTER[0],
         'committer_date': None,
     },
     {
         'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
         'message': b'hello again',
         'date': {
             'timestamp': {
                 'seconds': 1234567892,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[1],
         'author': COMMITTER[1],
         'committer_date': None,
     },
 ]
 
 RELEASES = [
     {
         'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
         'name': b'v0.0.1',
         'date': {
             'timestamp': {
                 'seconds': 1234567890,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'author': COMMITTER[0],
     },
 ]
 
 ORIGINS = [
     {
         'url': 'https://somewhere.org/den/fox',
         'type': 'git',
     },
     {
         'url': 'https://overtherainbow.org/fox/den',
         'type': 'svn',
     }
 ]
 
 ORIGIN_VISITS = [
     {
         'date': '2013-05-07T04:20:39.369271+00:00',
     },
     {
         'date': '2018-11-27T17:20:39.000000+00:00',
     }
 ]
 
 
 class JournalPublisherTest(JournalPublisher):
     """A journal publisher which overrides the default configuration
        parsing setup.
""" def _prepare_storage(self, config): super()._prepare_storage(config) self.storage.content_add({'data': b'42', **c} for c in CONTENTS) self.storage.revision_add(REVISIONS) self.storage.release_add(RELEASES) origins = self.storage.origin_add(ORIGINS) origin_visits = [] for i, ov in enumerate(ORIGIN_VISITS): origin_id = origins[i]['id'] ov = self.storage.origin_visit_add(origin_id, ov['date']) origin_visits.append(ov) self.origins = origins self.origin_visits = origin_visits KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT', Path(__file__).parent) KAFKA_SCRIPTS = KAFKA_ROOT / 'kafka/bin/' KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') # Those defines fixtures zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN) kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc') logger = logging.getLogger('kafka') logger.setLevel(logging.WARN) @pytest.fixture def producer_to_publisher( request: 'SubRequest', # noqa F821 kafka_server: Tuple[Popen, int]) -> KafkaProducer: # noqa """Producer to send message to the publisher's consumer. """ _, port = kafka_server producer = KafkaProducer( bootstrap_servers='localhost:{}'.format(port), key_serializer=key_to_kafka, value_serializer=key_to_kafka, client_id=TEST_CONFIG['consumer_id'], ) return producer # pytest fixture (no need for the annotation though or else it breaks) consumer_from_publisher = make_kafka_consumer( 'kafka_server', seek_to_beginning=True, key_deserializer=kafka_to_key, value_deserializer=kafka_to_value, auto_offset_reset='earliest', enable_auto_commit=True, client_id=TEST_CONFIG['publisher_id'], kafka_topics=['dummy']) # will be overriden during test setup -@pytest.fixture -def publisher( - request: 'SubRequest', # noqa F821 - kafka_server: Tuple[Popen, int]) -> JournalPublisher: +def publisher(kafka_server: Tuple[Popen, int], + config: Dict) -> JournalPublisher: # consumer and producer of the publisher needs to discuss with the # right instance _, port = kafka_server - TEST_CONFIG['brokers'] = ['localhost:{}'.format(port)] - return JournalPublisherTest(TEST_CONFIG) + config['brokers'] = ['localhost:{}'.format(port)] + return JournalPublisherTest(config) diff --git a/swh/journal/tests/test_publisher2.py b/swh/journal/tests/test_publisher2.py index 7e3af6f..3f8371d 100644 --- a/swh/journal/tests/test_publisher2.py +++ b/swh/journal/tests/test_publisher2.py @@ -1,101 +1,110 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from kafka import KafkaConsumer, KafkaProducer -from typing import Dict, Text +from subprocess import Popen +from typing import Tuple, Dict, Text from swh.journal.publisher import JournalPublisher from .conftest import ( - TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, ORIGINS + TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, publisher ) OBJECT_TYPE_KEYS = { - # 'content': (b'sha1', CONTENTS), - 'revision': (b'id', REVISIONS), + 'content': (b'sha1', CONTENTS), + # 'revision': (b'id', REVISIONS), # 'release': (b'id', RELEASES), } def assert_publish(publisher: JournalPublisher, consumer_from_publisher: KafkaConsumer, producer_to_publisher: KafkaProducer, object_type: Text): """Assert that publishing object in the publisher is reified and published in topics. 
 
     Args:
         journal_publisher (JournalPublisher): publisher to read and write data
         kafka_consumer (KafkaConsumer): To read data from the publisher
         kafka_producer (KafkaProducer): To send data to the publisher
 
     """
     # the object type's id key label
     object_key_id, expected_objects = OBJECT_TYPE_KEYS[object_type]
     # objects to send to the publisher
     objects = [{object_key_id: c[object_key_id.decode()]}
                for c in expected_objects]
 
     # send messages to the publisher
     for obj in objects:
         producer_to_publisher.send(
             '%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type),
             obj
         )
 
     nb_messages = len(objects)
 
     # publisher should poll 1 message and send 1 reified object
     publisher.poll(max_messages=nb_messages)
 
     # then (client reads the messages from the output topic)
     msgs = []
     for num, msg in enumerate(consumer_from_publisher):
-        print('#### consumer_from_publisher: msg %s: %s ' % (num, msg))
-        print('#### consumer_from_publisher: msg.value %s: %s ' % (
-            num, msg.value))
         msgs.append((msg.topic, msg.key, msg.value))
-        expected_topic = '%s.content' % TEST_CONFIG['final_prefix']
+        expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
         assert expected_topic == msg.topic
 
         expected_key = objects[num][object_key_id]
         assert expected_key == msg.key
 
         expected_value = expected_objects[num]
         # Transformation is needed because msgpack encodes keys and values
         value = {}
         for k, v in msg.value.items():
             k = k.decode()
             if k == 'status':
                 v = v.decode()
             value[k] = v
         assert expected_value == value
 
 
 def test_publish(
-        publisher: JournalPublisher,
+        kafka_server: Tuple[Popen, int],
         consumer_from_publisher: KafkaConsumer,
         producer_to_publisher: KafkaProducer):
     """
     Reading from and writing to the journal publisher should work (contents)
 
     Args:
         journal_publisher (JournalPublisher): publisher to read and write data
         kafka_consumer (KafkaConsumer): To read data from the publisher
         kafka_producer (KafkaProducer): To send data to the publisher
 
     """
+    # retrieve the object types we want to test
     object_types = OBJECT_TYPE_KEYS.keys()
-    # Subscribe to topics
+    # synchronize the publisher's config with the test
+    conf = TEST_CONFIG.copy()
+    conf['object_types'] = object_types
+    # instantiate the publisher (not a fixture due to initialization)
+    p = publisher(kafka_server, config=conf)
+
+    # Subscribe to the publisher's output topics
    consumer_from_publisher.subscribe(
-        topics=['%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
+        topics=['%s.%s' % (conf['final_prefix'], object_type)
                 for object_type in object_types])
+
+    # Now for each object type, we'll send data to the publisher and
+    # check that data is indeed fetched and reified in the publisher's
+    # output topics
     for object_type in object_types:
-        assert_publish(publisher, consumer_from_publisher,
+        assert_publish(p, consumer_from_publisher,
                        producer_to_publisher, object_type)
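
Note on the pattern this diff introduces: publisher is now a plain helper rather
than a pytest fixture, so each test copies TEST_CONFIG, overrides the parts it
needs (typically 'object_types'), and instantiates the publisher itself. Below
is a minimal sketch of that usage, assuming the fixtures defined in conftest.py
above; the test name and the 'release' override are illustrative, not part of
this diff:

    from subprocess import Popen
    from typing import Tuple

    from .conftest import TEST_CONFIG, publisher


    def test_publish_releases(kafka_server: Tuple[Popen, int]):
        # copy the shared config so per-test overrides don't leak across tests
        conf = TEST_CONFIG.copy()
        conf['object_types'] = ['release']  # illustrative override
        # publisher() rewrites conf['brokers'] to point at the test broker
        p = publisher(kafka_server, config=conf)
        # read the queued input messages and publish the reified objects
        p.poll(max_messages=conf['max_messages'])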