@pytest.fixture(scope="session")
def large_directories() -> List[Directory]:
    """Build directories of doubling sizes, from 1 entry up to 2**20 entries.

    A single list of file entries is generated once, and every returned
    directory holds a prefix of that list. Entries are named after their
    zero-padded, 9-digit index and all point to the same all-zeros 20-byte
    target.

    Returns:
        one directory per power of two, in increasing order of size
    """
    # 2**0 = 1 to 2**20 = 1024 * 1024
    sizes = [2 ** exponent for exponent in range(21)]
    largest = max(sizes)

    entries = []
    for index in range(largest):
        entries.append(
            DirectoryEntry(
                name=("%09d" % index).encode(),
                type="file",
                perms=0o100644,
                target=b"\x00" * 20,
            )
        )

    return [Directory(entries=entries[:count]) for count in sizes]
def dir_message_size(directory: Directory) -> int:
    """Cheaply approximate the size of the kafka message for a directory.

    The figure is derived arithmetically from the number of entries rather
    than with `len(value_to_kafka(directory.to_dict()))`, because actually
    serializing the directories is a substantial chunk of the test time.

    Args:
        directory: the directory to estimate; only its number of entries is
            looked at.

    Returns:
        the estimated size of the serialized message, in bytes
    """
    # NOTE(review): the per-entry figure counts a 3-character type, while the
    # large_directories fixture uses "file" -- presumably close enough for an
    # estimate, confirm if the exact threshold ever matters.

    # fmt: off
    # Fixed part: the outer map and the header of the entries array
    envelope = (
        1           # header of a 2-element fixmap
        + 1 + 2     # fixstr("id")
        + 2 + 20    # bin8(directory.id of length 20)
        + 1 + 7     # fixstr("entries")
        + 4         # array header
    )

    # Cost of each element of the entries array
    per_entry = (
        1           # header of a 4-element fixmap
        + 1 + 6     # fixstr("target")
        + 2 + 20    # bin8(target of length 20)
        + 1 + 4     # fixstr("name")
        + 2 + 9     # bin8(name of length 9)
        + 1 + 5     # fixstr("perms")
        + 5         # uint32(perms)
        + 1 + 4     # fixstr("type")
        + 1 + 3     # fixstr(type)
    )
    # fmt: on

    return envelope + per_entry * len(directory.entries)
class DeliveryFailureInfo(NamedTuple):
    """Verbose information for failed deliveries

    One instance is recorded per message that either was reported as failed
    by the delivery callback, or was still pending when flush() timed out.
    """

    # Object type of the message: the last dot-separated component of the
    # topic it was sent to
    object_type: str
    # Key of the object as it was passed to send(), before its conversion to
    # a kafka key; the delivery callback records None here when the
    # delivered message matches no pending delivery
    key: KeyType
    # Human-readable description of the failure
    message: str
    # Symbolic name of the error, as reported by the kafka error (for
    # instance "MSG_SIZE_TOO_LARGE"), or "SWH_FLUSH_TIMEOUT" for messages
    # that were not delivered before the flush() timeout
    code: str
class KafkaDeliveryError(Exception):
    """Delivery failed on some kafka messages.

    Attributes:
        message: short description of the overall failure
        delivery_failures: list with one `DeliveryFailureInfo` per message
            that could not be delivered

    Args:
        message: short description of the overall failure
        delivery_failures: the failed deliveries; any iterable is accepted
            and is consumed once to build the `delivery_failures` list
    """

    def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]):
        self.message = message
        self.delivery_failures = list(delivery_failures)
        # Hand the materialized list (not the caller's iterable, which may be
        # a one-shot generator) to the base class, so that `args`, `repr()`
        # and pickling/copying of the exception all see the actual failures.
        super().__init__(message, self.delivery_failures)

    def pretty_failures(self) -> str:
        """Return a comma-separated, human-readable summary of the failures.

        Each failure is rendered as its object type, its pretty-printed key
        and, between parentheses, its failure message.
        """
        return ", ".join(
            f"{f.object_type} {pprint_key(f.key)} ({f.message})"
            for f in self.delivery_failures
        )

    def __str__(self):
        return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])"
def flush(self):
    """Wait for all pending messages to be delivered, or fail.

    The producer is flushed first; then, as long as some deliveries are
    still pending and the time budget is not exhausted, the producer is
    polled so that the remaining delivery callbacks get serviced.

    Raises:
        KafkaDeliveryError: if some messages are still pending once more
            than `flush_timeout` seconds have elapsed, or if some
            deliveries have been recorded as failed.
    """
    began = time.monotonic()
    timeout = self.flush_timeout

    self.producer.flush(timeout)

    # Keep servicing callbacks in short slices until nothing is pending
    # anymore or the overall time budget has been spent.
    while self.deliveries_pending and time.monotonic() - began <= timeout:
        self.producer.poll(0.1)

    if self.deliveries_pending:
        # Delivery timeout
        reason = "flush() exceeded timeout (%ss)" % timeout
    elif self.delivery_failures:
        reason = "Failed deliveries after flush()"
    else:
        return

    raise self.delivery_error(reason)