diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -74,7 +74,7 @@ kafka_prefix += '.swh.journal.objects' config = { - 'brokers': 'localhost:%d' % kafka_server[1], + 'brokers': ['localhost:%d' % kafka_server[1]], 'client_id': 'kafka_writer', 'prefix': kafka_prefix, } @@ -102,7 +102,7 @@ kafka_prefix += '.swh.journal.objects' config = { - 'brokers': 'localhost:%d' % kafka_server[1], + 'brokers': ['localhost:%d' % kafka_server[1]], 'client_id': 'kafka_writer', 'prefix': kafka_prefix, } diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -22,8 +22,11 @@ def __init__(self, brokers, prefix, client_id): self._prefix = prefix + if isinstance(brokers, str): + brokers = [brokers] + self.producer = Producer({ - 'bootstrap.servers': brokers, + 'bootstrap.servers': ','.join(brokers), 'client.id': client_id, 'enable.idempotence': 'true', }) @@ -35,11 +38,12 @@ value=value_to_kafka(value), ) - def flush(self): - self.producer.flush() # Need to service the callbacks regularly by calling poll self.producer.poll(0) + def flush(self): + self.producer.flush() + def _get_key(self, object_type, object_): if object_type in ('revision', 'release', 'directory', 'snapshot'): return object_['id']