diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index 72bd89b..2bd6dc8 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,206 +1,208 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 import pytest

 from kafka import KafkaConsumer, KafkaProducer
 from subprocess import Popen
 from typing import Tuple
 from pathlib import Path

 from pytest_kafka import (
     make_zookeeper_process, make_kafka_server, make_kafka_consumer
 )

 from swh.journal.publisher import JournalPublisher
 from swh.model.hashutil import hash_to_bytes

 from swh.journal.serializers import kafka_to_key, key_to_kafka

 TEST_CONFIG = {
     'brokers': ['localhost'],
     'temporary_prefix': 'swh.tmp_journal.new',
     'final_prefix': 'swh.journal.objects',
     'consumer_id': 'swh.journal.publisher',
     'publisher_id': 'swh.journal.publisher',
-    'object_types': ['content', 'revision', 'release', 'origin'],
+    'object_types': ['content'],
     'max_messages': 1,  # will read 1 message, then stop
     'storage': {'cls': 'memory', 'args': {}}
 }

 CONTENTS = [
     {
         'length': 3,
         'sha1': hash_to_bytes(
             '34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
         'sha1_git': b'foo',
         'blake2s256': b'bar',
         'sha256': b'baz',
         'status': 'visible',
     },
 ]

 COMMITTER = [
     {
         'id': 1,
         'fullname': 'foo',
     },
     {
         'id': 2,
         'fullname': 'bar',
     }
 ]

 REVISIONS = [
     {
         'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
         'message': b'hello',
         'date': {
             'timestamp': {
                 'seconds': 1234567891,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[0],
         'author': COMMITTER[0],
         'committer_date': None,
     },
     {
         'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
         'message': b'hello again',
         'date': {
             'timestamp': {
                 'seconds': 1234567892,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[1],
         'author': COMMITTER[1],
         'committer_date': None,
     },
 ]

 RELEASES = [
     {
         'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
         'name': b'v0.0.1',
         'date': {
             'timestamp': {
                 'seconds': 1234567890,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'author': COMMITTER[0],
     },
 ]

 ORIGINS = [
     {
         'url': 'https://somewhere.org/den/fox',
         'type': 'git',
     },
     {
         'url': 'https://overtherainbow.org/fox/den',
         'type': 'svn',
     }
 ]

 ORIGIN_VISITS = [
     {
         'date': '2013-05-07T04:20:39.369271+00:00',
     },
     {
         'date': '2018-11-27T17:20:39.000000+00:00',
     }
 ]


 class JournalPublisherTest(JournalPublisher):
     """A journal publisher which overrides the default configuration
        parsing setup.
""" def _prepare_storage(self, config): super()._prepare_storage(config) self.storage.content_add({'data': b'42', **c} for c in CONTENTS) self.storage.revision_add(REVISIONS) self.storage.release_add(RELEASES) origins = self.storage.origin_add(ORIGINS) origin_visits = [] for i, ov in enumerate(ORIGIN_VISITS): origin_id = origins[i]['id'] ov = self.storage.origin_visit_add(origin_id, ov['date']) origin_visits.append(ov) self.origins = origins self.origin_visits = origin_visits print("publisher.origin-visits", self.origin_visits) KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT', Path(__file__).parent) KAFKA_SCRIPTS = KAFKA_ROOT / 'kafka/bin/' KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') # Those defines fixtures zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN) kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc') @pytest.fixture -def kafka_producer(request: 'SubRequest', kafka_server: Tuple[Popen, int]) -> KafkaProducer: # noqa +def producer_to_publisher( + request: 'SubRequest', + kafka_server: Tuple[Popen, int]) -> KafkaProducer: # noqa + """Producer to send message to the publisher's consumer. + + """ _, port = kafka_server - producer = KafkaProducer(bootstrap_servers='localhost:{}'.format(port)) + producer = KafkaProducer( + bootstrap_servers='localhost:{}'.format(port), + key_serializer=key_to_kafka, + value_serializer=key_to_kafka, + client_id=TEST_CONFIG['consumer_id'], + ) return producer @pytest.fixture -def kafka_consumer( - request: 'SubRequest', kafka_server: Tuple[Popen, int]) -> KafkaConsumer: # noqa +def consumer_from_publisher(request: 'SubRequest') -> KafkaConsumer: # noqa + """Consumer to read message from the publisher's producer message - TOPIC = 'abc' + """ + subscribed_topics = [ + '%s.%s' % (TEST_CONFIG['final_prefix'], object_type) + for object_type in TEST_CONFIG['object_types'] + ] + print(subscribed_topics) kafka_consumer = make_kafka_consumer( - 'kafka_server', seek_to_beginning=True, kafka_topics=[TOPIC]) + 'kafka_server', + seek_to_beginning=True, + value_deserializer=kafka_to_key, + auto_offset_reset='earliest', + enable_auto_commit=False, + client_id=TEST_CONFIG['publisher_id'], + kafka_topics=subscribed_topics) # Callback [..., KafkaConsumer] return kafka_consumer(request) -class JournalPublisherKafkaInMemoryStorage(JournalPublisherTest): - """A journal publisher with: - - kafka dependency - - in-memory storage - - """ - def _prepare_journal(self, config): - """No journal for now - - """ - self.consumer = KafkaConsumer( - bootstrap_servers=config['brokers'], - value_deserializer=kafka_to_key, - auto_offset_reset='earliest', - enable_auto_commit=False, - group_id=config['consumer_id'], - ) - self.producer = KafkaProducer( - bootstrap_servers=config['brokers'], - key_serializer=key_to_kafka, - value_serializer=key_to_kafka, - client_id=config['publisher_id'], - ) - - @pytest.fixture -def journal_publisher(request: 'SubRequest', kafka_consumer, kafka_producer): - return JournalPublisherKafkaInMemoryStorage(TEST_CONFIG) +def publisher( + request: 'SubRequest', + kafka_server: Tuple[Popen, int]) -> JournalPublisher: + # consumer and producer of the publisher needs to discuss with the + # right instance + _, port = kafka_server + TEST_CONFIG['brokers'] = ['localhost:{}'.format(port)] + return JournalPublisher(TEST_CONFIG) diff --git a/swh/journal/tests/test_publisher2.py b/swh/journal/tests/test_publisher2.py index b3428f4..23eb337 100644 --- a/swh/journal/tests/test_publisher2.py 
+++ b/swh/journal/tests/test_publisher2.py
@@ -1,78 +1,62 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from kafka import KafkaConsumer, KafkaProducer

+from swh.journal.publisher import JournalPublisher
+from .conftest import (
+    TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, ORIGINS
+)


-def write_and_read(kafka_producer: KafkaProducer,
-                   kafka_consumer: KafkaConsumer) -> None:
-    """Produces writes to a topic, consumer consumes from the same topic.

+def test_publisher(
+        publisher: JournalPublisher,
+        consumer_from_publisher: KafkaConsumer,
+        producer_to_publisher: KafkaProducer):
     """
-    message = b'msg'
-    topic = 'abc'
-    # write to kafka
-    kafka_producer.send(topic, message)
-    kafka_producer.flush()
-    # read from it
-    consumed = list(kafka_consumer)
-    assert len(consumed) == 1
-    assert consumed[0].topic == topic
-    assert consumed[0].value == message
-
-
-def test_read_write(kafka_producer: KafkaProducer,
-                    kafka_consumer: KafkaConsumer):
-    """Independent test from the publisher so far"""
-    write_and_read(kafka_producer, kafka_consumer)
-
-
-def test_poll_publisher():
-    pass
-
-# def setUp(self):
-#     self.publisher = JournalPublisherTest()
-#     self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
-#     # self.revisions = [{b'id': c['id']} for c in REVISIONS]
-#     # self.releases = [{b'id': c['id']} for c in RELEASES]
-#     # producer and consumer to send and read data from publisher
-#     self.producer_to_publisher = KafkaProducer(
-#         bootstrap_servers=TEST_CONFIG['brokers'],
-#         key_serializer=key_to_kafka,
-#         value_serializer=key_to_kafka,
-#         acks='all')
-#     self.consumer_from_publisher = KafkaConsumer(
-#         bootstrap_servers=TEST_CONFIG['brokers'],
-#         value_deserializer=kafka_to_key)
-#     self.consumer_from_publisher.subscribe(
-#         topics=['%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type)
-#                 for object_type in TEST_CONFIG['object_types']])
-
-
-# def test_poll(kafka_consumer):
-#     # given (send message to the publisher)
-#     self.producer_to_publisher.send(
-#         '%s.content' % TEST_CONFIG['temporary_prefix'],
-#         self.contents[0]
-#     )
-
-#     nb_messages = 1
-
-#     # when (the publisher poll 1 message and send 1 reified object)
-#     self.publisher.poll(max_messages=nb_messages)
-
-#     # then (client reads from the messages from output topic)
-#     msgs = []
-#     for num, msg in enumerate(self.consumer_from_publisher):
-#         print('#### consumed msg %s: %s ' % (num, msg))
-#         msgs.append(msg)
-
-#     self.assertEqual(len(msgs), nb_messages)
-#     print('##### msgs: %s' % msgs)
-#     # check the results
-#     expected_topic = 'swh.journal.objects.content'
-#     expected_object = (self.contents[0][b'sha1'], CONTENTS[0])
-
-#     self.assertEqual(msgs, (expected_topic, expected_object))
+    Reading from and writing to the journal publisher should work
+
+    Args:
+        publisher (JournalPublisher): publisher to read from and write to
+        consumer_from_publisher (KafkaConsumer): to read data from the
+            publisher
+        producer_to_publisher (KafkaProducer): to send data to the
+            publisher
+
+    """
+
+    contents = [{b'sha1': c['sha1']} for c in CONTENTS]
+
+    # revisions = [{b'id': c['id']} for c in REVISIONS]
+    # releases = [{b'id': c['id']} for c in RELEASES]
+
+    # read the output of the publisher
+    consumer_from_publisher.subscribe(
+        topics=['%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
+                for object_type in TEST_CONFIG['object_types']])
+
+    # send a message to the publisher
+    producer_to_publisher.send(
+        '%s.content' % TEST_CONFIG['temporary_prefix'],
+        contents[0]
+    )
+
+    nb_messages = 1
+
+    # the publisher should poll 1 message and send 1 reified object
+    publisher.poll(max_messages=nb_messages)
+
+    # then the client reads the messages back from the output topic
+    msgs = []
+    for num, msg in enumerate(consumer_from_publisher):
+        print('#### consumed msg %s: %s ' % (num, msg))
+        msgs.append(msg)
+        # the consumer iterator blocks waiting for more messages, so
+        # stop as soon as we have read the expected count
+        if num + 1 == nb_messages:
+            break
+
+    assert len(msgs) == nb_messages
+
+    print('##### msgs: %s' % msgs)
+    # check the results
+    expected_topic = '%s.content' % TEST_CONFIG['final_prefix']
+    expected_object = (contents[0][b'sha1'], CONTENTS[0])
+
+    assert msgs[0].topic == expected_topic
+    assert (msgs[0].key, msgs[0].value) == expected_object
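
Two notes on the moving parts above. First, the round trip only works because `producer_to_publisher` and `consumer_from_publisher` use matching (de)serializers: `key_to_kafka` on the way in, `kafka_to_key` on the way out. A minimal sketch of that contract, assuming the `swh.journal.serializers` helpers form a symmetric msgpack encode/decode pair; the `encode`/`decode` stand-ins below are hypothetical, not the real implementation:

```python
import msgpack


def encode(obj):
    # stand-in for key_to_kafka: object -> bytes on the wire
    return msgpack.packb(obj, use_bin_type=True)


def decode(data):
    # stand-in for kafka_to_key: bytes read from kafka -> object
    return msgpack.unpackb(data, raw=False)


# a content key, as test_publisher builds it from CONTENTS
key = {b'sha1': bytes.fromhex('34973274ccef6ab4dfaaf86599792fa9c3fe4689')}
assert decode(encode(key)) == key
```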
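Second, the topic wiring `test_publisher` exercises: identifier-only messages go in on `temporary_prefix` topics, and reified objects come out on `final_prefix` topics. A rough sketch of one publisher iteration under those assumptions; `poll_once` and the `lookup` callable are hypothetical simplifications for illustration, not `JournalPublisher.poll` itself:

```python
def poll_once(consumer, producer, lookup, config):
    """One publisher iteration: read content identifiers from the
    temporary topic, reify each one via `lookup` (a hypothetical
    storage accessor), and emit (key, full object) pairs on the
    final topic.
    """
    # kafka-python returns {TopicPartition: [ConsumerRecord, ...]}
    for _, messages in consumer.poll(timeout_ms=1000).items():
        for message in messages:
            object_key = message.value[b'sha1']
            producer.send(
                '%s.content' % config['final_prefix'],
                key=object_key,
                value=lookup(object_key),
            )
    producer.flush()
```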