diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -30,6 +30,17 @@ ] +def _error_cb(error): + if error.fatal(): + raise KafkaException(error) + logger.info('Received non-fatal kafka error: %s', error) + + +def _on_commit(error, partitions): + if error is not None: + _error_cb(error) + + class JournalClient: """A base client for the Software Heritage journal. @@ -81,8 +92,8 @@ 'bootstrap.servers': ','.join(brokers), 'auto.offset.reset': auto_offset_reset, 'group.id': group_id, - 'on_commit': self._on_commit, - 'error_cb': self._error_cb, + 'on_commit': _on_commit, + 'error_cb': _error_cb, 'enable.auto.commit': False, 'logger': logger, } @@ -105,15 +116,6 @@ self._object_types = object_types - def _error_cb(self, error): - if error.fatal(): - raise KafkaException(error) - logger.info('Received non-fatal kafka error: %s', error) - - def _on_commit(self, error, partitions): - if error is not None: - self._error_cb(error) - def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages.