diff --git swh/journal/client.py swh/journal/client.py
index 0d481b0..42e2b96 100644
--- swh/journal/client.py
+++ swh/journal/client.py
@@ -76,7 +76,7 @@ class JournalClient:
     def __init__(
             self, brokers, group_id, prefix=None, object_types=None,
             max_messages=0, process_timeout=0, auto_offset_reset='earliest',
-            **kwargs):
+            include_meta=False, **kwargs):
         if prefix is None:
             prefix = DEFAULT_PREFIX
         if object_types is None:
@@ -127,6 +127,7 @@ class JournalClient:

         self.max_messages = max_messages
         self.process_timeout = process_timeout
+        self.include_meta = include_meta

         self._object_types = object_types

@@ -179,9 +180,14 @@ class JournalClient:
                 # Got a message from a topic we did not subscribe to.
                 assert object_type in self._object_types, object_type

-                objects[object_type].append(
-                    self.value_deserializer(message.value())
-                )
+                obj = self.value_deserializer(message.value())
+                if self.include_meta:
+                    obj['_meta'] = {
+                        'timestamp': message.timestamp(),
+                        'offset': message.offset(),
+                        'partition': message.partition(),
+                    }
+                objects[object_type].append(obj)

             if objects:
                 worker_fn(dict(objects))