diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -5,7 +5,7 @@
 
 import logging
 
-from confluent_kafka import Producer
+from confluent_kafka import KafkaException, Producer
 
 from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.model.model import BaseModel
@@ -28,9 +28,50 @@
         self.producer = Producer({
             'bootstrap.servers': ','.join(brokers),
             'client.id': client_id,
+            'on_delivery': self._on_delivery,
+            'error_cb': self._error_cb,
+            'logger': logger,
             'enable.idempotence': 'true',
         })
 
+    def _error_cb(self, error):
+        """Handle a global error event reported by the Kafka client.
+
+        Args:
+            error: the error object passed by the client; it must provide
+                a ``fatal()`` method.
+
+        Raises:
+            KafkaException: if ``error.fatal()`` is true.
+        """
+        if error.fatal():
+            raise KafkaException(error)
+        logger.info('Received non-fatal kafka error: %s', error)
+
+    def _on_delivery(self, error, message):
+        """Handle the delivery report of a produced message.
+
+        Args:
+            error: ``None`` when the message was delivered, otherwise the
+                error describing the failure.
+            message: the message the report is about; its ``topic()`` and
+                ``key()`` are logged on failure.
+
+        Raises:
+            KafkaException: if the error is fatal (raised by
+                ``_error_cb``).
+        """
+        if error is None:
+            return
+        # A delivery report is final, so the message was not written:
+        # record which one before the generic error handling runs.
+        logger.error(
+            'Failed to deliver message to topic %s (key %r): %s',
+            message.topic(), message.key(), error,
+        )
+        self._error_cb(error)
+
     def send(self, topic, key, value):
         self.producer.produce(
             topic=topic,