D2994.id10670.diff
D2994: Add delivery notification handling to swh.journal.writer.kafka
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -6,15 +6,17 @@
from collections import defaultdict
from confluent_kafka import Consumer, KafkaException
+
+import pytest
from subprocess import Popen
-from typing import Tuple
+from typing import List, Tuple
from swh.storage import get_storage
from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
-from swh.journal.writer.kafka import KafkaJournalWriter
+from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
-from swh.model.model import Origin, OriginVisit
+from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit
from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS
@@ -147,3 +149,134 @@
    consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
    assert_all_objects_consumed(consumed_messages)
+
+
+@pytest.fixture(scope="session")
+def large_directories() -> List[Directory]:
+    dir_sizes = [1 << n for n in range(21)]  # 2**0 = 1 to 2**20 = 1024 * 1024
+
+    dir_entries = [
+        DirectoryEntry(
+            name=("%09d" % i).encode(),
+            type="file",
+            perms=0o100644,
+            target=b"\x00" * 20,
+        )
+        for i in range(max(dir_sizes))
+    ]
+
+    return [Directory(entries=dir_entries[:size]) for size in dir_sizes]
+
+
+def test_write_large_objects(
+    kafka_prefix: str,
+    kafka_server: Tuple[Popen, int],
+    consumer: Consumer,
+    large_directories: List[Directory],
+):
+    kafka_prefix += ".swh.journal.objects"
+
+    # Needed because there are no directories in TEST_OBJECT_DICTS, so the
+    # consumer isn't autosubscribed to directories.
+    consumer.subscribe([kafka_prefix + ".directory"])
+
+    writer = KafkaJournalWriter(
+        brokers=["localhost:%d" % kafka_server[1]],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+    )
+
+    writer.write_additions("directory", large_directories)
+
+    consumed_messages = consume_messages(consumer, kafka_prefix, len(large_directories))
+
+    for dir, message in zip(large_directories, consumed_messages["directory"]):
+        (dir_id, consumed_dir) = message
+        assert dir_id == dir.id
+        assert consumed_dir == dir.to_dict()
+
+
+def dir_message_size(directory: Directory) -> int:
+    """Estimate the size of a directory kafka message.
+
+    We could just do it with `len(value_to_kafka(directory.to_dict()))`,
+    but the serialization is a substantial chunk of the test time here.
+
+    """
+    n_entries = len(directory.entries)
+    return (
+        # fmt: off
+        0
+        + 1  # header of a 2-element fixmap
+        + 1 + 2  # fixstr("id")
+        + 2 + 20  # bin8(directory.id of length 20)
+        + 1 + 7  # fixstr("entries")
+        + 4  # array header
+        + n_entries
+        * (
+            0
+            + 1  # header of a 4-element fixmap
+            + 1 + 6  # fixstr("target")
+            + 2 + 20  # bin8(target of length 20)
+            + 1 + 4  # fixstr("name")
+            + 2 + 9  # bin8(name of length 9)
+            + 1 + 5  # fixstr("perms")
+            + 5  # uint32(perms)
+            + 1 + 4  # fixstr("type")
+            + 1 + 3  # fixstr(type)
+        )
+        # fmt: on
+    )
+
+
+SMALL_MESSAGE_SIZE = 1024 * 1024
+
+
+@pytest.mark.parametrize(
+    "kafka_server_config_overrides", [{"message.max.bytes": str(SMALL_MESSAGE_SIZE)}]
+)
+def test_fail_write_large_objects(
+    kafka_prefix: str,
+    kafka_server: Tuple[Popen, int],
+    consumer: Consumer,
+    large_directories: List[Directory],
+):
+    kafka_prefix += ".swh.journal.objects"
+
+    # Needed because there are no directories in TEST_OBJECT_DICTS, so the
+    # consumer isn't autosubscribed to directories.
+    consumer.subscribe([kafka_prefix + ".directory"])
+
+    writer = KafkaJournalWriter(
+        brokers=["localhost:%d" % kafka_server[1]],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+    )
+
+    expected_dirs = []
+
+    for directory in large_directories:
+        if dir_message_size(directory) < SMALL_MESSAGE_SIZE:
+            # Small enough to be delivered: write it, record it, and move on
+            writer.write_addition("directory", directory)
+            expected_dirs.append(directory)
+            continue
+
+        with pytest.raises(KafkaDeliveryError) as exc:
+            writer.write_addition("directory", directory)
+
+        assert "Failed deliveries" in exc.value.message
+        assert len(exc.value.delivery_failures) == 1
+
+        object_type, key, msg, error_name = exc.value.delivery_failures[0]
+
+        assert object_type == "directory"
+        assert key == directory.id
+        assert error_name == "MSG_SIZE_TOO_LARGE"
+
+    consumed_messages = consume_messages(consumer, kafka_prefix, len(expected_dirs))
+
+    for dir, message in zip(expected_dirs, consumed_messages["directory"]):
+        (dir_id, consumed_dir) = message
+        assert dir_id == dir.id
+        assert consumed_dir == dir.to_dict()
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -4,7 +4,8 @@
# See top-level LICENSE file for more information
import logging
-from typing import Dict, Iterable, Optional, Type
+import time
+from typing import Dict, Iterable, List, NamedTuple, Optional, Type
from confluent_kafka import Producer, KafkaException
@@ -24,6 +25,7 @@
    KeyType,
    ModelObject,
    object_key,
+    pprint_key,
    key_to_kafka,
    value_to_kafka,
)
@@ -42,6 +44,44 @@
}
+class DeliveryTag(NamedTuple):
+    """Unique tag allowing us to check for a message delivery"""
+
+    topic: str
+    kafka_key: bytes
+
+
+class DeliveryFailureInfo(NamedTuple):
+    """Verbose information for failed deliveries"""
+
+    object_type: str
+    key: KeyType
+    message: str
+    code: str
+
+
+def get_object_type(topic: str) -> str:
+    """Get the object type from a topic string"""
+    return topic.rsplit(".", 1)[-1]
+
+
+class KafkaDeliveryError(Exception):
+    """Delivery failed on some kafka messages."""
+
+    def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]):
+        self.message = message
+        self.delivery_failures = list(delivery_failures)
+
+    def pretty_failures(self) -> str:
+        return ", ".join(
+            f"{f.object_type} {pprint_key(f.key)} ({f.message})"
+            for f in self.delivery_failures
+        )
+
+    def __str__(self):
+        return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])"
+
+
class KafkaJournalWriter:
"""This class is instantiated and used by swh-storage to write incoming
new objects to Kafka before adding them to the storage backend
@@ -52,6 +92,8 @@
        prefix: the prefix used to build the topic names for objects
        client_id: the id of the writer sent to kafka
        producer_config: extra configuration keys passed to the `Producer`
+        flush_timeout: timeout, in seconds, after which the `flush` operation
+            will fail if some message deliveries are still pending.
    """
@@ -61,6 +103,7 @@
        prefix: str,
        client_id: str,
        producer_config: Optional[Dict] = None,
+        flush_timeout: float = 120,
    ):
        self._prefix = prefix
@@ -85,14 +128,30 @@
            }
        )
+        # Delivery management
+        self.flush_timeout = flush_timeout
+
+        # delivery tag -> original object "key" mapping
+        self.deliveries_pending: Dict[DeliveryTag, KeyType] = {}
+
+        # List of (object_type, key, error_msg, error_name) for failed deliveries
+        self.delivery_failures: List[DeliveryFailureInfo] = []
+
    def _error_cb(self, error):
        if error.fatal():
            raise KafkaException(error)
        logger.info("Received non-fatal kafka error: %s", error)
    def _on_delivery(self, error, message):
+        (topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key())
+        sent_key = self.deliveries_pending.pop(delivery_tag, None)
+
        if error is not None:
-            self._error_cb(error)
+            self.delivery_failures.append(
+                DeliveryFailureInfo(
+                    get_object_type(topic), sent_key, error.str(), error.name()
+                )
+            )
    def send(self, topic: str, key: KeyType, value):
        kafka_key = key_to_kafka(key)
@@ -100,11 +159,44 @@
            topic=topic, key=kafka_key, value=value_to_kafka(value),
        )
-        # Need to service the callbacks regularly by calling poll
-        self.producer.poll(0)
+        self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key
+
+    def delivery_error(self, message) -> KafkaDeliveryError:
+        """Get all failed deliveries, and clear them"""
+        ret = self.delivery_failures
+        self.delivery_failures = []
+
+        while self.deliveries_pending:
+            delivery_tag, orig_key = self.deliveries_pending.popitem()
+            (topic, kafka_key) = delivery_tag
+            ret.append(
+                DeliveryFailureInfo(
+                    get_object_type(topic),
+                    orig_key,
+                    "No delivery before flush() timeout",
+                    "SWH_FLUSH_TIMEOUT",
+                )
+            )
+
+        return KafkaDeliveryError(message, ret)
    def flush(self):
-        self.producer.flush()
+        start = time.monotonic()
+
+        self.producer.flush(self.flush_timeout)
+
+        while self.deliveries_pending:
+            if time.monotonic() - start > self.flush_timeout:
+                break
+            self.producer.poll(0.1)
+
+        if self.deliveries_pending:
+            # Delivery timeout
+            raise self.delivery_error(
+                "flush() exceeded timeout (%ss)" % self.flush_timeout,
+            )
+        elif self.delivery_failures:
+            raise self.delivery_error("Failed deliveries after flush()")
    def _sanitize_object(
        self, object_type: str, object_: ModelObject
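
Note on message sizes: as a rough reading of the dir_message_size() estimate in the test above, a serialized directory costs about 38 bytes of fixed overhead plus 66 bytes per entry. The sketch below spells out that arithmetic; it is not part of the diff, estimated_message_size is an illustrative helper, and the exact msgpack encoding and broker record overhead may shift the numbers slightly.

# Rough arithmetic implied by dir_message_size() (approximate msgpack sizes):
#   fixed part: 1 + (1 + 2) + (2 + 20) + (1 + 7) + 4 = 38 bytes
#   per entry:  1 + (1 + 6) + (2 + 20) + (1 + 4) + (2 + 9)
#                 + (1 + 5) + 5 + (1 + 4) + (1 + 3)   = 66 bytes
SMALL_MESSAGE_SIZE = 1024 * 1024  # the message.max.bytes override used in the test


def estimated_message_size(n_entries: int) -> int:
    # Illustrative helper mirroring dir_message_size(), keyed on entry count only
    return 38 + 66 * n_entries


# Per this estimate, with a 1 MiB limit the directories of 2**14 entries and
# larger are the ones expected to fail with MSG_SIZE_TOO_LARGE.
too_large = [
    1 << n for n in range(21) if estimated_message_size(1 << n) >= SMALL_MESSAGE_SIZE
]
assert too_large == [1 << n for n in range(14, 21)]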
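
Caller-facing behaviour introduced by this revision, as exercised by the tests above: writes that cannot be delivered surface as a KafkaDeliveryError carrying one DeliveryFailureInfo per failed message. A minimal, hypothetical caller sketch follows; the broker address, topic prefix, and the objects to write are placeholders, and only the class, method, and attribute names come from the diff.

import logging

from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter

logger = logging.getLogger(__name__)

writer = KafkaJournalWriter(
    brokers=["localhost:9092"],    # placeholder broker
    prefix="swh.journal.objects",  # placeholder topic prefix
    client_id="example_writer",
    flush_timeout=60,              # give up on pending deliveries after 60s
)

directories = []  # placeholder: swh.model.model.Directory objects to publish

try:
    writer.write_additions("directory", directories)
except KafkaDeliveryError as exc:
    # exc.delivery_failures holds DeliveryFailureInfo tuples
    # (object_type, key, message, code), e.g. code == "MSG_SIZE_TOO_LARGE",
    # or "SWH_FLUSH_TIMEOUT" for messages still pending when flush() timed out.
    logger.error("journal writes failed: %s", exc.pretty_failures())
    for failure in exc.delivery_failures:
        logger.error(
            "%s %s: %s (%s)",
            failure.object_type, failure.key, failure.message, failure.code,
        )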