diff --git a/swh/journal/tests/test_publisher.py b/swh/journal/tests/test_publisher.py
--- a/swh/journal/tests/test_publisher.py
+++ b/swh/journal/tests/test_publisher.py
@@ -8,6 +8,74 @@
 from swh.model.hashutil import hash_to_bytes
 from swh.journal.publisher import JournalPublisher
 from swh.storage.in_memory import Storage
+from kafka import KafkaConsumer, KafkaProducer
+
+from swh.journal.serializers import kafka_to_key, key_to_kafka
+
+
+class MockStorage:
+    """Minimal mock of the storage, serving content, revision and
+    release objects from a fixed in-memory state.
+
+    """
+    # Map each object type to the key holding its identifier
+    type_to_key = {
+        'content': 'sha1',
+        'revision': 'id',
+        'release': 'id',
+    }
+
+    def __init__(self, state):
+        """Initialize mock storage's state.
+
+        Args:
+            state (dict): keys are the object type (content, revision,
+                          release) and values are a list of dict
+                          representing the associated typed objects.
+                          A missing object type means no such objects.
+
+        """
+        self.state = {
+            object_type: {obj[key]: obj
+                          for obj in state.get(object_type, [])}
+            for object_type, key in self.type_to_key.items()
+        }
+
+    def get(self, type, objects):
+        """Given an object type and a list of object ids, returns the
+           state's matching objects, in the order of the given ids.
+           Unknown ids are skipped.
+
+        Args:
+            type (str): expected object type (release, revision, content)
+            objects ([bytes]): list of object id (bytes)
+
+        Returns:
+            list of dict corresponding to the ids found in the state
+
+        Raises:
+            ValueError: if type is not a supported object type
+
+        """
+        if type not in self.type_to_key:
+            raise ValueError('Programmatic error: expected %s not %s' % (
+                ', '.join(self.type_to_key), type
+            ))
+        objects_by_id = self.state[type]
+        found = (objects_by_id.get(_id) for _id in objects)
+        # test for None rather than truthiness: only drop ids that are
+        # absent from the state
+        return [obj for obj in found if obj is not None]
+
+    def content_get_metadata(self, contents):
+        return self.get('content', contents)
+
+    def revision_get(self, revisions):
+        return self.get('revision', revisions)
+
+    def release_get(self, releases):
+        return self.get('release', releases)
+
 
 CONTENTS = [
     {
@@ -105,10 +173,10 @@
     'brokers': ['localhost'],
     'temporary_prefix': 'swh.tmp_journal.new',
     'final_prefix': 'swh.journal.objects',
-    'consumer_id': 'swh.journal.test.publisher',
-    'publisher_id': 'swh.journal.test.publisher',
+    'consumer_id': 'swh.journal.publisher',
+    'publisher_id': 'swh.journal.publisher',
     'object_types': ['content'],
-    'max_messages': 3,
+    'max_messages': 1,  # will read 1 message and stop
 }
 
 
@@ -133,9 +201,9 @@
 class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest):
     """A journal publisher with:
     - no kafka dependency
-    - in-memory storage
-    """
+    - mock storage as storage
 
+    """
     def _prepare_journal(self, config):
         """No journal for now
 
@@ -143,7 +211,10 @@
         pass
 
 
-class TestPublisher(unittest.TestCase):
+class TestPublisherNoKafka(unittest.TestCase):
+    """This tests only the part not using any kafka instance
+
+    """
     def setUp(self):
         self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG)
         self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
@@ -221,3 +292,68 @@
         }
 
         self.assertEqual(actual_objects, expected_objects)
+
+
+class TestPublisher(unittest.TestCase):
+    """This tests a publisher actually speaking with kafka underneath
+
+    """
+    def setUp(self):
+        self.publisher = JournalPublisherTest(TEST_CONFIG)
+        self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
+        # producer feeding the publisher's input (temporary) topics
+        self.producer_to_publisher = KafkaProducer(
+            bootstrap_servers=TEST_CONFIG['brokers'],
+            key_serializer=key_to_kafka,
+            value_serializer=key_to_kafka,
+            acks='all')
+        # consumer reading the publisher's output (final) topics:
+        # 'earliest' so that messages written before its first poll are
+        # not skipped, and a timeout so that iterating over it cannot
+        # block forever when fewer messages than expected are published
+        self.consumer_from_publisher = KafkaConsumer(
+            bootstrap_servers=TEST_CONFIG['brokers'],
+            key_deserializer=kafka_to_key,
+            value_deserializer=kafka_to_key,
+            auto_offset_reset='earliest',
+            consumer_timeout_ms=10000)
+        self.consumer_from_publisher.subscribe(
+            topics=['%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
+                    for object_type in TEST_CONFIG['object_types']])
+
+    def tearDown(self):
+        self.producer_to_publisher.close()
+        self.consumer_from_publisher.close()
+
+    def test_poll(self):
+        # given (send message to the publisher)
+        self.producer_to_publisher.send(
+            '%s.content' % TEST_CONFIG['temporary_prefix'],
+            self.contents[0]
+        )
+        # send() is asynchronous: ensure the message reached kafka
+        # before the publisher polls for it
+        self.producer_to_publisher.flush()
+
+        nb_messages = 1
+
+        # when (the publisher polls 1 message and sends 1 reified object)
+        self.publisher.poll(max_messages=nb_messages)
+
+        # then (read the messages from the output topic, stopping once
+        # the expected number has been received)
+        msgs = []
+        for msg in self.consumer_from_publisher:
+            msgs.append(msg)
+            if len(msgs) >= nb_messages:
+                break
+
+        self.assertEqual(len(msgs), nb_messages)
+
+        # check the results
+        record = msgs[0]
+        expected_topic = '%s.content' % TEST_CONFIG['final_prefix']
+        expected_object = (self.contents[0][b'sha1'], CONTENTS[0])
+
+        self.assertEqual(record.topic, expected_topic)
+        self.assertEqual((record.key, record.value), expected_object)
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -5,8 +5,9 @@
 deps =
   .[testing]
   pytest-cov
+  pifpaf
 commands =
-  pytest --cov=swh --cov-branch {posargs}
+  pifpaf run kafka -- pytest --cov=swh --cov-branch {posargs}
 
 [testenv:flake8]
 skip_install = true