diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -142,8 +142,6 @@ start_time = time.monotonic() nb_messages = 0 - objects = defaultdict(list) - while True: # timeout for message poll timeout = 1.0 @@ -173,28 +171,38 @@ if not messages: continue - for message in messages: - error = message.error() - if error is not None: - _error_cb(error) - continue + # Process messages and add the successfully processed ones to the + # message counter. + nb_messages += self.handle_messages(messages, worker_fn) + + return nb_messages + + def handle_messages(self, messages, worker_fn): + objects = defaultdict(list) + nb_processed = 0 + + for message in messages: + error = message.error() + if error is not None: + _error_cb(error) + continue + + nb_processed += 1 - nb_messages += 1 + object_type = message.topic().split('.')[-1] + # Got a message from a topic we did not subscribe to. + assert object_type in self._object_types, object_type - object_type = message.topic().split('.')[-1] - # Got a message from a topic we did not subscribe to. - assert object_type in self._object_types, object_type + objects[object_type].append(self.deserialize_message(message)) - objects[object_type].append( - self.value_deserializer(message.value()) - ) + if objects: + worker_fn(dict(objects)) + self.consumer.commit() - if objects: - worker_fn(dict(objects)) - objects.clear() + return nb_processed - self.consumer.commit() - return nb_messages + def deserialize_message(self, message): + return self.value_deserializer(message.value()) def close(self): self.consumer.close()