Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Iterable | |||||
import pytest | import pytest | ||||
from confluent_kafka import Consumer, Producer | from confluent_kafka import Consumer, Producer | ||||
from swh.model.model import Directory | from swh.model.model import Directory | ||||
from swh.journal.tests.journal_data import TEST_OBJECTS | from swh.journal.tests.journal_data import TEST_OBJECTS | ||||
from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed | from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed | ||||
from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError | from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError | ||||
def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer): | def test_kafka_writer( | ||||
kafka_prefix: str, | |||||
kafka_server: str, | |||||
consumer: Consumer, | |||||
privileged_object_types: Iterable[str], | |||||
): | |||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, | brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, | ||||
) | ) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
for object_type, objects in TEST_OBJECTS.items(): | for object_type, objects in TEST_OBJECTS.items(): | ||||
writer.write_additions(object_type, objects) | writer.write_additions(object_type, objects) | ||||
expected_messages += len(objects) | expected_messages += len(objects) | ||||
if object_type in privileged_object_types: | |||||
writer.write_additions(object_type, objects, privileged=True) | |||||
expected_messages += len(objects) | |||||
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) | consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) | ||||
assert_all_objects_consumed(consumed_messages) | assert_all_objects_consumed(consumed_messages) | ||||
def test_write_delivery_failure( | def test_write_delivery_failure( | ||||
kafka_prefix: str, kafka_server: str, consumer: Consumer | kafka_prefix: str, kafka_server: str, consumer: Consumer | ||||
): | ): | ||||
class MockKafkaError: | class MockKafkaError: | ||||
▲ Show 20 Lines • Show All 63 Lines • Show Last 20 Lines |