diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -139,31 +139,33 @@ timeout = self.process_timeout - elapsed - message = self.consumer.poll(timeout=timeout) - if not message: + messages = self.consumer.consume(timeout=timeout, num_messages=20) + if not messages: continue - error = message.error() - if error is not None: - if error.fatal(): - raise KafkaException(error) - logger.info('Received non-fatal kafka error: %s', error) - continue + for message in messages: + error = message.error() + if error is not None: + if error.fatal(): + raise KafkaException(error) + logger.info('Received non-fatal kafka error: %s', error) + continue - nb_messages += 1 + nb_messages += 1 - object_type = message.topic().split('.')[-1] - # Got a message from a topic we did not subscribe to. - assert object_type in self._object_types, object_type + object_type = message.topic().split('.')[-1] + # Got a message from a topic we did not subscribe to. + assert object_type in self._object_types, object_type - objects[object_type].append( - self.value_deserializer(message.value()) - ) + objects[object_type].append( + self.value_deserializer(message.value()) + ) if nb_messages >= self.max_messages: break - worker_fn(dict(objects)) + if nb_messages: + worker_fn(dict(objects)) - self.consumer.commit() + self.consumer.commit() return nb_messages