journal client: consume Kafka messages in batches instead of one at a time

Each batch is capped so that no more than max_messages messages are
processed, and worker_fn / the consumer commit are skipped when no
message was received. The test consumer mock gains a matching consume().

diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -139,31 +139,37 @@
 
                 timeout = self.process_timeout - elapsed
 
-            message = self.consumer.poll(timeout=timeout)
-            if not message:
+            # Never fetch more messages than max_messages still allows.
+            batch_size = max(1, min(20, self.max_messages - nb_messages))
+            messages = self.consumer.consume(
+                timeout=timeout, num_messages=batch_size)
+            if not messages:
                 continue
 
-            error = message.error()
-            if error is not None:
-                if error.fatal():
-                    raise KafkaException(error)
-                logger.info('Received non-fatal kafka error: %s', error)
-                continue
+            for message in messages:
+                error = message.error()
+                if error is not None:
+                    if error.fatal():
+                        raise KafkaException(error)
+                    logger.info('Received non-fatal kafka error: %s', error)
+                    continue
 
-            nb_messages += 1
+                nb_messages += 1
 
-            object_type = message.topic().split('.')[-1]
-            # Got a message from a topic we did not subscribe to.
-            assert object_type in self._object_types, object_type
+                object_type = message.topic().split('.')[-1]
+                # Got a message from a topic we did not subscribe to.
+                assert object_type in self._object_types, object_type
 
-            objects[object_type].append(
-                self.value_deserializer(message.value())
-            )
+                objects[object_type].append(
+                    self.value_deserializer(message.value())
+                )
 
             if nb_messages >= self.max_messages:
                 break
 
-        worker_fn(dict(objects))
+        # Skip the worker call and the commit when nothing was received.
+        if nb_messages:
+            worker_fn(dict(objects))
 
-        self.consumer.commit()
+            self.consumer.commit()
         return nb_messages
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -47,8 +47,11 @@
         self.queue = queue
         self.committed = False
 
-    def poll(self, timeout=None):
-        return self.queue.pop(0)
+    def consume(self, num_messages=1, timeout=None):
+        """Pop up to num_messages queued messages; timeout is ignored."""
+        messages = self.queue[:num_messages]
+        del self.queue[:num_messages]
+        return messages
 
     def commit(self):
         if self.queue == []: