D2994.id10670.diff

diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -6,15 +6,17 @@
from collections import defaultdict
from confluent_kafka import Consumer, KafkaException
+
+import pytest
from subprocess import Popen
-from typing import Tuple
+from typing import List, Tuple
from swh.storage import get_storage
from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
-from swh.journal.writer.kafka import KafkaJournalWriter
+from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
-from swh.model.model import Origin, OriginVisit
+from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit
from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS
@@ -147,3 +149,134 @@
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages)
+
+
+@pytest.fixture(scope="session")
+def large_directories() -> List[Directory]:
+ dir_sizes = [1 << n for n in range(21)] # 2**0 = 1 to 2**20 = 1024 * 1024
+
+ dir_entries = [
+ DirectoryEntry(
+ name=("%09d" % i).encode(),
+ type="file",
+ perms=0o100644,
+ target=b"\x00" * 20,
+ )
+ for i in range(max(dir_sizes))
+ ]
+
+ return [Directory(entries=dir_entries[:size]) for size in dir_sizes]
+
+
+def test_write_large_objects(
+ kafka_prefix: str,
+ kafka_server: Tuple[Popen, int],
+ consumer: Consumer,
+ large_directories: List[Directory],
+):
+ kafka_prefix += ".swh.journal.objects"
+
+ # Needed because there are no directories in TEST_OBJECT_DICTS, so the
+ # consumer is not auto-subscribed to the directory topic.
+ consumer.subscribe([kafka_prefix + ".directory"])
+
+ writer = KafkaJournalWriter(
+ brokers=["localhost:%d" % kafka_server[1]],
+ client_id="kafka_writer",
+ prefix=kafka_prefix,
+ )
+
+ writer.write_additions("directory", large_directories)
+
+ consumed_messages = consume_messages(consumer, kafka_prefix, len(large_directories))
+
+ for dir, message in zip(large_directories, consumed_messages["directory"]):
+ (dir_id, consumed_dir) = message
+ assert dir_id == dir.id
+ assert consumed_dir == dir.to_dict()
+
+
+def dir_message_size(directory: Directory) -> int:
+ """Estimate the size of a directory kafka message.
+
+ We could just do it with `len(value_to_kafka(directory.to_dict()))`,
+ but the serialization is a substantial chunk of the test time here.
+
+ """
+ n_entries = len(directory.entries)
+ return (
+ # fmt: off
+ 0
+ + 1 # header of a 2-element fixmap
+ + 1 + 2 # fixstr("id")
+ + 2 + 20 # bin8(directory.id of length 20)
+ + 1 + 7 # fixstr("entries")
+ + 4 # array header
+ + n_entries
+ * (
+ 0
+ + 1 # header of a 4-element fixmap
+ + 1 + 6 # fixstr("target")
+ + 2 + 20 # bin8(target of length 20)
+ + 1 + 4 # fixstr("name")
+ + 2 + 9 # bin8(name of length 9)
+ + 1 + 5 # fixstr("perms")
+ + 5 # uint32(perms)
+ + 1 + 4 # fixstr("type")
+ + 1 + 3 # fixstr(type)
+ )
+ # fmt: on
+ )
+
+
+SMALL_MESSAGE_SIZE = 1024 * 1024
+
+
+@pytest.mark.parametrize(
+ "kafka_server_config_overrides", [{"message.max.bytes": str(SMALL_MESSAGE_SIZE)}]
+)
+def test_fail_write_large_objects(
+ kafka_prefix: str,
+ kafka_server: Tuple[Popen, int],
+ consumer: Consumer,
+ large_directories: List[Directory],
+):
+ kafka_prefix += ".swh.journal.objects"
+
+ # Needed because there are no directories in TEST_OBJECT_DICTS, so the
+ # consumer is not auto-subscribed to the directory topic.
+ consumer.subscribe([kafka_prefix + ".directory"])
+
+ writer = KafkaJournalWriter(
+ brokers=["localhost:%d" % kafka_server[1]],
+ client_id="kafka_writer",
+ prefix=kafka_prefix,
+ )
+
+ expected_dirs = []
+
+ for directory in large_directories:
+ if dir_message_size(directory) < SMALL_MESSAGE_SIZE:
+ # Small enough to deliver: write it, record it as expected, and continue
+ writer.write_addition("directory", directory)
+ expected_dirs.append(directory)
+ continue
+
+ with pytest.raises(KafkaDeliveryError) as exc:
+ writer.write_addition("directory", directory)
+
+ assert "Failed deliveries" in exc.value.message
+ assert len(exc.value.delivery_failures) == 1
+
+ object_type, key, msg, error_name = exc.value.delivery_failures[0]
+
+ assert object_type == "directory"
+ assert key == directory.id
+ assert error_name == "MSG_SIZE_TOO_LARGE"
+
+ consumed_messages = consume_messages(consumer, kafka_prefix, len(expected_dirs))
+
+ for dir, message in zip(expected_dirs, consumed_messages["directory"]):
+ (dir_id, consumed_dir) = message
+ assert dir_id == dir.id
+ assert consumed_dir == dir.to_dict()
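
Note on the size estimate in dir_message_size() above: summing the per-field
byte counts gives a fixed 38-byte envelope plus 66 bytes per entry. With the
1 MiB message.max.bytes used by test_fail_write_large_objects, the
power-of-two directories built by the large_directories fixture are therefore
expected to start failing at 2**14 = 16384 entries, while 2**13 = 8192 entries
(about 540 kB) still fits. A minimal sketch of that arithmetic, using only
constants derived from the comments above (the real serializer may differ by a
few bytes per entry, which is why the function is documented as an estimate):

    SMALL_MESSAGE_SIZE = 1024 * 1024  # mirrors message.max.bytes in the failing test

    for n_entries in (1 << 13, 1 << 14):
        # Same arithmetic as dir_message_size(): 38-byte envelope + 66 bytes per entry.
        estimated = 38 + 66 * n_entries
        verdict = "fits" if estimated < SMALL_MESSAGE_SIZE else "too large"
        print(n_entries, estimated, verdict)
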
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -4,7 +4,8 @@
# See top-level LICENSE file for more information
import logging
-from typing import Dict, Iterable, Optional, Type
+import time
+from typing import Dict, Iterable, List, NamedTuple, Optional, Type
from confluent_kafka import Producer, KafkaException
@@ -24,6 +25,7 @@
KeyType,
ModelObject,
object_key,
+ pprint_key,
key_to_kafka,
value_to_kafka,
)
@@ -42,6 +44,44 @@
}
+class DeliveryTag(NamedTuple):
+ """Unique tag allowing us to check for a message delivery"""
+
+ topic: str
+ kafka_key: bytes
+
+
+class DeliveryFailureInfo(NamedTuple):
+ """Verbose information for failed deliveries"""
+
+ object_type: str
+ key: KeyType
+ message: str
+ code: str
+
+
+def get_object_type(topic: str) -> str:
+ """Get the object type from a topic string"""
+ return topic.rsplit(".", 1)[-1]
+
+
+class KafkaDeliveryError(Exception):
+ """Delivery failed on some kafka messages."""
+
+ def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]):
+ self.message = message
+ self.delivery_failures = list(delivery_failures)
+
+ def pretty_failures(self) -> str:
+ return ", ".join(
+ f"{f.object_type} {pprint_key(f.key)} ({f.message})"
+ for f in self.delivery_failures
+ )
+
+ def __str__(self):
+ return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])"
+
+
class KafkaJournalWriter:
"""This class is instantiated and used by swh-storage to write incoming
new objects to Kafka before adding them to the storage backend
@@ -52,6 +92,8 @@
prefix: the prefix used to build the topic names for objects
client_id: the id of the writer sent to kafka
producer_config: extra configuration keys passed to the `Producer`
+ flush_timeout: timeout, in seconds, after which the `flush` operation
+ will fail if some message deliveries are still pending.
"""
@@ -61,6 +103,7 @@
prefix: str,
client_id: str,
producer_config: Optional[Dict] = None,
+ flush_timeout: float = 120,
):
self._prefix = prefix
@@ -85,14 +128,30 @@
}
)
+ # Delivery management
+ self.flush_timeout = flush_timeout
+
+ # delivery tag -> original object "key" mapping
+ self.deliveries_pending: Dict[DeliveryTag, KeyType] = {}
+
+ # List of (object_type, key, error_msg, error_name) for failed deliveries
+ self.delivery_failures: List[DeliveryFailureInfo] = []
+
def _error_cb(self, error):
if error.fatal():
raise KafkaException(error)
logger.info("Received non-fatal kafka error: %s", error)
def _on_delivery(self, error, message):
+ (topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key())
+ sent_key = self.deliveries_pending.pop(delivery_tag, None)
+
if error is not None:
- self._error_cb(error)
+ self.delivery_failures.append(
+ DeliveryFailureInfo(
+ get_object_type(topic), sent_key, error.str(), error.name()
+ )
+ )
def send(self, topic: str, key: KeyType, value):
kafka_key = key_to_kafka(key)
@@ -100,11 +159,44 @@
topic=topic, key=kafka_key, value=value_to_kafka(value),
)
- # Need to service the callbacks regularly by calling poll
- self.producer.poll(0)
+ self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key
+
+ def delivery_error(self, message) -> KafkaDeliveryError:
+ """Get all failed deliveries, and clear them"""
+ ret = self.delivery_failures
+ self.delivery_failures = []
+
+ while self.deliveries_pending:
+ delivery_tag, orig_key = self.deliveries_pending.popitem()
+ (topic, kafka_key) = delivery_tag
+ ret.append(
+ DeliveryFailureInfo(
+ get_object_type(topic),
+ orig_key,
+ "No delivery before flush() timeout",
+ "SWH_FLUSH_TIMEOUT",
+ )
+ )
+
+ return KafkaDeliveryError(message, ret)
def flush(self):
- self.producer.flush()
+ start = time.monotonic()
+
+ self.producer.flush(self.flush_timeout)
+
+ while self.deliveries_pending:
+ if time.monotonic() - start > self.flush_timeout:
+ break
+ self.producer.poll(0.1)
+
+ if self.deliveries_pending:
+ # Delivery timeout
+ raise self.delivery_error(
+ "flush() exceeded timeout (%ss)" % self.flush_timeout,
+ )
+ elif self.delivery_failures:
+ raise self.delivery_error("Failed deliveries after flush()")
def _sanitize_object(
self, object_type: str, object_: ModelObject

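
For context on how the new error reporting is meant to be consumed, here is a
minimal, hypothetical caller-side sketch. The broker address, topic prefix and
the sample Origin are made up for illustration; only classes and methods added
or touched by this diff (KafkaJournalWriter, KafkaDeliveryError,
DeliveryFailureInfo) are used, and the reaction to the error is illustrative,
not part of the patch:

    from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter
    from swh.model.model import Origin

    writer = KafkaJournalWriter(
        brokers=["localhost:9092"],    # hypothetical broker address
        client_id="example-writer",
        prefix="swh.journal.objects",  # hypothetical topic prefix
        flush_timeout=30,
    )

    origins = [Origin(url="https://example.com/repo.git")]

    try:
        writer.write_additions("origin", origins)
        # flush() raises KafkaDeliveryError if some deliveries failed or are
        # still pending after flush_timeout seconds.
        writer.flush()
    except KafkaDeliveryError as exc:
        # delivery_failures holds one DeliveryFailureInfo per undelivered
        # object: (object_type, key, human-readable message, error code such
        # as "MSG_SIZE_TOO_LARGE" or "SWH_FLUSH_TIMEOUT").
        for failure in exc.delivery_failures:
            print(failure.object_type, failure.code, failure.message)
        # pretty_failures() renders the same information as a single string.
        raise
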