Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from confluent_kafka import Consumer, Producer, KafkaException | from confluent_kafka import Consumer, Producer, KafkaException | ||||
import pytest | import pytest | ||||
from subprocess import Popen | from subprocess import Popen | ||||
from typing import List, Tuple | from typing import Tuple | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value | from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value | ||||
from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError | from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError | ||||
from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit | from swh.model.model import Directory, Origin, OriginVisit | ||||
from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS | from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS | ||||
def consume_messages(consumer, kafka_prefix, expected_messages): | def consume_messages(consumer, kafka_prefix, expected_messages): | ||||
"""Consume expected_messages from the consumer; | """Consume expected_messages from the consumer; | ||||
Sort them all into a consumed_objects dict""" | Sort them all into a consumed_objects dict""" | ||||
consumed_messages = defaultdict(list) | consumed_messages = defaultdict(list) | ||||
▲ Show 20 Lines • Show All 118 Lines • ▼ Show 20 Lines | for object_type, objects in TEST_OBJECTS.items(): | ||||
expected_messages += 1 | expected_messages += 1 | ||||
else: | else: | ||||
assert False, object_type | assert False, object_type | ||||
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) | consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) | ||||
assert_all_objects_consumed(consumed_messages) | assert_all_objects_consumed(consumed_messages) | ||||
@pytest.fixture(scope="session") | def test_write_delivery_failure( | ||||
def large_directories() -> List[Directory]: | kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer, | ||||
dir_sizes = [1 << n for n in range(21)] # 2**0 = 1 to 2**20 = 1024 * 1024 | |||||
dir_entries = [ | |||||
DirectoryEntry( | |||||
name=("%09d" % i).encode(), | |||||
type="file", | |||||
perms=0o100644, | |||||
target=b"\x00" * 20, | |||||
) | |||||
for i in range(max(dir_sizes)) | |||||
] | |||||
return [Directory(entries=dir_entries[:size]) for size in dir_sizes] | |||||
def test_write_large_objects( | |||||
kafka_prefix: str, | |||||
kafka_server: Tuple[Popen, int], | |||||
consumer: Consumer, | |||||
large_directories: List[Directory], | |||||
): | ): | ||||
kafka_prefix += ".swh.journal.objects" | class MockKafkaError: | ||||
"""A mocked kafka error""" | |||||
# Needed as there is no directories in TEST_OBJECT_DICTS, the consumer | |||||
# isn't autosubscribed to directories. | |||||
consumer.subscribe([kafka_prefix + ".directory"]) | |||||
writer = KafkaJournalWriter( | |||||
brokers=["localhost:%d" % kafka_server[1]], | |||||
client_id="kafka_writer", | |||||
prefix=kafka_prefix, | |||||
) | |||||
writer.write_additions("directory", large_directories) | def str(self): | ||||
return "Mocked Kafka Error" | |||||
consumed_messages = consume_messages(consumer, kafka_prefix, len(large_directories)) | |||||
for dir, message in zip(large_directories, consumed_messages["directory"]): | |||||
(dir_id, consumed_dir) = message | |||||
assert dir_id == dir.id | |||||
assert consumed_dir == dir.to_dict() | |||||
def dir_message_size(directory: Directory) -> int: | |||||
"""Estimate the size of a directory kafka message. | |||||
We could just do it with `len(value_to_kafka(directory.to_dict()))`, | |||||
but the serialization is a substantial chunk of the test time here. | |||||
""" | |||||
n_entries = len(directory.entries) | |||||
return ( | |||||
# fmt: off | |||||
0 | |||||
+ 1 # header of a 2-element fixmap | |||||
+ 1 + 2 # fixstr("id") | |||||
+ 2 + 20 # bin8(directory.id of length 20) | |||||
+ 1 + 7 # fixstr("entries") | |||||
+ 4 # array header | |||||
+ n_entries | |||||
* ( | |||||
0 | |||||
+ 1 # header of a 4-element fixmap | |||||
+ 1 + 6 # fixstr("target") | |||||
+ 2 + 20 # bin8(target of length 20) | |||||
+ 1 + 4 # fixstr("name") | |||||
+ 2 + 9 # bin8(name of length 9) | |||||
+ 1 + 5 # fixstr("perms") | |||||
+ 5 # uint32(perms) | |||||
+ 1 + 4 # fixstr("type") | |||||
+ 1 + 3 # fixstr(type) | |||||
) | |||||
# fmt: on | |||||
) | |||||
def name(self): | |||||
return "SWH_MOCK_ERROR" | |||||
SMALL_MESSAGE_SIZE = 1024 * 1024 | class KafkaJournalWriterFailDelivery(KafkaJournalWriter): | ||||
"""A journal writer which always fails delivering messages""" | |||||
def _on_delivery(self, error, message): | |||||
"""Replace the inbound error with a fake delivery error""" | |||||
super()._on_delivery(MockKafkaError(), message) | |||||
@pytest.mark.parametrize( | |||||
"kafka_server_config_overrides", [{"message.max.bytes": str(SMALL_MESSAGE_SIZE)}] | |||||
) | |||||
def test_fail_write_large_objects( | |||||
kafka_prefix: str, | |||||
kafka_server: Tuple[Popen, int], | |||||
consumer: Consumer, | |||||
large_directories: List[Directory], | |||||
): | |||||
kafka_prefix += ".swh.journal.objects" | kafka_prefix += ".swh.journal.objects" | ||||
writer = KafkaJournalWriterFailDelivery( | |||||
# Needed as there is no directories in TEST_OBJECT_DICTS, the consumer | |||||
# isn't autosubscribed to directories. | |||||
consumer.subscribe([kafka_prefix + ".directory"]) | |||||
writer = KafkaJournalWriter( | |||||
brokers=["localhost:%d" % kafka_server[1]], | brokers=["localhost:%d" % kafka_server[1]], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
) | ) | ||||
expected_dirs = [] | empty_dir = Directory(entries=[]) | ||||
for directory in large_directories: | |||||
if dir_message_size(directory) < SMALL_MESSAGE_SIZE: | |||||
# No error; write anyway, but continue | |||||
writer.write_addition("directory", directory) | |||||
expected_dirs.append(directory) | |||||
continue | |||||
with pytest.raises(KafkaDeliveryError) as exc: | with pytest.raises(KafkaDeliveryError) as exc: | ||||
writer.write_addition("directory", directory) | writer.write_addition("directory", empty_dir) | ||||
assert "Failed deliveries" in exc.value.message | assert "Failed deliveries" in exc.value.message | ||||
assert len(exc.value.delivery_failures) == 1 | assert len(exc.value.delivery_failures) == 1 | ||||
delivery_failure = exc.value.delivery_failures[0] | |||||
object_type, key, msg, code = exc.value.delivery_failures[0] | assert delivery_failure.key == empty_dir.id | ||||
assert delivery_failure.code == "SWH_MOCK_ERROR" | |||||
assert object_type == "directory" | |||||
assert key == directory.id | |||||
assert code == "MSG_SIZE_TOO_LARGE" | |||||
consumed_messages = consume_messages(consumer, kafka_prefix, len(expected_dirs)) | |||||
for dir, message in zip(expected_dirs, consumed_messages["directory"]): | |||||
(dir_id, consumed_dir) = message | |||||
assert dir_id == dir.id | |||||
assert consumed_dir == dir.to_dict() | |||||
def test_write_delivery_timeout( | def test_write_delivery_timeout( | ||||
kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer | kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer | ||||
): | ): | ||||
produced = [] | produced = [] | ||||
class MockProducer(Producer): | class MockProducer(Producer): | ||||
"""A kafka producer which pretends to produce messages, but never sends any | |||||
delivery acknowledgements""" | |||||
def produce(self, **kwargs): | def produce(self, **kwargs): | ||||
produced.append(kwargs) | produced.append(kwargs) | ||||
kafka_prefix += ".swh.journal.objects" | kafka_prefix += ".swh.journal.objects" | ||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter( | ||||
brokers=["localhost:%d" % kafka_server[1]], | brokers=["localhost:%d" % kafka_server[1]], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
Show All 15 Lines |