Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Iterable | from typing import Iterable | ||||
from confluent_kafka import Consumer, Producer | from confluent_kafka import Consumer, Producer | ||||
import pytest | import pytest | ||||
from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages | from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages | ||||
from swh.journal.tests.journal_data import TEST_OBJECTS | from swh.journal.tests.journal_data import TEST_OBJECTS | ||||
from swh.journal.writer import model_object_dict_sanitizer | |||||
from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter | from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter | ||||
from swh.model.model import Directory, Release, Revision | from swh.model.model import Directory, Release, Revision | ||||
def test_kafka_writer( | def test_kafka_writer( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: str, | kafka_server: str, | ||||
consumer: Consumer, | consumer: Consumer, | ||||
privileged_object_types: Iterable[str], | privileged_object_types: Iterable[str], | ||||
): | ): | ||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | |||||
anonymize=False, | anonymize=False, | ||||
) | ) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
for object_type, objects in TEST_OBJECTS.items(): | for object_type, objects in TEST_OBJECTS.items(): | ||||
writer.write_additions(object_type, objects) | writer.write_additions(object_type, objects) | ||||
expected_messages += len(objects) | expected_messages += len(objects) | ||||
Show All 24 Lines | def test_kafka_writer_anonymized( | ||||
kafka_server: str, | kafka_server: str, | ||||
consumer: Consumer, | consumer: Consumer, | ||||
privileged_object_types: Iterable[str], | privileged_object_types: Iterable[str], | ||||
): | ): | ||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | |||||
anonymize=True, | anonymize=True, | ||||
) | ) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
for object_type, objects in TEST_OBJECTS.items(): | for object_type, objects in TEST_OBJECTS.items(): | ||||
writer.write_additions(object_type, objects) | writer.write_additions(object_type, objects) | ||||
expected_messages += len(objects) | expected_messages += len(objects) | ||||
Show All 37 Lines | class KafkaJournalWriterFailDelivery(KafkaJournalWriter): | ||||
"""A journal writer which always fails delivering messages""" | """A journal writer which always fails delivering messages""" | ||||
def _on_delivery(self, error, message): | def _on_delivery(self, error, message): | ||||
"""Replace the inbound error with a fake delivery error""" | """Replace the inbound error with a fake delivery error""" | ||||
super()._on_delivery(MockKafkaError(), message) | super()._on_delivery(MockKafkaError(), message) | ||||
kafka_prefix += ".swh.journal.objects" | kafka_prefix += ".swh.journal.objects" | ||||
writer = KafkaJournalWriterFailDelivery( | writer = KafkaJournalWriterFailDelivery( | ||||
brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, | brokers=[kafka_server], | ||||
client_id="kafka_writer", | |||||
prefix=kafka_prefix, | |||||
value_sanitizer=model_object_dict_sanitizer, | |||||
) | ) | ||||
empty_dir = Directory(entries=()) | empty_dir = Directory(entries=()) | ||||
with pytest.raises(KafkaDeliveryError) as exc: | with pytest.raises(KafkaDeliveryError) as exc: | ||||
writer.write_addition("directory", empty_dir) | writer.write_addition("directory", empty_dir) | ||||
assert "Failed deliveries" in exc.value.message | assert "Failed deliveries" in exc.value.message | ||||
assert len(exc.value.delivery_failures) == 1 | assert len(exc.value.delivery_failures) == 1 | ||||
Show All 14 Lines | class MockProducer(Producer): | ||||
def produce(self, **kwargs): | def produce(self, **kwargs): | ||||
produced.append(kwargs) | produced.append(kwargs) | ||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | |||||
flush_timeout=1, | flush_timeout=1, | ||||
producer_class=MockProducer, | producer_class=MockProducer, | ||||
) | ) | ||||
empty_dir = Directory(entries=()) | empty_dir = Directory(entries=()) | ||||
with pytest.raises(KafkaDeliveryError) as exc: | with pytest.raises(KafkaDeliveryError) as exc: | ||||
writer.write_addition("directory", empty_dir) | writer.write_addition("directory", empty_dir) | ||||
assert len(produced) == 1 | assert len(produced) == 1 | ||||
assert "timeout" in exc.value.message | assert "timeout" in exc.value.message | ||||
assert len(exc.value.delivery_failures) == 1 | assert len(exc.value.delivery_failures) == 1 | ||||
delivery_failure = exc.value.delivery_failures[0] | delivery_failure = exc.value.delivery_failures[0] | ||||
assert delivery_failure.key == empty_dir.id | assert delivery_failure.key == empty_dir.id | ||||
assert delivery_failure.code == "SWH_FLUSH_TIMEOUT" | assert delivery_failure.code == "SWH_FLUSH_TIMEOUT" |