Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Iterable | |||||
import pytest | import pytest | ||||
from confluent_kafka import Consumer, Producer | from confluent_kafka import Consumer, Producer | ||||
from swh.model.model import Directory | from swh.model.model import Directory, Revision, Release | ||||
from swh.journal.tests.journal_data import TEST_OBJECTS | from swh.journal.tests.journal_data import TEST_OBJECTS | ||||
from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed | from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed | ||||
from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError | from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError | ||||
def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer): | def test_kafka_writer( | ||||
kafka_prefix: str, | |||||
kafka_server: str, | |||||
consumer: Consumer, | |||||
privileged_object_types: Iterable[str], | |||||
): | |||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, | brokers=[kafka_server], | ||||
client_id="kafka_writer", | |||||
prefix=kafka_prefix, | |||||
anonymize=False, | |||||
) | |||||
expected_messages = 0 | |||||
for object_type, objects in TEST_OBJECTS.items(): | |||||
writer.write_additions(object_type, objects) | |||||
expected_messages += len(objects) | |||||
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) | |||||
assert_all_objects_consumed(consumed_messages) | |||||
for key, obj_dict in consumed_messages["revision"]: | |||||
obj = Revision.from_dict(obj_dict) | |||||
for person in (obj.author, obj.committer): | |||||
assert not ( | |||||
len(person.fullname) == 32 | |||||
and person.name is None | |||||
ardumont: couldn't our strategy generate those once in a while nonetheless (which would result in a false… | |||||
Done Inline ActionsYes, I had this in mind and wanted to modify the strategy accordingly, but then I forgot :-) Thanks douardda: Yes, I had this in mind and wanted to modify the strategy accordingly, but then I forgot :-)… | |||||
Done Inline ActionsUpdated the hypothesis strategy in D3171. douardda: Updated the hypothesis strategy in D3171. | |||||
and person.email is None | |||||
) | |||||
for key, obj_dict in consumed_messages["release"]: | |||||
obj = Release.from_dict(obj_dict) | |||||
for person in (obj.author,): | |||||
assert not ( | |||||
len(person.fullname) == 32 | |||||
and person.name is None | |||||
and person.email is None | |||||
) | |||||
def test_kafka_writer_anonymized( | |||||
kafka_prefix: str, | |||||
kafka_server: str, | |||||
consumer: Consumer, | |||||
privileged_object_types: Iterable[str], | |||||
): | |||||
writer = KafkaJournalWriter( | |||||
brokers=[kafka_server], | |||||
client_id="kafka_writer", | |||||
prefix=kafka_prefix, | |||||
anonymize=True, | |||||
) | ) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
for object_type, objects in TEST_OBJECTS.items(): | for object_type, objects in TEST_OBJECTS.items(): | ||||
writer.write_additions(object_type, objects) | writer.write_additions(object_type, objects) | ||||
expected_messages += len(objects) | expected_messages += len(objects) | ||||
if object_type in privileged_object_types: | |||||
expected_messages += len(objects) | |||||
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) | consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) | ||||
assert_all_objects_consumed(consumed_messages) | assert_all_objects_consumed(consumed_messages) | ||||
for key, obj_dict in consumed_messages["revision"]: | |||||
obj = Revision.from_dict(obj_dict) | |||||
for person in (obj.author, obj.committer): | |||||
assert ( | |||||
len(person.fullname) == 32 | |||||
and person.name is None | |||||
and person.email is None | |||||
) | |||||
for key, obj_dict in consumed_messages["release"]: | |||||
obj = Release.from_dict(obj_dict) | |||||
for person in (obj.author,): | |||||
assert ( | |||||
len(person.fullname) == 32 | |||||
and person.name is None | |||||
and person.email is None | |||||
) | |||||
def test_write_delivery_failure( | def test_write_delivery_failure( | ||||
kafka_prefix: str, kafka_server: str, consumer: Consumer | kafka_prefix: str, kafka_server: str, consumer: Consumer | ||||
): | ): | ||||
class MockKafkaError: | class MockKafkaError: | ||||
"""A mocked kafka error""" | """A mocked kafka error""" | ||||
def str(self): | def str(self): | ||||
▲ Show 20 Lines • Show All 60 Lines • Show Last 20 Lines |
couldn't our strategy generate those once in a while nonetheless (which would result in a false negative test then)?