Page MenuHomeSoftware Heritage

D2003.id6733.diff
No OneTemporary

D2003.id6733.diff

diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -5,7 +5,7 @@
import logging
-from confluent_kafka import Producer
+from confluent_kafka import Producer, KafkaException
from swh.model.hashutil import DEFAULT_ALGORITHMS
from swh.model.model import BaseModel
@@ -28,9 +28,21 @@
self.producer = Producer({
'bootstrap.servers': ','.join(brokers),
'client.id': client_id,
+ 'on_delivery': self._on_delivery,
+ 'error_cb': self._error_cb,
+ 'logger': logger,
'enable.idempotence': 'true',
})
+ def _error_cb(self, error):
+ if error.fatal():
+ raise KafkaException(error)
+ logger.info('Received non-fatal kafka error: %s', error)
+
+ def _on_delivery(self, error, message):
+ if error is not None:
+ self._error_cb(error)
+
def send(self, topic, key, value):
self.producer.produce(
topic=topic,

File Metadata

Mime Type
text/plain
Expires
Jul 3 2025, 8:27 AM (8 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3231268

Event Timeline