Page Menu | Home | Software Heritage

D2651.id9688.diff
No One | Temporary

D2651.id9688.diff

diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -142,8 +142,6 @@
start_time = time.monotonic()
nb_messages = 0
- objects = defaultdict(list)
-
while True:
# timeout for message poll
timeout = 1.0
@@ -173,28 +171,38 @@
if not messages:
continue
- for message in messages:
- error = message.error()
- if error is not None:
- _error_cb(error)
- continue
+ # Process messages and add the successfully processed ones to the
+ # message counter.
+ nb_messages += self.handle_messages(messages, worker_fn)
+
+ return nb_messages
+
+ def handle_messages(self, messages, worker_fn):
+ objects = defaultdict(list)
+ nb_processed = 0
+
+ for message in messages:
+ error = message.error()
+ if error is not None:
+ _error_cb(error)
+ continue
+
+ nb_processed += 1
- nb_messages += 1
+ object_type = message.topic().split('.')[-1]
+ # Got a message from a topic we did not subscribe to.
+ assert object_type in self._object_types, object_type
- object_type = message.topic().split('.')[-1]
- # Got a message from a topic we did not subscribe to.
- assert object_type in self._object_types, object_type
+ objects[object_type].append(self.deserialize_message(message))
- objects[object_type].append(
- self.value_deserializer(message.value())
- )
+ if objects:
+ worker_fn(dict(objects))
+ self.consumer.commit()
- if objects:
- worker_fn(dict(objects))
- objects.clear()
+ return nb_processed
- self.consumer.commit()
- return nb_messages
+ def deserialize_message(self, message):
+ return self.value_deserializer(message.value())
def close(self):
self.consumer.close()

File Metadata

Mime Type
text/plain
Expires
Dec 17 2024, 1:18 AM (13 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219144

Event Timeline