Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Iterable | from typing import Iterable | ||||
from confluent_kafka import Consumer, Producer | from confluent_kafka import Consumer, Producer | ||||
import pytest | import pytest | ||||
from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages | from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages | ||||
from swh.journal.tests.journal_data import TEST_OBJECTS | from swh.journal.tests.journal_data import TEST_OBJECTS | ||||
from swh.journal.writer import model_object_dict_sanitizer | from swh.journal.writer import model_object_dict_sanitizer | ||||
from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter | from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter | ||||
from swh.model.model import Directory, Release, Revision | from swh.model.model import BaseModel, Directory, Release, Revision | ||||
def test_kafka_writer( | def test_kafka_writer( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: str, | kafka_server: str, | ||||
consumer: Consumer, | consumer: Consumer, | ||||
privileged_object_types: Iterable[str], | privileged_object_types: Iterable[str], | ||||
): | ): | ||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter[BaseModel]( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | value_sanitizer=model_object_dict_sanitizer, | ||||
anonymize=False, | anonymize=False, | ||||
) | ) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
Show All 24 Lines | |||||
def test_kafka_writer_anonymized( | def test_kafka_writer_anonymized( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: str, | kafka_server: str, | ||||
consumer: Consumer, | consumer: Consumer, | ||||
privileged_object_types: Iterable[str], | privileged_object_types: Iterable[str], | ||||
): | ): | ||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter[BaseModel]( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | value_sanitizer=model_object_dict_sanitizer, | ||||
anonymize=True, | anonymize=True, | ||||
) | ) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | ): | ||||
class MockProducer(Producer): | class MockProducer(Producer): | ||||
"""A kafka producer which pretends to produce messages, but never sends any | """A kafka producer which pretends to produce messages, but never sends any | ||||
delivery acknowledgements""" | delivery acknowledgements""" | ||||
def produce(self, **kwargs): | def produce(self, **kwargs): | ||||
produced.append(kwargs) | produced.append(kwargs) | ||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter[BaseModel]( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | value_sanitizer=model_object_dict_sanitizer, | ||||
flush_timeout=1, | flush_timeout=1, | ||||
producer_class=MockProducer, | producer_class=MockProducer, | ||||
) | ) | ||||
Show All 11 Lines |