diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -74,7 +74,7 @@ kafka_prefix += '.swh.journal.objects' config = { - 'brokers': 'localhost:%d' % kafka_server[1], + 'brokers': ['localhost:%d' % kafka_server[1]], 'client_id': 'kafka_writer', 'prefix': kafka_prefix, } @@ -102,7 +102,7 @@ kafka_prefix += '.swh.journal.objects' config = { - 'brokers': 'localhost:%d' % kafka_server[1], + 'brokers': ['localhost:%d' % kafka_server[1]], 'client_id': 'kafka_writer', 'prefix': kafka_prefix, } diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -5,7 +5,7 @@ import logging -from confluent_kafka import Producer +from confluent_kafka import Producer, KafkaException from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.model import BaseModel @@ -22,12 +22,27 @@ def __init__(self, brokers, prefix, client_id): self._prefix = prefix + if isinstance(brokers, str): + brokers = [brokers] + self.producer = Producer({ - 'bootstrap.servers': brokers, + 'bootstrap.servers': ','.join(brokers), 'client.id': client_id, + 'on_delivery': self._on_delivery, + 'error_cb': self._error_cb, + 'logger': logger, 'enable.idempotence': 'true', }) + def _error_cb(self, error): + if error.fatal(): + raise KafkaException(error) + logger.info('Received non-fatal kafka error: %s', error) + + def _on_delivery(self, error, message): + if error is not None: + self._error_cb(error) + def send(self, topic, key, value): self.producer.produce( topic=topic, @@ -35,11 +50,12 @@ value=value_to_kafka(value), ) - def flush(self): - self.producer.flush() # Need to service the callbacks regularly by calling poll self.producer.poll(0) + def flush(self): + self.producer.flush() + def _get_key(self, object_type, object_): if object_type in ('revision', 'release', 'directory', 'snapshot'): return object_['id']