diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -7,6 +7,8 @@
 import datetime
 
 from confluent_kafka import Consumer, KafkaException
+
+import pytest
 
 from subprocess import Popen
 from typing import List, Tuple
@@ -14,9 +16,13 @@
 from swh.journal.replay import object_converter_fn
 from swh.journal.serializers import kafka_to_key, kafka_to_value
-from swh.journal.writer.kafka import KafkaJournalWriter, OBJECT_TYPES
+from swh.journal.writer.kafka import (
+    KafkaJournalWriter,
+    KafkaDeliveryError,
+    OBJECT_TYPES,
+)
 
-from swh.model.model import Content, Origin, BaseModel
+from swh.model.model import Content, Directory, DirectoryEntry, Origin, BaseModel
 
 from .conftest import OBJECT_TYPE_KEYS
@@ -161,3 +167,132 @@
     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
     assert_all_objects_consumed(consumed_messages)
+
+
+@pytest.fixture(scope="session")
+def large_directories() -> List[Directory]:
+    dir_sizes = [1 << n for n in range(21)]  # 2**0 = 1 to 2**20 = 1024 * 1024
+
+    dir_entries = [
+        DirectoryEntry(
+            name=("%09d" % i).encode(),
+            type="file",
+            perms=0o100644,
+            target=b"\x00" * 20,
+        )
+        for i in range(max(dir_sizes))
+    ]
+
+    return [Directory(entries=dir_entries[:size]) for size in dir_sizes]
+
+
+def test_write_large_objects(
+    kafka_prefix: str,
+    kafka_server: Tuple[Popen, int],
+    consumer: Consumer,
+    large_directories: List[Directory],
+):
+    kafka_prefix += ".swh.journal.objects"
+
+    # Needed as there are no directories in OBJECT_TYPES
+    consumer.subscribe([kafka_prefix + ".directory"])
+
+    writer = KafkaJournalWriter(
+        brokers=["localhost:%d" % kafka_server[1]],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+    )
+
+    writer.write_additions("directory", large_directories)
+
+    consumed_messages = consume_messages(consumer, kafka_prefix, len(large_directories))
+
+    for dir, message in zip(large_directories, consumed_messages["directory"]):
+        (dir_id, consumed_dir) = message
+        assert dir_id == dir.id
+        assert consumed_dir == dir.to_dict()
+
+
+def dir_message_size(directory: Directory) -> int:
+    """Estimate the size of a directory kafka message.
+
+    We could just do it with `len(value_to_kafka(directory.to_dict()))`,
+    but the serialization is a substantial chunk of the test time here.
+
+    """
+    n_entries = len(directory.entries)
+    return (
+        # fmt: off
+        0
+        + 1  # header of a 2-element fixmap
+        + 1 + 2  # fixstr("id")
+        + 2 + 20  # bin8(directory.id of length 20)
+        + 1 + 7  # fixstr("entries")
+        + 4  # array header
+        + n_entries
+        * (
+            0
+            + 1  # header of a 4-element fixmap
+            + 1 + 6  # fixstr("target")
+            + 2 + 20  # bin8(target of length 20)
+            + 1 + 4  # fixstr("name")
+            + 2 + 9  # bin8(name of length 9)
+            + 1 + 5  # fixstr("perms")
+            + 5  # uint32(perms)
+            + 1 + 4  # fixstr("type")
+            + 1 + 3  # fixstr(type)
+        )
+        # fmt: on
+    )
+
+
+SMALL_MESSAGE_SIZE = 1024 * 1024
+
+
+@pytest.mark.parametrize(
+    "kafka_server_config_overrides", [{"message.max.bytes": str(SMALL_MESSAGE_SIZE)}]
+)
+def test_fail_write_large_objects(
+    kafka_prefix: str,
+    kafka_server: Tuple[Popen, int],
+    consumer: Consumer,
+    large_directories: List[Directory],
+):
+    kafka_prefix += ".swh.journal.objects"
+
+    # Needed as there are no directories in OBJECT_TYPES
+    consumer.subscribe([kafka_prefix + ".directory"])
+
+    writer = KafkaJournalWriter(
+        brokers=["localhost:%d" % kafka_server[1]],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+    )
+
+    expected_dirs = []
+
+    for directory in large_directories:
+        if dir_message_size(directory) < SMALL_MESSAGE_SIZE:
+            # No delivery error expected; write it and continue with the next directory
+            writer.write_addition("directory", directory)
+            expected_dirs.append(directory)
+            continue
+
+        with pytest.raises(KafkaDeliveryError) as exc:
+            writer.write_addition("directory", directory)
+
+        assert "Failed deliveries" in exc.value.message
+        assert len(exc.value.delivery_failures) == 1
+
+        object_type, key, msg, error_name = exc.value.delivery_failures[0]
+
+        assert object_type == "directory"
+        assert key == directory.id
+        assert error_name == "MSG_SIZE_TOO_LARGE"
+
+    consumed_messages = consume_messages(consumer, kafka_prefix, len(expected_dirs))
+
+    for dir, message in zip(expected_dirs, consumed_messages["directory"]):
+        (dir_id, consumed_dir) = message
+        assert dir_id == dir.id
+        assert consumed_dir == dir.to_dict()
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -4,7 +4,17 @@
 # See top-level LICENSE file for more information
 
 import logging
-from typing import Dict, Iterable, Optional, Type, Union, overload
+import time
+from typing import (
+    Dict,
+    Iterable,
+    List,
+    NamedTuple,
+    Optional,
+    Type,
+    Union,
+    overload,
+)
 
 from confluent_kafka import Producer, KafkaException
 
@@ -41,6 +51,65 @@
 ]
 
+
+class DeliveryTag(NamedTuple):
+    """Unique tag allowing us to check for a message delivery"""
+
+    topic: str
+    kafka_key: bytes
+
+
+class DeliveryFailureInfo(NamedTuple):
+    """Verbose information for failed deliveries"""
+
+    object_type: str
+    key: KeyType
+    message: str
+    code: str
+
+
+def get_object_type(topic: str) -> str:
+    """Get the object type from a topic string"""
+    return topic.rsplit(".", 1)[-1]
+
+
+def stringify_item(k: str, v: Union[str, bytes]):
+    if isinstance(v, str):
+        return v
+    if k == "url":
+        return v.decode("utf-8")
+    return v.hex()
+
+
+def pprint_key(key: KeyType) -> str:
+    """Pretty-print a kafka key"""
+
+    if isinstance(key, dict):
+        return "{%s}" % ", ".join(
+            f"{k}: {stringify_item(k, v)}" for k, v in key.items()
+        )
+    elif isinstance(key, bytes):
+        return key.hex()
+    else:
+        return key
+
+
+class KafkaDeliveryError(Exception):
+    """Delivery failed on some kafka messages."""
+
+    def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]):
+        self.message = message
+        self.delivery_failures = list(delivery_failures)
+
+    def pretty_failures(self) -> str:
+        return ", ".join(
+            f"{f.object_type} {pprint_key(f.key)} ({f.message})"
+            for f in self.delivery_failures
+        )
+
+    def __str__(self):
+        return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])"
+
+
 class KafkaJournalWriter:
     """This class is instantiated and used by swh-storage to write incoming
     new objects to Kafka before adding them to the storage backend
 
@@ -51,6 +120,8 @@
         prefix: the prefix used to build the topic names for objects
         client_id: the id of the writer sent to kafka
         producer_config: extra configuration keys passed to the `Producer`
+        flush_timeout: timeout, in seconds, after which the `flush` operation
+            will fail if some message deliveries are still pending.
 
     """
 
@@ -60,6 +131,7 @@
         prefix: str,
         client_id: str,
         producer_config: Optional[Dict] = None,
+        flush_timeout: float = 120,
    ):
         self._prefix = prefix
 
@@ -84,14 +156,30 @@
             }
         )
 
+        # Delivery management
+        self.flush_timeout = flush_timeout
+
+        # delivery tag -> original object "key" mapping
+        self.deliveries_pending: Dict[DeliveryTag, KeyType] = {}
+
+        # List of (object_type, key, error_msg, error_name) for failed deliveries
+        self.delivery_failures: List[DeliveryFailureInfo] = []
+
     def _error_cb(self, error):
         if error.fatal():
             raise KafkaException(error)
         logger.info("Received non-fatal kafka error: %s", error)
 
     def _on_delivery(self, error, message):
+        (topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key())
+        sent_key = self.deliveries_pending.pop(delivery_tag, None)
+
         if error is not None:
-            self._error_cb(error)
+            self.delivery_failures.append(
+                DeliveryFailureInfo(
+                    get_object_type(topic), sent_key, error.str(), error.name()
+                )
+            )
 
     def send(self, topic: str, key: KeyType, value):
         kafka_key = key_to_kafka(key)
@@ -99,11 +187,44 @@
             topic=topic, key=kafka_key, value=value_to_kafka(value),
         )
 
-        # Need to service the callbacks regularly by calling poll
-        self.producer.poll(0)
+        self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key
+
+    def delivery_error(self, message) -> KafkaDeliveryError:
+        """Get all failed deliveries, and clear them"""
+        ret = self.delivery_failures
+        self.delivery_failures = []
+
+        while self.deliveries_pending:
+            delivery_tag, orig_key = self.deliveries_pending.popitem()
+            (topic, kafka_key) = delivery_tag
+            ret.append(
+                DeliveryFailureInfo(
+                    get_object_type(topic),
+                    orig_key,
+                    "No delivery before flush() timeout",
+                    "SWH_FLUSH_TIMEOUT",
+                )
+            )
+
+        return KafkaDeliveryError(message, ret)
 
     def flush(self):
-        self.producer.flush()
+        start = time.monotonic()
+
+        self.producer.flush(self.flush_timeout)
+
+        while self.deliveries_pending:
+            if time.monotonic() - start > self.flush_timeout:
+                break
+            self.producer.poll(0.1)
+
+        if self.deliveries_pending:
+            # Delivery timeout
+            raise self.delivery_error(
+                "flush() exceeded timeout (%ss)" % self.flush_timeout,
+            )
+        elif self.delivery_failures:
+            raise self.delivery_error("Failed deliveries after flush()")
 
     # these @overload'ed versions of the _get_key method aim at helping mypy figuring
     # the correct type-ing.
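
A minimal usage sketch, illustrative only and not part of the patch: it shows how a caller might handle the new KafkaDeliveryError surfaced by flush(). The broker address, client_id, prefix, flush_timeout value and the empty `directories` list are placeholders; the constructor arguments and the DeliveryFailureInfo fields mirror what the patch and its tests define.

    import logging

    from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter

    logger = logging.getLogger(__name__)

    # Placeholder values; a real deployment configures brokers/prefix/client_id.
    writer = KafkaJournalWriter(
        brokers=["localhost:9092"],
        client_id="example_writer",
        prefix="swh.journal.objects",
        flush_timeout=60,
    )

    directories = []  # placeholder: swh.model.model.Directory objects to write

    try:
        writer.write_additions("directory", directories)
        # flush() raises KafkaDeliveryError on failed or timed-out deliveries;
        # the tests above suggest write_addition()/write_additions() already
        # flush internally, so the error may surface there as well.
        writer.flush()
    except KafkaDeliveryError as exc:
        # delivery_failures holds DeliveryFailureInfo(object_type, key, message, code)
        for failure in exc.delivery_failures:
            logger.error(
                "%s %s not delivered: %s (%s)",
                failure.object_type,
                failure.key,
                failure.message,
                failure.code,
            )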