Page MenuHomeSoftware Heritage
Paste P595

(An Untitled Masterwork)
ActivePublic

Authored by seirl on Feb 11 2020, 4:44 PM.
diff --git swh/journal/client.py swh/journal/client.py
index 0d481b0..42e2b96 100644
--- swh/journal/client.py
+++ swh/journal/client.py
@@ -76,7 +76,7 @@ class JournalClient:
def __init__(
self, brokers, group_id, prefix=None, object_types=None,
max_messages=0, process_timeout=0, auto_offset_reset='earliest',
- **kwargs):
+ include_meta=False, **kwargs):
if prefix is None:
prefix = DEFAULT_PREFIX
if object_types is None:
@@ -127,6 +127,7 @@ class JournalClient:
self.max_messages = max_messages
self.process_timeout = process_timeout
+ self.include_meta = include_meta
self._object_types = object_types
@@ -179,9 +180,14 @@ class JournalClient:
# Got a message from a topic we did not subscribe to.
assert object_type in self._object_types, object_type
- objects[object_type].append(
- self.value_deserializer(message.value())
- )
+ obj = self.value_deserializer(message.value())
+ if self.include_meta:
+ obj['_meta'] = {
+ 'timpestamp': message.timestamp(),
+ 'offset': message.offset(),
+ 'partition': message.partition(),
+ }
+ objects[object_type].append(obj)
if objects:
worker_fn(dict(objects))