diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -9,14 +9,14 @@
 import pytest
 
 from subprocess import Popen
-from typing import List, Tuple
+from typing import Tuple
 
 from swh.storage import get_storage
 
 from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
 from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
 
-from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit
+from swh.model.model import Directory, Origin, OriginVisit
 
 from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS
 
@@ -151,135 +151,41 @@
     assert_all_objects_consumed(consumed_messages)
 
 
-@pytest.fixture(scope="session")
-def large_directories() -> List[Directory]:
-    dir_sizes = [1 << n for n in range(21)]  # 2**0 = 1 to 2**20 = 1024 * 1024
-
-    dir_entries = [
-        DirectoryEntry(
-            name=("%09d" % i).encode(),
-            type="file",
-            perms=0o100644,
-            target=b"\x00" * 20,
-        )
-        for i in range(max(dir_sizes))
-    ]
-
-    return [Directory(entries=dir_entries[:size]) for size in dir_sizes]
-
-
-def test_write_large_objects(
-    kafka_prefix: str,
-    kafka_server: Tuple[Popen, int],
-    consumer: Consumer,
-    large_directories: List[Directory],
+def test_write_delivery_failure(
+    kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer,
 ):
-    kafka_prefix += ".swh.journal.objects"
+    class MockKafkaError:
+        """A mocked kafka error"""
 
-    # Needed as there is no directories in TEST_OBJECT_DICTS, the consumer
-    # isn't autosubscribed to directories.
-    consumer.subscribe([kafka_prefix + ".directory"])
-
-    writer = KafkaJournalWriter(
-        brokers=["localhost:%d" % kafka_server[1]],
-        client_id="kafka_writer",
-        prefix=kafka_prefix,
-    )
-
-    writer.write_additions("directory", large_directories)
-
-    consumed_messages = consume_messages(consumer, kafka_prefix, len(large_directories))
-
-    for dir, message in zip(large_directories, consumed_messages["directory"]):
-        (dir_id, consumed_dir) = message
-        assert dir_id == dir.id
-        assert consumed_dir == dir.to_dict()
-
-
-def dir_message_size(directory: Directory) -> int:
-    """Estimate the size of a directory kafka message.
-
-    We could just do it with `len(value_to_kafka(directory.to_dict()))`,
-    but the serialization is a substantial chunk of the test time here.
- - """ - n_entries = len(directory.entries) - return ( - # fmt: off - 0 - + 1 # header of a 2-element fixmap - + 1 + 2 # fixstr("id") - + 2 + 20 # bin8(directory.id of length 20) - + 1 + 7 # fixstr("entries") - + 4 # array header - + n_entries - * ( - 0 - + 1 # header of a 4-element fixmap - + 1 + 6 # fixstr("target") - + 2 + 20 # bin8(target of length 20) - + 1 + 4 # fixstr("name") - + 2 + 9 # bin8(name of length 9) - + 1 + 5 # fixstr("perms") - + 5 # uint32(perms) - + 1 + 4 # fixstr("type") - + 1 + 3 # fixstr(type) - ) - # fmt: on - ) + def str(self): + return "Mocked Kafka Error" + def name(self): + return "SWH_MOCK_ERROR" -SMALL_MESSAGE_SIZE = 1024 * 1024 + class KafkaJournalWriterFailDelivery(KafkaJournalWriter): + """A journal writer which always fails delivering messages""" + def _on_delivery(self, error, message): + """Replace the inbound error with a fake delivery error""" + super()._on_delivery(MockKafkaError(), message) -@pytest.mark.parametrize( - "kafka_server_config_overrides", [{"message.max.bytes": str(SMALL_MESSAGE_SIZE)}] -) -def test_fail_write_large_objects( - kafka_prefix: str, - kafka_server: Tuple[Popen, int], - consumer: Consumer, - large_directories: List[Directory], -): kafka_prefix += ".swh.journal.objects" - - # Needed as there is no directories in TEST_OBJECT_DICTS, the consumer - # isn't autosubscribed to directories. - consumer.subscribe([kafka_prefix + ".directory"]) - - writer = KafkaJournalWriter( + writer = KafkaJournalWriterFailDelivery( brokers=["localhost:%d" % kafka_server[1]], client_id="kafka_writer", prefix=kafka_prefix, ) - expected_dirs = [] - - for directory in large_directories: - if dir_message_size(directory) < SMALL_MESSAGE_SIZE: - # No error; write anyway, but continue - writer.write_addition("directory", directory) - expected_dirs.append(directory) - continue - - with pytest.raises(KafkaDeliveryError) as exc: - writer.write_addition("directory", directory) - - assert "Failed deliveries" in exc.value.message - assert len(exc.value.delivery_failures) == 1 - - object_type, key, msg, code = exc.value.delivery_failures[0] - - assert object_type == "directory" - assert key == directory.id - assert code == "MSG_SIZE_TOO_LARGE" - - consumed_messages = consume_messages(consumer, kafka_prefix, len(expected_dirs)) + empty_dir = Directory(entries=[]) + with pytest.raises(KafkaDeliveryError) as exc: + writer.write_addition("directory", empty_dir) - for dir, message in zip(expected_dirs, consumed_messages["directory"]): - (dir_id, consumed_dir) = message - assert dir_id == dir.id - assert consumed_dir == dir.to_dict() + assert "Failed deliveries" in exc.value.message + assert len(exc.value.delivery_failures) == 1 + delivery_failure = exc.value.delivery_failures[0] + assert delivery_failure.key == empty_dir.id + assert delivery_failure.code == "SWH_MOCK_ERROR" def test_write_delivery_timeout( @@ -289,6 +195,9 @@ produced = [] class MockProducer(Producer): + """A kafka producer which pretends to produce messages, but never sends any + delivery acknowledgements""" + def produce(self, **kwargs): produced.append(kwargs)