diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -77,6 +77,9 @@
         'brokers': ['localhost:%d' % kafka_server[1]],
         'client_id': 'kafka_writer',
         'prefix': kafka_prefix,
+        'producer_config': {
+            'message.max.bytes': 100000000,
+        },
     }
 
     writer = KafkaJournalWriter(**config)
@@ -105,10 +108,13 @@
         'brokers': ['localhost:%d' % kafka_server[1]],
         'client_id': 'kafka_writer',
         'prefix': kafka_prefix,
+        'producer_config': {
+            'message.max.bytes': 100000000,
+        },
     }
 
     storage = get_storage('memory', journal_writer={
-        'cls': 'kafka', 'args': config,
+        'cls': 'kafka', **config,
     })
 
     expected_messages = 0
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -19,12 +19,15 @@
     """This class is instantiated and used by swh-storage to write incoming
     new objects to Kafka before adding them to the storage backend (eg.
     postgresql) itself."""
-    def __init__(self, brokers, prefix, client_id):
+    def __init__(self, brokers, prefix, client_id, producer_config=None):
         self._prefix = prefix
 
         if isinstance(brokers, str):
             brokers = [brokers]
 
+        if not producer_config:
+            producer_config = {}
+
         self.producer = Producer({
             'bootstrap.servers': ','.join(brokers),
             'client.id': client_id,
@@ -32,6 +35,7 @@
             'error_cb': self._error_cb,
             'logger': logger,
             'enable.idempotence': 'true',
+            **producer_config,
         })
 
     def _error_cb(self, error):