diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -7,7 +7,7 @@ import logging import time -from confluent_kafka import Consumer, KafkaException +from confluent_kafka import Consumer, KafkaException, KafkaError from .serializers import kafka_to_value from swh.journal import DEFAULT_PREFIX @@ -30,11 +30,20 @@ 'origin_visit' ] +# Errors that Kafka raises too often and are not useful; therefore +# we lower their log level to DEBUG instead of INFO. +_SPAMMY_ERRORS = [ + KafkaError._NO_OFFSET, +] + def _error_cb(error): if error.fatal(): raise KafkaException(error) - logger.info('Received non-fatal kafka error: %s', error) + if error.code() in _SPAMMY_ERRORS: + logger.debug('Received non-fatal kafka error: %s', error) + else: + logger.info('Received non-fatal kafka error: %s', error) def _on_commit(error, partitions): @@ -161,9 +170,7 @@ for message in messages: error = message.error() if error is not None: - if error.fatal(): - raise KafkaException(error) - logger.info('Received non-fatal kafka error: %s', error) + _error_cb(error) continue nb_messages += 1