
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index 72bd89b..2bd6dc8 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,206 +1,208 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 import pytest

 from kafka import KafkaConsumer, KafkaProducer
 from subprocess import Popen
 from typing import Tuple
 from pathlib import Path

 from pytest_kafka import (
     make_zookeeper_process, make_kafka_server, make_kafka_consumer
 )

 from swh.journal.publisher import JournalPublisher
 from swh.model.hashutil import hash_to_bytes
 from swh.journal.serializers import kafka_to_key, key_to_kafka

 TEST_CONFIG = {
     'brokers': ['localhost'],
     'temporary_prefix': 'swh.tmp_journal.new',
     'final_prefix': 'swh.journal.objects',
     'consumer_id': 'swh.journal.publisher',
     'publisher_id': 'swh.journal.publisher',
-    'object_types': ['content', 'revision', 'release', 'origin'],
+    'object_types': ['content'],
     'max_messages': 1,  # will read 1 message and stop
     'storage': {'cls': 'memory', 'args': {}}
 }
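
 # Test objects loaded into the in-memory storage by JournalPublisherTest
 # (defined below)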
 CONTENTS = [
     {
         'length': 3,
         'sha1': hash_to_bytes(
             '34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
         'sha1_git': b'foo',
         'blake2s256': b'bar',
         'sha256': b'baz',
         'status': 'visible',
     },
 ]

 COMMITTER = [
     {
         'id': 1,
         'fullname': 'foo',
     },
     {
         'id': 2,
         'fullname': 'bar',
     }
 ]
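
 # COMMITTER entries are reused as authors/committers in REVISIONS and
 # RELEASES below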
 REVISIONS = [
     {
         'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
         'message': b'hello',
         'date': {
             'timestamp': {
                 'seconds': 1234567891,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[0],
         'author': COMMITTER[0],
         'committer_date': None,
     },
     {
         'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
         'message': b'hello again',
         'date': {
             'timestamp': {
                 'seconds': 1234567892,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[1],
         'author': COMMITTER[1],
         'committer_date': None,
     },
 ]

 RELEASES = [
     {
         'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
         'name': b'v0.0.1',
         'date': {
             'timestamp': {
                 'seconds': 1234567890,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'author': COMMITTER[0],
     },
 ]

 ORIGINS = [
     {
         'url': 'https://somewhere.org/den/fox',
         'type': 'git',
     },
     {
         'url': 'https://overtherainbow.org/fox/den',
         'type': 'svn',
     }
 ]
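
 # Each visit is attached to the origin at the same index (see
 # JournalPublisherTest._prepare_storage below)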
 ORIGIN_VISITS = [
     {
         'date': '2013-05-07T04:20:39.369271+00:00',
     },
     {
         'date': '2018-11-27T17:20:39.000000+00:00',
     }
 ]


 class JournalPublisherTest(JournalPublisher):
     """A journal publisher which overrides the default configuration
     parsing setup.

     """
     def _prepare_storage(self, config):
         super()._prepare_storage(config)
         self.storage.content_add({'data': b'42', **c} for c in CONTENTS)
         self.storage.revision_add(REVISIONS)
         self.storage.release_add(RELEASES)
         origins = self.storage.origin_add(ORIGINS)
         origin_visits = []
         for i, ov in enumerate(ORIGIN_VISITS):
             origin_id = origins[i]['id']
             ov = self.storage.origin_visit_add(origin_id, ov['date'])
             origin_visits.append(ov)
         self.origins = origins
         self.origin_visits = origin_visits
         print("publisher.origin-visits", self.origin_visits)


 # wrapped in Path() so that SWH_KAFKA_ROOT, a str when set, also supports /
 KAFKA_ROOT = Path(os.environ.get('SWH_KAFKA_ROOT', Path(__file__).parent))
 KAFKA_SCRIPTS = KAFKA_ROOT / 'kafka/bin/'

 KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh')
 ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh')

 # These define fixtures
 zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN)
 kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc')
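
 # The fixtures below all talk to this kafka_server instance; the producer
 # and publisher fixtures take its (process, port) tuple to learn the port
 # picked at runtime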

 @pytest.fixture
-def kafka_producer(request: 'SubRequest', kafka_server: Tuple[Popen, int]) -> KafkaProducer:  # noqa
+def producer_to_publisher(
+        request: 'SubRequest',
+        kafka_server: Tuple[Popen, int]) -> KafkaProducer:  # noqa
+    """Producer to send messages to the publisher's consumer.
+
+    """
     _, port = kafka_server
-    producer = KafkaProducer(bootstrap_servers='localhost:{}'.format(port))
+    producer = KafkaProducer(
+        bootstrap_servers='localhost:{}'.format(port),
+        key_serializer=key_to_kafka,
+        value_serializer=key_to_kafka,
+        client_id=TEST_CONFIG['consumer_id'],
+    )
     return producer


 @pytest.fixture
-def kafka_consumer(
-        request: 'SubRequest', kafka_server: Tuple[Popen, int]) -> KafkaConsumer:  # noqa
+def consumer_from_publisher(request: 'SubRequest') -> KafkaConsumer:  # noqa
+    """Consumer to read messages from the publisher's producer.
-    TOPIC = 'abc'
+
+    """
+    subscribed_topics = [
+        '%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
+        for object_type in TEST_CONFIG['object_types']
+    ]
+    print(subscribed_topics)
     kafka_consumer = make_kafka_consumer(
-        'kafka_server', seek_to_beginning=True, kafka_topics=[TOPIC])
+        'kafka_server',
+        seek_to_beginning=True,
+        value_deserializer=kafka_to_key,
+        auto_offset_reset='earliest',
+        enable_auto_commit=False,
+        client_id=TEST_CONFIG['publisher_id'],
+        kafka_topics=subscribed_topics)  # Callable[..., KafkaConsumer]
     return kafka_consumer(request)


-class JournalPublisherKafkaInMemoryStorage(JournalPublisherTest):
-    """A journal publisher with:
-    - kafka dependency
-    - in-memory storage
-
-    """
-    def _prepare_journal(self, config):
-        """No journal for now
-
-        """
-        self.consumer = KafkaConsumer(
-            bootstrap_servers=config['brokers'],
-            value_deserializer=kafka_to_key,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False,
-            group_id=config['consumer_id'],
-        )
-        self.producer = KafkaProducer(
-            bootstrap_servers=config['brokers'],
-            key_serializer=key_to_kafka,
-            value_serializer=key_to_kafka,
-            client_id=config['publisher_id'],
-        )
-
-
 @pytest.fixture
-def journal_publisher(request: 'SubRequest', kafka_consumer, kafka_producer):
-    return JournalPublisherKafkaInMemoryStorage(TEST_CONFIG)
+def publisher(
+        request: 'SubRequest',
+        kafka_server: Tuple[Popen, int]) -> JournalPublisher:
+    # the publisher's consumer and producer need to talk to the right
+    # kafka instance
+    _, port = kafka_server
+    TEST_CONFIG['brokers'] = ['localhost:{}'.format(port)]
+    return JournalPublisher(TEST_CONFIG)
diff --git a/swh/journal/tests/test_publisher2.py b/swh/journal/tests/test_publisher2.py
index b3428f4..23eb337 100644
--- a/swh/journal/tests/test_publisher2.py
+++ b/swh/journal/tests/test_publisher2.py
@@ -1,78 +1,62 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from kafka import KafkaConsumer, KafkaProducer

+from swh.journal.publisher import JournalPublisher
+
+from .conftest import (
+    TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, ORIGINS
+)


-def write_and_read(kafka_producer: KafkaProducer,
-                   kafka_consumer: KafkaConsumer) -> None:
-    """Produces writes to a topic, consumer consumes from the same topic.
+def test_publisher(
+        publisher: JournalPublisher,
+        consumer_from_publisher: KafkaConsumer,
+        producer_to_publisher: KafkaProducer):
     """
-    message = b'msg'
-    topic = 'abc'
-    # write to kafka
-    kafka_producer.send(topic, message)
-    kafka_producer.flush()
-    # read from it
-    consumed = list(kafka_consumer)
-    assert len(consumed) == 1
-    assert consumed[0].topic == topic
-    assert consumed[0].value == message
-
-
-def test_read_write(kafka_producer: KafkaProducer,
-                    kafka_consumer: KafkaConsumer):
-    """Independent test from the publisher so far"""
-    write_and_read(kafka_producer, kafka_consumer)
-
-
-def test_poll_publisher():
-    pass
-
-# def setUp(self):
-#     self.publisher = JournalPublisherTest()
-#     self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
-#     # self.revisions = [{b'id': c['id']} for c in REVISIONS]
-#     # self.releases = [{b'id': c['id']} for c in RELEASES]
-#     # producer and consumer to send and read data from publisher
-#     self.producer_to_publisher = KafkaProducer(
-#         bootstrap_servers=TEST_CONFIG['brokers'],
-#         key_serializer=key_to_kafka,
-#         value_serializer=key_to_kafka,
-#         acks='all')
-#     self.consumer_from_publisher = KafkaConsumer(
-#         bootstrap_servers=TEST_CONFIG['brokers'],
-#         value_deserializer=kafka_to_key)
-#     self.consumer_from_publisher.subscribe(
-#         topics=['%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type)
-#                 for object_type in TEST_CONFIG['object_types']])
-
-
-# def test_poll(kafka_consumer):
-#     # given (send message to the publisher)
-#     self.producer_to_publisher.send(
-#         '%s.content' % TEST_CONFIG['temporary_prefix'],
-#         self.contents[0]
-#     )
-
-#     nb_messages = 1
-
-#     # when (the publisher polls 1 message and sends 1 reified object)
-#     self.publisher.poll(max_messages=nb_messages)
-
-#     # then (client reads the messages from the output topic)
-#     msgs = []
-#     for num, msg in enumerate(self.consumer_from_publisher):
-#         print('#### consumed msg %s: %s ' % (num, msg))
-#         msgs.append(msg)
-
-#     self.assertEqual(len(msgs), nb_messages)
-#     print('##### msgs: %s' % msgs)
-#     # check the results
-#     expected_topic = 'swh.journal.objects.content'
-#     expected_object = (self.contents[0][b'sha1'], CONTENTS[0])
-
-#     self.assertEqual(msgs, (expected_topic, expected_object))
+    Reading from and writing to the journal publisher should work.
+
+    Args:
+        publisher (JournalPublisher): publisher to read from and write to
+        consumer_from_publisher (KafkaConsumer): to read data from the
+            publisher
+        producer_to_publisher (KafkaProducer): to send data to the publisher
+
+    """
+
+    contents = [{b'sha1': c['sha1']} for c in CONTENTS]
+
+    # revisions = [{b'id': c['id']} for c in REVISIONS]
+    # releases = [{b'id': c['id']} for c in RELEASES]
+
+    # read the output of the publisher (objects land on final_prefix topics)
+    consumer_from_publisher.subscribe(
+        topics=['%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
+                for object_type in TEST_CONFIG['object_types']])
+
+    # send a message to the publisher
+    producer_to_publisher.send(
+        '%s.content' % TEST_CONFIG['temporary_prefix'],
+        contents[0]
+    )
+
+    nb_messages = 1
+
+    # the publisher should poll 1 message and send 1 reified object
+    publisher.poll(max_messages=nb_messages)
+
+    # then the client reads the messages from the output topic
+    msgs = []
+    for num, msg in enumerate(consumer_from_publisher):
+        print('#### consumed msg %s: %s ' % (num, msg))
+        msgs.append(msg)
+
+    assert len(msgs) == nb_messages
+
+    print('##### msgs: %s' % msgs)
+    # check the results
+    expected_topic = '%s.content' % TEST_CONFIG['final_prefix']
+    expected_object = (contents[0][b'sha1'], CONTENTS[0])
+
+    # msgs is a list of ConsumerRecord, so compare topic/key/value fields
+    assert msgs[0].topic == expected_topic
+    assert (msgs[0].key, msgs[0].value) == expected_object
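
Note: the fixtures above expect a kafka distribution under
$SWH_KAFKA_ROOT/kafka/bin (defaulting to swh/journal/tests/kafka/bin),
from which pytest-kafka spawns the zookeeper and kafka servers. With that
in place, the round trip can be exercised with a plain pytest run, e.g.
(/opt is only an example path):

    SWH_KAFKA_ROOT=/opt pytest swh/journal/tests/test_publisher2.py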
