swh/journal/tests/test_publisher.py
- This file was copied to swh/journal/tests/test_publisher_no_kafka.py.
Previous content (kept, per the note above, as swh/journal/tests/test_publisher_no_kafka.py):

# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import unittest

from swh.model.hashutil import hash_to_bytes

from swh.journal.publisher import JournalPublisher
from swh.storage.in_memory import Storage

CONTENTS = [
    {
        'length': 3,
        'sha1': hash_to_bytes(
            '34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
        'sha1_git': b'foo',
        'blake2s256': b'bar',
        'sha256': b'baz',
        'status': 'visible',
    },
]

COMMITTER = [
    {
        'id': 1,
        'fullname': 'foo',
    },
    {
        'id': 2,
        'fullname': 'bar',
    }
]

REVISIONS = [
    {
        'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
        'message': b'hello',
        'date': {
            'timestamp': {
                'seconds': 1234567891,
                'microseconds': 0,
            },
            'offset': 120,
            'negative_utc': None,
        },
        'committer': COMMITTER[0],
        'author': COMMITTER[0],
        'committer_date': None,
    },
    {
        'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
        'message': b'hello again',
        'date': {
            'timestamp': {
                'seconds': 1234567892,
                'microseconds': 0,
            },
            'offset': 120,
            'negative_utc': None,
        },
        'committer': COMMITTER[1],
        'author': COMMITTER[1],
        'committer_date': None,
    },
]

RELEASES = [
    {
        'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
        'name': b'v0.0.1',
        'date': {
            'timestamp': {
                'seconds': 1234567890,
                'microseconds': 0,
            },
            'offset': 120,
            'negative_utc': None,
        },
        'author': COMMITTER[0],
    },
]

ORIGINS = [
    {
        'url': 'https://somewhere.org/den/fox',
        'type': 'git',
    },
    {
        'url': 'https://overtherainbow.org/fox/den',
        'type': 'svn',
    }
]

ORIGIN_VISITS = [
    {
        'date': '2013-05-07T04:20:39.369271+00:00',
    },
    {
        'date': '2018-11-27T17:20:39.000000+00:00',
    }
]

TEST_CONFIG = {
    'brokers': ['localhost'],
    'temporary_prefix': 'swh.tmp_journal.new',
    'final_prefix': 'swh.journal.objects',
    'consumer_id': 'swh.journal.test.publisher',
    'publisher_id': 'swh.journal.test.publisher',
    'object_types': ['content'],
    'max_messages': 3,
}


class JournalPublisherTest(JournalPublisher):
    def _prepare_storage(self, config):
        self.storage = Storage()
        self.storage.content_add({'data': b'42', **c} for c in CONTENTS)
        self.storage.revision_add(REVISIONS)
        self.storage.release_add(RELEASES)
        origins = self.storage.origin_add(ORIGINS)
        origin_visits = []
        for i, ov in enumerate(ORIGIN_VISITS):
            origin_id = origins[i]['id']
            ov = self.storage.origin_visit_add(origin_id, ov['date'])
            origin_visits.append(ov)
        self.origins = origins
        self.origin_visits = origin_visits

        print("publisher.origin-visits", self.origin_visits)


class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest):
    """A journal publisher with:
    - no kafka dependency
    - in-memory storage

    """
    def _prepare_journal(self, config):
        """No journal for now

        """
        pass


class TestPublisher(unittest.TestCase):
    def setUp(self):
        self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG)
        self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
        self.revisions = [{b'id': c['id']} for c in REVISIONS]
        self.releases = [{b'id': c['id']} for c in RELEASES]
        # these need id generation from the storage,
        # so their initialization differs from the other entities
        self.origins = [{b'url': o['url'],
                         b'type': o['type']}
                        for o in self.publisher.origins]
        self.origin_visits = [{b'origin': ov['origin'],
                               b'visit': ov['visit']}
                              for ov in self.publisher.origin_visits]
        # full objects
        storage = self.publisher.storage
        ovs = []
        for ov in self.origin_visits:
            _ov = storage.origin_visit_get_by(
                ov[b'origin'], ov[b'visit'])
            _ov['date'] = str(_ov['date'])
            ovs.append(_ov)
        self.expected_origin_visits = ovs

    def test_process_contents(self):
        actual_contents = self.publisher.process_contents(self.contents)
        expected_contents = [(c['sha1'], c) for c in CONTENTS]
        self.assertEqual(actual_contents, expected_contents)

    def test_process_revisions(self):
        actual_revisions = self.publisher.process_revisions(self.revisions)
        expected_revisions = [(c['id'], c) for c in REVISIONS]
        self.assertEqual(actual_revisions, expected_revisions)

    def test_process_releases(self):
        actual_releases = self.publisher.process_releases(self.releases)
        expected_releases = [(c['id'], c) for c in RELEASES]
        self.assertEqual(actual_releases, expected_releases)

    def test_process_origins(self):
        actual_origins = self.publisher.process_origins(self.origins)
        expected_origins = [({'url': o[b'url'], 'type': o[b'type']},
                             {'url': o[b'url'], 'type': o[b'type']})
                            for o in self.origins]
        self.assertEqual(actual_origins, expected_origins)

    def test_process_origin_visits(self):
        actual_ovs = self.publisher.process_origin_visits(self.origin_visits)
        expected_ovs = [((ov['origin'], ov['visit']), ov)
                        for ov in self.expected_origin_visits]
        self.assertEqual(actual_ovs, expected_ovs)

    def test_process_objects(self):
        messages = {
            'content': self.contents,
            'revision': self.revisions,
            'release': self.releases,
            'origin': self.origins,
            'origin_visit': self.origin_visits,
        }
        actual_objects = self.publisher.process_objects(messages)

        expected_contents = [(c['sha1'], c) for c in CONTENTS]
        expected_revisions = [(c['id'], c) for c in REVISIONS]
        expected_releases = [(c['id'], c) for c in RELEASES]
        expected_origins = [(o, o) for o in ORIGINS]
        expected_ovs = [((ov['origin'], ov['visit']), ov)
                        for ov in self.expected_origin_visits]
        expected_objects = {
            'content': expected_contents,
            'revision': expected_revisions,
            'release': expected_releases,
            'origin': expected_origins,
            'origin_visit': expected_ovs,
        }
        self.assertEqual(actual_objects, expected_objects)

Inline comments on the previous version:
vlorentz: Nooooo, not a mock storage D:
ardumont: lol, don't worry, that should go away, that's old code, i wanted to keep the other part of the diff, not that ;)
ardumont: This predates the in-memory storage; author date is: AuthorDate: Fri Oct 26 12:04:33 2018 +0200
ardumont: Guess what, it's dead code, the code here already uses an in-memory one! \m/

New content:

# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from kafka import KafkaConsumer, KafkaProducer
from subprocess import Popen
from typing import Tuple, Text

from swh.journal.serializers import value_to_kafka, kafka_to_value
from swh.journal.publisher import JournalPublisher

from .conftest import (
    TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, publisher
)

OBJECT_TYPE_KEYS = {
    'content': (b'sha1', CONTENTS),
    'revision': (b'id', REVISIONS),
    'release': (b'id', RELEASES),
}


def assert_publish(publisher: JournalPublisher,
                   consumer_from_publisher: KafkaConsumer,
                   producer_to_publisher: KafkaProducer,
                   object_type: Text):
    """Assert that an object sent to the publisher is reified and
       published to the output topics.

    Args:
        publisher (JournalPublisher): publisher to read and write data
        consumer_from_publisher (KafkaConsumer): to read data from the
            publisher
        producer_to_publisher (KafkaProducer): to send data to the publisher
        object_type (str): the object type under test

    """
    # object type's id label key
    object_key_id, expected_objects = OBJECT_TYPE_KEYS[object_type]
    # objects to send to the publisher
    objects = [{object_key_id: c[object_key_id.decode()]}
               for c in expected_objects]

    # send the messages to the publisher
    for obj in objects:
        producer_to_publisher.send(
            '%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type),
            obj
        )

    nb_messages = len(objects)

    # the publisher polls the messages and sends the reified objects
    publisher.poll(max_messages=nb_messages)

    # then (the client reads the messages from the output topic)
    msgs = []
    for num, msg in enumerate(consumer_from_publisher):
        msgs.append((msg.topic, msg.key, msg.value))

        expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
        assert expected_topic == msg.topic

        expected_key = objects[num][object_key_id]
        assert expected_key == msg.key

        # Transformation is needed due to our back and forth
        # serialization to kafka
        expected_value = kafka_to_value(value_to_kafka(expected_objects[num]))
        assert expected_value == msg.value


def test_publish(
        kafka_server: Tuple[Popen, int],
        consumer_from_publisher: KafkaConsumer,
        producer_to_publisher: KafkaProducer):
    """
    Reading from and writing to the journal publisher should work
    (contents, revisions, releases)

    Args:
        kafka_server: (process handle, port) of the kafka server under test
        consumer_from_publisher (KafkaConsumer): to read data from the
            publisher
        producer_to_publisher (KafkaProducer): to send data to the publisher

    """
    # retrieve the object types we want to test
    object_types = OBJECT_TYPE_KEYS.keys()
    # synchronize the publisher's config with the test
    conf = TEST_CONFIG.copy()
    conf['object_types'] = object_types
    # instantiate the publisher (not a fixture due to initialization)
    p = publisher(kafka_server, config=conf)

    # Subscribe to the publisher's output topics
    consumer_from_publisher.subscribe(
        topics=['%s.%s' % (conf['final_prefix'], object_type)
                for object_type in object_types])

    # Now for each object type, we'll send data to the publisher and
    # check that data is indeed fetched and reified in the publisher's
    # output topics
    for object_type in object_types:
        assert_publish(p, consumer_from_publisher,
                       producer_to_publisher, object_type)
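The new test depends on a conftest.py that is not part of this changeset: it imports TEST_CONFIG, CONTENTS, REVISIONS, RELEASES and a publisher helper from .conftest, and receives kafka_server, consumer_from_publisher and producer_to_publisher as pytest fixtures. The sketch below is only a guess at what those pieces could look like, based on the kafka-python API and the signatures used above; every name, parameter and default in it is illustrative, not the actual conftest code.

# Illustrative sketch only -- the real definitions live in
# swh/journal/tests/conftest.py, which this changeset does not show.
import pytest

from kafka import KafkaConsumer, KafkaProducer

from swh.journal.publisher import JournalPublisher
from swh.journal.serializers import kafka_to_value, value_to_kafka


@pytest.fixture
def producer_to_publisher(kafka_server):
    """Producer feeding the publisher's input (temporary_prefix) topics."""
    _, port = kafka_server  # (Popen, port), as hinted in test_publish
    return KafkaProducer(
        bootstrap_servers='localhost:%d' % port,
        value_serializer=value_to_kafka,
    )


@pytest.fixture
def consumer_from_publisher(kafka_server):
    """Consumer reading the publisher's output (final_prefix) topics."""
    _, port = kafka_server
    return KafkaConsumer(
        bootstrap_servers='localhost:%d' % port,
        value_deserializer=kafka_to_value,
        auto_offset_reset='earliest',
        consumer_timeout_ms=1000,  # let the iteration in assert_publish stop
    )


def publisher(kafka_server, config):
    """Build a JournalPublisher wired to the test kafka instance."""
    _, port = kafka_server
    config = {**config, 'brokers': ['localhost:%d' % port]}
    return JournalPublisher(config)

With fixtures along these lines pytest can collect test_publish directly; the actual conftest in swh-journal may differ in topic names, group ids and how the kafka server is started and torn down.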