Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_publisher.py
# Copyright (C) 2018  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest | import unittest | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.journal.publisher import JournalPublisher | from swh.journal.publisher import JournalPublisher | ||||
from swh.storage.in_memory import Storage | from swh.storage.in_memory import Storage | ||||
from kafka import KafkaConsumer, KafkaProducer | |||||
from swh.journal.serializers import kafka_to_key, key_to_kafka | |||||
class MockStorage:
    """In-memory stand-in for swh.storage used by the publisher tests.

    NOTE(review): reviewers flagged this class as dead code superseded by
    swh.storage.in_memory.Storage; kept as-is until its removal lands.

    """
    # Map from object type to the dict key holding that object's id.
    type_to_key = {
        'content': 'sha1',
        'revision': 'id',
        'release': 'id',
    }

    def __init__(self, state):
        """Initialize mock storage's state.

        Args:
            state (dict): keys are the object type (content, revision,
                          release) and values are a list of dict
                          representing the associated typed objects

        """
        self.state = {}
        # Index each object list by its id so get() is an O(1) lookup.
        # (avoid naming the loop variable `type`: it shadows the builtin)
        for object_type, key in self.type_to_key.items():
            self.state[object_type] = {
                obj[key]: obj for obj in state[object_type]
            }

    def get(self, type, objects):
        """Given an object type and a list of objects with type type, returns
        the state's matching objects.

        Args:
            type (str): expected object type (release, revision, content)
            objects ([bytes]): list of object id (bytes)

        Returns:
            list of dict corresponding to the ids provided; unknown ids
            are silently skipped

        Raises:
            ValueError: if type is not one of the supported object types

        """
        if type not in self.type_to_key:
            raise ValueError('Programmatic error: expected %s not %s' % (
                ', '.join(self.type_to_key), type
            ))
        object_ids = self.state[type]
        # Keep only the objects actually present in the state (truthiness
        # check preserved from the original implementation).
        return [obj
                for obj in (object_ids.get(_id) for _id in objects)
                if obj]

    def content_get_metadata(self, contents):
        """Retrieve contents' metadata by their sha1 ids."""
        return self.get('content', contents)

    def revision_get(self, revisions):
        """Retrieve revisions by their ids."""
        return self.get('revision', revisions)

    def release_get(self, releases):
        """Retrieve releases by their ids."""
        return self.get('release', releases)
