diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -30,6 +30,17 @@ ] +def _error_cb(error): + if error.fatal(): + raise KafkaException(error) + logger.info('Received non-fatal kafka error: %s', error) + + +def _on_commit(error, partitions): + if error is not None: + _error_cb(error) + + class JournalClient: """A base client for the Software Heritage journal. @@ -81,8 +92,8 @@ 'bootstrap.servers': ','.join(brokers), 'auto.offset.reset': auto_offset_reset, 'group.id': group_id, - 'on_commit': self._on_commit, - 'error_cb': self._error_cb, + 'on_commit': _on_commit, + 'error_cb': _error_cb, 'enable.auto.commit': False, 'logger': logger, }