Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_publisher_kafka.py
Show All 11 Lines | |||||
from swh.journal.publisher import JournalPublisher | from swh.journal.publisher import JournalPublisher | ||||
from .conftest import OBJECT_TYPE_KEYS | from .conftest import OBJECT_TYPE_KEYS | ||||
def assert_publish_ok(publisher: JournalPublisher, | def assert_publish_ok(publisher: JournalPublisher, | ||||
consumer_from_publisher: KafkaConsumer, | consumer_from_publisher: KafkaConsumer, | ||||
producer_to_publisher: KafkaProducer, | producer_to_publisher: KafkaProducer, | ||||
test_config: dict, | test_publisher_config: dict, | ||||
object_type: str): | object_type: str): | ||||
"""Assert that publishing object in the publisher is reified and | """Assert that publishing object in the publisher is reified and | ||||
published in output topics. | published in output topics. | ||||
Args: | Args: | ||||
publisher (JournalPublisher): publisher to read and write data | publisher (JournalPublisher): publisher to read and write data | ||||
consumer_from_publisher (KafkaConsumer): To read data from the | consumer_from_publisher (KafkaConsumer): To read data from the | ||||
publisher | publisher | ||||
Show All 10 Lines | if object_key_id: | ||||
for c in expected_objects] | for c in expected_objects] | ||||
else: | else: | ||||
# TODO: add support for origin and origin_visit | # TODO: add support for origin and origin_visit | ||||
return | return | ||||
# send message to the publisher | # send message to the publisher | ||||
for obj in objects: | for obj in objects: | ||||
producer_to_publisher.send( | producer_to_publisher.send( | ||||
'%s.%s' % (test_config['temporary_prefix'], object_type), | '%s.%s' % (test_publisher_config['temporary_prefix'], object_type), | ||||
obj | obj | ||||
) | ) | ||||
nb_messages = len(objects) | nb_messages = len(objects) | ||||
for _ in range(nb_messages): | for _ in range(nb_messages): | ||||
publisher.poll(max_messages=1) | publisher.poll(max_messages=1) | ||||
# then (client reads from the messages from output topic) | # then (client reads from the messages from output topic) | ||||
expected_topic = '%s.%s' % (test_config['final_prefix'], object_type) | expected_topic = '%s.%s' % (test_publisher_config['final_prefix'], | ||||
object_type) | |||||
expected_msgs = [ | expected_msgs = [ | ||||
( | ( | ||||
object_[object_key_id], | object_[object_key_id], | ||||
kafka_to_value(value_to_kafka(object_)) | kafka_to_value(value_to_kafka(object_)) | ||||
) | ) | ||||
for object_ in expected_objects] | for object_ in expected_objects] | ||||
msgs = list(consumer_from_publisher) | msgs = list(consumer_from_publisher) | ||||
assert all(msg.topic == expected_topic for msg in msgs) | assert all(msg.topic == expected_topic for msg in msgs) | ||||
assert [(msg.key, msg.value) for msg in msgs] == expected_msgs | assert [(msg.key, msg.value) for msg in msgs] == expected_msgs | ||||
def test_publish( | def test_publish( | ||||
publisher: JournalPublisher, | publisher: JournalPublisher, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
test_config: dict, | test_publisher_config: dict, | ||||
consumer_from_publisher: KafkaConsumer, | consumer_from_publisher: KafkaConsumer, | ||||
producer_to_publisher: KafkaProducer): | producer_to_publisher: KafkaProducer): | ||||
""" | """ | ||||
Reading from and writing to the journal publisher should work (contents) | Reading from and writing to the journal publisher should work (contents) | ||||
Args: | Args: | ||||
journal_publisher (JournalPublisher): publisher to read and write data | journal_publisher (JournalPublisher): publisher to read and write data | ||||
consumer_from_publisher (KafkaConsumer): To read data from publisher | consumer_from_publisher (KafkaConsumer): To read data from publisher | ||||
producer_to_publisher (KafkaProducer): To send data to publisher | producer_to_publisher (KafkaProducer): To send data to publisher | ||||
""" | """ | ||||
# retrieve the object types we want to test | # retrieve the object types we want to test | ||||
object_types = OBJECT_TYPE_KEYS.keys() | object_types = OBJECT_TYPE_KEYS.keys() | ||||
# Now for each object type, we'll send data to the publisher and | # Now for each object type, we'll send data to the publisher and | ||||
# check that data is indeed fetched and reified in the publisher's | # check that data is indeed fetched and reified in the publisher's | ||||
# output topics | # output topics | ||||
for object_type in object_types: | for object_type in object_types: | ||||
assert_publish_ok( | assert_publish_ok( | ||||
publisher, consumer_from_publisher, producer_to_publisher, | publisher, consumer_from_publisher, producer_to_publisher, | ||||
test_config, object_type) | test_publisher_config, object_type) |