Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import pytest | import pytest | ||||
from confluent_kafka import Consumer, Producer | from confluent_kafka import Consumer, Producer | ||||
from swh.model.model import Directory | from swh.model.model import Directory | ||||
from swh.journal.tests.journal_data import TEST_OBJECTS | from swh.journal.tests.journal_data import TEST_OBJECTS | ||||
from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed | from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed | ||||
from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError | from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError | ||||
def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer): | def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer): | ||||
kafka_prefix += ".swh.journal.objects" | |||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, | brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, | ||||
) | ) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
for object_type, objects in TEST_OBJECTS.items(): | for object_type, objects in TEST_OBJECTS.items(): | ||||
writer.write_additions(object_type, objects) | writer.write_additions(object_type, objects) | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | ): | ||||
class MockProducer(Producer): | class MockProducer(Producer): | ||||
"""A kafka producer which pretends to produce messages, but never sends any | """A kafka producer which pretends to produce messages, but never sends any | ||||
delivery acknowledgements""" | delivery acknowledgements""" | ||||
def produce(self, **kwargs): | def produce(self, **kwargs): | ||||
produced.append(kwargs) | produced.append(kwargs) | ||||
kafka_prefix += ".swh.journal.objects" | |||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
flush_timeout=1, | flush_timeout=1, | ||||
producer_class=MockProducer, | producer_class=MockProducer, | ||||
) | ) | ||||
Show All 11 Lines |