diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index 72bd89b..2bd6dc8 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,206 +1,208 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import pytest
from kafka import KafkaConsumer, KafkaProducer
from subprocess import Popen
from typing import Tuple
from pathlib import Path
from pytest_kafka import (
make_zookeeper_process, make_kafka_server, make_kafka_consumer
)
from swh.journal.publisher import JournalPublisher
from swh.model.hashutil import hash_to_bytes
from swh.journal.serializers import kafka_to_key, key_to_kafka
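# The publisher under test consumes object keys from
# '<temporary_prefix>.<object_type>' topics and produces the corresponding
# full objects to '<final_prefix>.<object_type>' topics.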
TEST_CONFIG = {
'brokers': ['localhost'],
'temporary_prefix': 'swh.tmp_journal.new',
'final_prefix': 'swh.journal.objects',
'consumer_id': 'swh.journal.publisher',
'publisher_id': 'swh.journal.publisher',
- 'object_types': ['content', 'revision', 'release', 'origin'],
+ 'object_types': ['content'],
'max_messages': 1,  # will read 1 message and stop
'storage': {'cls': 'memory', 'args': {}}
}
CONTENTS = [
{
'length': 3,
'sha1': hash_to_bytes(
'34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
'sha1_git': b'foo',
'blake2s256': b'bar',
'sha256': b'baz',
'status': 'visible',
},
]
COMMITTER = [
{
'id': 1,
'fullname': 'foo',
},
{
'id': 2,
'fullname': 'bar',
}
]
REVISIONS = [
{
'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
'message': b'hello',
'date': {
'timestamp': {
'seconds': 1234567891,
'microseconds': 0,
},
'offset': 120,
'negative_utc': None,
},
'committer': COMMITTER[0],
'author': COMMITTER[0],
'committer_date': None,
},
{
'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
'message': b'hello again',
'date': {
'timestamp': {
'seconds': 1234567892,
'microseconds': 0,
},
'offset': 120,
'negative_utc': None,
},
'committer': COMMITTER[1],
'author': COMMITTER[1],
'committer_date': None,
},
]
RELEASES = [
{
'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
'name': b'v0.0.1',
'date': {
'timestamp': {
'seconds': 1234567890,
'microseconds': 0,
},
'offset': 120,
'negative_utc': None,
},
'author': COMMITTER[0],
},
]
ORIGINS = [
{
'url': 'https://somewhere.org/den/fox',
'type': 'git',
},
{
'url': 'https://overtherainbow.org/fox/den',
'type': 'svn',
}
]
ORIGIN_VISITS = [
{
'date': '2013-05-07T04:20:39.369271+00:00',
},
{
'date': '2018-11-27T17:20:39.000000+00:00',
}
]
class JournalPublisherTest(JournalPublisher):
"""A journal publisher which override the default configuration
parsing setup.
"""
def _prepare_storage(self, config):
super()._prepare_storage(config)
self.storage.content_add({'data': b'42', **c} for c in CONTENTS)
self.storage.revision_add(REVISIONS)
self.storage.release_add(RELEASES)
origins = self.storage.origin_add(ORIGINS)
origin_visits = []
for i, ov in enumerate(ORIGIN_VISITS):
origin_id = origins[i]['id']
ov = self.storage.origin_visit_add(origin_id, ov['date'])
origin_visits.append(ov)
self.origins = origins
self.origin_visits = origin_visits
print("publisher.origin-visits", self.origin_visits)
KAFKA_ROOT = Path(os.environ.get('SWH_KAFKA_ROOT', Path(__file__).parent))
KAFKA_SCRIPTS = KAFKA_ROOT / 'kafka/bin/'
KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh')
ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh')
# These define fixtures
zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN)
kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc')
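# kafka_server resolves to a (Popen, port) tuple; the port is picked at
# runtime, so the fixtures below point their clients at 'localhost:<port>'.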
@pytest.fixture
-def kafka_producer(request: 'SubRequest', kafka_server: Tuple[Popen, int]) -> KafkaProducer: # noqa
+def producer_to_publisher(
+ request: 'SubRequest',
+ kafka_server: Tuple[Popen, int]) -> KafkaProducer: # noqa
+ """Producer to send message to the publisher's consumer.
+
+ """
_, port = kafka_server
- producer = KafkaProducer(bootstrap_servers='localhost:{}'.format(port))
+ producer = KafkaProducer(
+ bootstrap_servers='localhost:{}'.format(port),
+ key_serializer=key_to_kafka,
+ value_serializer=key_to_kafka,
+ client_id=TEST_CONFIG['consumer_id'],
+ )
return producer
@pytest.fixture
-def kafka_consumer(
- request: 'SubRequest', kafka_server: Tuple[Popen, int]) -> KafkaConsumer: # noqa
+def consumer_from_publisher(request: 'SubRequest') -> KafkaConsumer: # noqa
+ """Consumer to read message from the publisher's producer message
- TOPIC = 'abc'
+ """
+ subscribed_topics = [
+ '%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
+ for object_type in TEST_CONFIG['object_types']
+ ]
+ print(subscribed_topics)
kafka_consumer = make_kafka_consumer(
- 'kafka_server', seek_to_beginning=True, kafka_topics=[TOPIC])
+ 'kafka_server',
+ seek_to_beginning=True,
+ value_deserializer=kafka_to_key,
+ auto_offset_reset='earliest',
+ enable_auto_commit=False,
+ client_id=TEST_CONFIG['publisher_id'],
+ kafka_topics=subscribed_topics) # Callback [..., KafkaConsumer]
return kafka_consumer(request)
-class JournalPublisherKafkaInMemoryStorage(JournalPublisherTest):
- """A journal publisher with:
- - kafka dependency
- - in-memory storage
-
- """
- def _prepare_journal(self, config):
- """No journal for now
-
- """
- self.consumer = KafkaConsumer(
- bootstrap_servers=config['brokers'],
- value_deserializer=kafka_to_key,
- auto_offset_reset='earliest',
- enable_auto_commit=False,
- group_id=config['consumer_id'],
- )
- self.producer = KafkaProducer(
- bootstrap_servers=config['brokers'],
- key_serializer=key_to_kafka,
- value_serializer=key_to_kafka,
- client_id=config['publisher_id'],
- )
-
-
@pytest.fixture
-def journal_publisher(request: 'SubRequest', kafka_consumer, kafka_producer):
- return JournalPublisherKafkaInMemoryStorage(TEST_CONFIG)
+def publisher(
+ request: 'SubRequest',
+ kafka_server: Tuple[Popen, int]) -> JournalPublisher:
+ # the publisher's consumer and producer need to talk to the right
+ # kafka instance
+ _, port = kafka_server
+ TEST_CONFIG['brokers'] = ['localhost:{}'.format(port)]
+ return JournalPublisher(TEST_CONFIG)
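Note: for orientation, below is a minimal sketch of the consume/reify/produce
cycle that publisher.poll() is expected to perform, inferred from the config
keys above. It is not the real JournalPublisher implementation (which lives
in swh.journal.publisher), and lookup_object is a hypothetical stand-in for
the storage lookup:

def poll_once(consumer, producer, config, lookup_object):
    # Read key messages from the temporary topics, reify each against
    # storage, and republish the full object on the matching final topic.
    for i, message in enumerate(consumer):
        object_type = message.topic.split('.')[-1]  # e.g. 'content'
        key = message.value                         # e.g. {b'sha1': b'...'}
        obj = lookup_object(object_type, key)       # assumed storage lookup
        final_topic = '%s.%s' % (config['final_prefix'], object_type)
        producer.send(final_topic, key=key, value=obj)
        if i + 1 >= config['max_messages']:
            break
    producer.flush()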
diff --git a/swh/journal/tests/test_publisher2.py b/swh/journal/tests/test_publisher2.py
index b3428f4..23eb337 100644
--- a/swh/journal/tests/test_publisher2.py
+++ b/swh/journal/tests/test_publisher2.py
@@ -1,78 +1,62 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from kafka import KafkaConsumer, KafkaProducer
+from swh.journal.publisher import JournalPublisher
+from .conftest import (
+ TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, ORIGINS
+)
-def write_and_read(kafka_producer: KafkaProducer,
- kafka_consumer: KafkaConsumer) -> None:
- """Produces writes to a topic, consumer consumes from the same topic.
+def test_publisher(
+ publisher: JournalPublisher,
+ consumer_from_publisher: KafkaConsumer,
+ producer_to_publisher: KafkaProducer):
"""
- message = b'msg'
- topic = 'abc'
- # write to kafka
- kafka_producer.send(topic, message)
- kafka_producer.flush()
- # read from it
- consumed = list(kafka_consumer)
- assert len(consumed) == 1
- assert consumed[0].topic == topic
- assert consumed[0].value == message
-
-
-def test_read_write(kafka_producer: KafkaProducer,
- kafka_consumer: KafkaConsumer):
- """Independent test from the publisher so far"""
- write_and_read(kafka_producer, kafka_consumer)
-
-
-def test_poll_publisher():
- pass
-
-# def setUp(self):
-# self.publisher = JournalPublisherTest()
-# self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
-# # self.revisions = [{b'id': c['id']} for c in REVISIONS]
-# # self.releases = [{b'id': c['id']} for c in RELEASES]
-# # producer and consumer to send and read data from publisher
-# self.producer_to_publisher = KafkaProducer(
-# bootstrap_servers=TEST_CONFIG['brokers'],
-# key_serializer=key_to_kafka,
-# value_serializer=key_to_kafka,
-# acks='all')
-# self.consumer_from_publisher = KafkaConsumer(
-# bootstrap_servers=TEST_CONFIG['brokers'],
-# value_deserializer=kafka_to_key)
-# self.consumer_from_publisher.subscribe(
-# topics=['%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type)
-# for object_type in TEST_CONFIG['object_types']])
-
-
-# def test_poll(kafka_consumer):
-# # given (send message to the publisher)
-# self.producer_to_publisher.send(
-# '%s.content' % TEST_CONFIG['temporary_prefix'],
-# self.contents[0]
-# )
-
-# nb_messages = 1
-
-# # when (the publisher poll 1 message and send 1 reified object)
-# self.publisher.poll(max_messages=nb_messages)
-
-# # then (client reads from the messages from output topic)
-# msgs = []
-# for num, msg in enumerate(self.consumer_from_publisher):
-# print('#### consumed msg %s: %s ' % (num, msg))
-# msgs.append(msg)
-
-# self.assertEqual(len(msgs), nb_messages)
-# print('##### msgs: %s' % msgs)
-# # check the results
-# expected_topic = 'swh.journal.objects.content'
-# expected_object = (self.contents[0][b'sha1'], CONTENTS[0])
-
-# self.assertEqual(msgs, (expected_topic, expected_object))
+ Reading from and writing to the journal publisher should work.
+
+ Args:
+ publisher (JournalPublisher): the journal publisher to exercise
+ consumer_from_publisher (KafkaConsumer): to read data from the publisher
+ producer_to_publisher (KafkaProducer): to send data to the publisher
+
+ """
+
+ contents = [{b'sha1': c['sha1']} for c in CONTENTS]
+
+ # revisions = [{b'id': c['id']} for c in REVISIONS]
+ # releases = [{b'id': c['id']} for c in RELEASES]
+
+ # read the output of the publisher
+ consumer_from_publisher.subscribe(
+ topics=['%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
+ for object_type in TEST_CONFIG['object_types']])
+
+ # send a message to the publisher
+ producer_to_publisher.send(
+ '%s.content' % TEST_CONFIG['temporary_prefix'],
+ contents[0]
+ )
+
+ nb_messages = 1
+
+ # publisher should poll 1 message and send 1 reified object
+ publisher.poll(max_messages=nb_messages)
+
+ # then (client reads the messages from the output topic)
+ msgs = []
+ for num, msg in enumerate(consumer_from_publisher):
+ print('#### consumed msg %s: %s ' % (num, msg))
+ msgs.append(msg)
+
+ assert len(msgs) == nb_messages
+
+ print('##### msgs: %s' % msgs)
+ # check the results
+ expected_topic = '%s.content' % TEST_CONFIG['final_prefix']
+ expected_object = (contents[0][b'sha1'], CONTENTS[0])
+
+ # msgs holds ConsumerRecord objects, so compare topic and (key, value)
+ assert msgs[0].topic == expected_topic
+ assert (msgs[0].key, msgs[0].value) == expected_object
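Note: the fixtures above serialize both keys and values with key_to_kafka and
deserialize with kafka_to_key; assuming these two functions are inverses (as
their paired use here implies), a round-trip looks like:

from swh.journal.serializers import kafka_to_key, key_to_kafka

key = {b'sha1': CONTENTS[0]['sha1']}
assert kafka_to_key(key_to_kafka(key)) == key  # decode returns the original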