CONTENTS = [ | CONTENTS = [ | ||||
{ | { | ||||
'length': 3, | 'length': 3, | ||||
'sha1': hash_to_bytes( | 'sha1': hash_to_bytes( | ||||
'34973274ccef6ab4dfaaf86599792fa9c3fe4689'), | '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), | ||||
'sha1_git': b'foo', | 'sha1_git': b'foo', | ||||
'blake2s256': b'bar', | 'blake2s256': b'bar', | ||||
▲ Show 20 Lines • Show All 81 Lines • ▼ Show 20 Lines | ORIGIN_VISITS = [ | ||||
'date': '2018-11-27T17:20:39.000000+00:00', | 'date': '2018-11-27T17:20:39.000000+00:00', | ||||
} | } | ||||
] | ] | ||||
# Publisher configuration used by all tests in this module (reconstructed
# single-column from the review diff's "new" side).
TEST_CONFIG = {
    'brokers': ['localhost'],
    'temporary_prefix': 'swh.tmp_journal.new',
    'final_prefix': 'swh.journal.objects',
    'consumer_id': 'swh.journal.publisher',
    'publisher_id': 'swh.journal.publisher',
    'object_types': ['content'],
    'max_messages': 1,  # will read 1 message and stops
}
class JournalPublisherTest(JournalPublisher):
    """Journal publisher whose storage is an in-memory one, pre-loaded
    with the module-level fixtures (contents, revisions, releases,
    origins and their visits).

    """
    def _prepare_storage(self, config):
        """Fill an in-memory storage with the test fixtures.

        Args:
            config (dict): storage configuration (unused here; the
                in-memory storage needs none)

        """
        self.storage = Storage()
        self.storage.content_add({'data': b'42', **c} for c in CONTENTS)
        self.storage.revision_add(REVISIONS)
        self.storage.release_add(RELEASES)
        origins = self.storage.origin_add(ORIGINS)
        origin_visits = []
        # Origins and visits need ids generated by the storage, hence the
        # two-step add: origin first, then its visit.
        for i, ov in enumerate(ORIGIN_VISITS):
            origin_id = origins[i]['id']
            ov = self.storage.origin_visit_add(origin_id, ov['date'])
            origin_visits.append(ov)
        self.origins = origins
        self.origin_visits = origin_visits
class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest):
    """A journal publisher with:
    - no kafka dependency
    - in-memory storage

    """
    def _prepare_journal(self, config):
        """No journal for now

        """
        pass
class TestPublisher(unittest.TestCase): | class TestPublisherNoKafka(unittest.TestCase): | ||||
"""This tests only the part not using any kafka instance | |||||
""" | |||||
def setUp(self): | def setUp(self): | ||||
self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG) | self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG) | ||||
self.contents = [{b'sha1': c['sha1']} for c in CONTENTS] | self.contents = [{b'sha1': c['sha1']} for c in CONTENTS] | ||||
self.revisions = [{b'id': c['id']} for c in REVISIONS] | self.revisions = [{b'id': c['id']} for c in REVISIONS] | ||||
self.releases = [{b'id': c['id']} for c in RELEASES] | self.releases = [{b'id': c['id']} for c in RELEASES] | ||||
# those needs id generation from the storage | # those needs id generation from the storage | ||||
# so initialization is different than other entities | # so initialization is different than other entities | ||||
self.origins = [{b'url': o['url'], | self.origins = [{b'url': o['url'], | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def test_process_objects(self): | ||||
'content': expected_contents, | 'content': expected_contents, | ||||
'revision': expected_revisions, | 'revision': expected_revisions, | ||||
'release': expected_releases, | 'release': expected_releases, | ||||
'origin': expected_origins, | 'origin': expected_origins, | ||||
'origin_visit': expected_ovs, | 'origin_visit': expected_ovs, | ||||
} | } | ||||
self.assertEqual(actual_objects, expected_objects) | self.assertEqual(actual_objects, expected_objects) | ||||
class TestPublisher(unittest.TestCase):
    """This tests a publisher actually speaking with kafka underneath

    """
    def setUp(self):
        # BUG FIX: the publisher was instantiated without any
        # configuration; pass it the module's test config.
        self.publisher = JournalPublisherTest(TEST_CONFIG)
        self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
        # producer and consumer to send and read data from publisher
        self.producer_to_publisher = KafkaProducer(
            bootstrap_servers=TEST_CONFIG['brokers'],
            key_serializer=key_to_kafka,
            value_serializer=key_to_kafka,
            acks='all')
        # NOTE(review): only a value_deserializer is set; if keys come
        # back as raw serialized bytes, a key_deserializer=kafka_to_key
        # may be needed as well — confirm against the publisher's output.
        self.consumer_from_publisher = KafkaConsumer(
            bootstrap_servers=TEST_CONFIG['brokers'],
            value_deserializer=kafka_to_key)
        self.consumer_from_publisher.subscribe(
            topics=['%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type)
                    for object_type in TEST_CONFIG['object_types']])
        import logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s %(process)d %(levelname)s %(message)s')

    def test_poll(self):
        # given (send message to the publisher)
        self.producer_to_publisher.send(
            '%s.content' % TEST_CONFIG['temporary_prefix'],
            self.contents[0]
        )
        nb_messages = 1
        # when (the publisher poll 1 message and send 1 reified object)
        self.publisher.poll(max_messages=nb_messages)
        # then (client reads from the messages from output topic)
        msgs = []
        for msg in self.consumer_from_publisher:
            msgs.append(msg)
            if len(msgs) >= nb_messages:
                # BUG FIX: iterating a KafkaConsumer never terminates on
                # its own; stop once the expected messages are read.
                break
        self.assertEqual(len(msgs), nb_messages)
        # check the results
        expected_topic = 'swh.journal.objects.content'
        expected_object = (self.contents[0][b'sha1'], CONTENTS[0])
        # BUG FIX: msgs is a list of ConsumerRecord objects, not a
        # (topic, object) tuple — the original assertEqual could never
        # pass; compare the relevant record fields instead.
        self.assertEqual(
            [(msg.topic, (msg.key, msg.value)) for msg in msgs],
            [(expected_topic, expected_object)])
Nooooo, not a mock storage D: