diff --git a/docs/journal-clients.rst b/docs/journal-clients.rst
--- a/docs/journal-clients.rst
+++ b/docs/journal-clients.rst
@@ -6,12 +6,16 @@
 Journal client are processes that read data from the |swh| Journal,
 in order to efficiently process all existing objects, and process new objects
 as they come.
+
 Some journal clients, such as :ref:`swh-dataset ` only read
 existing objects and stop when they are done.
-They can run in parallel, and the :mod:`swh.journal.client` module
-provides an abstraction handling all the setup, so actual clients are actually
-a single function that takes :mod:`model objects ` as parameters.
+Other journal clients, such as the :ref:`mirror `, are expected to
+read constantly from the journal.
+
+They can run in parallel, and the :mod:`swh.journal.client` module provides an
+abstraction handling all the setup, so actual clients only consist of a single
+function that takes :mod:`model objects ` as parameters.
 
 For example, a very simple journal client that prints all revisions and
 releases to the console can be implemented like this:
 
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -7,7 +7,7 @@
 import logging
 import os
 import time
-from typing import Any, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
 
 from confluent_kafka import Consumer, KafkaError, KafkaException
 
@@ -73,6 +73,11 @@
     Messages are processed by the `worker_fn` callback passed to the `process`
     method, in batches of maximum `batch_size` messages (defaults to 200).
 
+    The objects passed to the `worker_fn` callback are the result of the kafka
+    message converted by the `value_deserializer` function. By default (if this
+    argument is not given), it will produce dicts (using the `kafka_to_value`
+    function).
+
     If set, the processing stops after processing `stop_after_objects` messages
     in total.
 
@@ -99,6 +104,7 @@
         process_timeout: Optional[float] = None,
         auto_offset_reset: str = "earliest",
         stop_on_eof: bool = False,
+        value_deserializer: Optional[Callable[[str, bytes], Any]] = None,
         **kwargs,
     ):
         if prefix is None:
@@ -111,8 +117,10 @@
         if batch_size <= 0:
             raise ValueError("Option 'batch_size' needs to be positive")
-
-        self.value_deserializer = kafka_to_value
+        if value_deserializer:
+            self.value_deserializer = value_deserializer
+        else:
+            self.value_deserializer = lambda _, value: kafka_to_value(value)
 
         if isinstance(brokers, str):
             brokers = [brokers]
 
@@ -286,7 +294,9 @@
                 continue
             nb_processed += 1
             object_type = message.topic().split(".")[-1]
-            objects[object_type].append(self.deserialize_message(message))
+            objects[object_type].append(
+                self.deserialize_message(message, object_type=object_type)
+            )
 
         if objects:
             worker_fn(dict(objects))
@@ -299,8 +309,8 @@
 
         return nb_processed, at_eof
 
-    def deserialize_message(self, message):
-        return self.value_deserializer(message.value())
+    def deserialize_message(self, message, object_type=None):
+        return self.value_deserializer(object_type, message.value())
 
     def close(self):
         self.consumer.close()
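
For illustration, the kind of "very simple journal client" the documentation hunk above refers to could look like the following sketch. The broker address and consumer group name are placeholders chosen for the example, not values mandated by this change; only ``get_journal_client``, ``process`` and the ``worker_fn`` signature come from ``swh.journal.client``::

    import sys

    from swh.journal.client import get_journal_client

    def process_objects(all_objects):
        # worker_fn: receives a dict mapping object types to lists of objects
        for object_type, objects in all_objects.items():
            for obj in objects:
                print(f"new {object_type}: {obj}")

    if __name__ == "__main__":
        client = get_journal_client(
            cls="kafka",
            brokers=["localhost:9092"],        # placeholder broker address
            group_id="my-consumer-group",      # placeholder consumer group
            object_types=["revision", "release"],
        )
        try:
            # runs until the journal is exhausted (or forever for a mirror-style client)
            client.process(process_objects)
        except KeyboardInterrupt:
            sys.exit(0)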
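
Likewise, a hedged sketch of how the new ``value_deserializer`` argument could be used to hand :mod:`swh.model` objects (rather than dicts) to the worker function. The ``MODEL_CLASSES`` mapping and the group id are hypothetical helpers for this example; the two-argument callback signature ``(object_type, message_value)`` and the ``kafka_to_value`` default are what the diff introduces::

    from swh.journal.client import JournalClient
    from swh.journal.serializers import kafka_to_value
    from swh.model.model import Release, Revision

    # hypothetical mapping from journal object type to a model class
    MODEL_CLASSES = {"revision": Revision, "release": Release}

    def to_model(object_type, message_value):
        # first decode the raw kafka message into a dict, as the default does
        d = kafka_to_value(message_value)
        cls = MODEL_CLASSES.get(object_type)
        # then build a model object when we know the type, else keep the dict
        return cls.from_dict(d) if cls else d

    client = JournalClient(
        brokers="localhost:9092",            # placeholder broker address
        group_id="my-deserializing-client",  # placeholder consumer group
        object_types=["revision", "release"],
        value_deserializer=to_model,
    )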