diff --git a/docs/journal-clients.rst b/docs/journal-clients.rst --- a/docs/journal-clients.rst +++ b/docs/journal-clients.rst @@ -6,12 +6,16 @@ Journal client are processes that read data from the |swh| Journal, in order to efficiently process all existing objects, and process new objects as they come. + Some journal clients, such as :ref:`swh-dataset ` only read existing objects and stop when they are done. -They can run in parallel, and the :mod:`swh.journal.client` module -provides an abstraction handling all the setup, so actual clients are actually -a single function that takes :mod:`model objects ` as parameters. +Other journal clients, such as the :ref:`mirror ` are expected to +read constantly from the journal. + +They can run in parallel, and the :mod:`swh.journal.client` module provides an +abstraction handling all the setup, so actual clients only consists in a single +function that takes :mod:`model objects ` as parameters. For example, a very simple journal client that prints all revisions and releases to the console can be implemented like this: diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -7,7 +7,7 @@ import logging import os import time -from typing import Any, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from confluent_kafka import Consumer, KafkaError, KafkaException @@ -73,6 +73,15 @@ Messages are processed by the `worker_fn` callback passed to the `process` method, in batches of maximum `batch_size` messages (defaults to 200). + The objects passed to the `worker_fn` callback are the result of the kafka + message converted by the `value_deserializer` function. By default (if this + argument is not given), it will produce dicts (using the `kafka_to_value` + function). This signature of the function is: + + `value_deserializer(object_type: str, kafka_msg: bytes) -> Any` + + If the value returned by `value_deserializer` i + If set, the processing stops after processing `stop_after_objects` messages in total. @@ -99,6 +108,7 @@ process_timeout: Optional[float] = None, auto_offset_reset: str = "earliest", stop_on_eof: bool = False, + value_deserializer: Optional[Callable[[str, bytes], Any]] = None, **kwargs, ): if prefix is None: @@ -111,8 +121,10 @@ if batch_size <= 0: raise ValueError("Option 'batch_size' needs to be positive") - - self.value_deserializer = kafka_to_value + if value_deserializer: + self.value_deserializer = value_deserializer + else: + self.value_deserializer = lambda _, value: kafka_to_value(value) if isinstance(brokers, str): brokers = [brokers] @@ -286,7 +298,11 @@ continue nb_processed += 1 object_type = message.topic().split(".")[-1] - objects[object_type].append(self.deserialize_message(message)) + deserialized_object = self.deserialize_message( + message, object_type=object_type + ) + if deserialized_object is not None: + objects[object_type].append(deserialized_object) if objects: worker_fn(dict(objects)) @@ -299,8 +315,8 @@ return nb_processed, at_eof - def deserialize_message(self, message): - return self.value_deserializer(message.value()) + def deserialize_message(self, message, object_type=None): + return self.value_deserializer(object_type, message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -3,15 +3,16 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Dict, List +from typing import Dict, List, cast from unittest.mock import MagicMock from confluent_kafka import Producer import pytest from swh.journal.client import JournalClient -from swh.journal.serializers import key_to_kafka, value_to_kafka -from swh.model.model import Content +from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka +from swh.model.model import Content, Revision +from swh.model.tests.swh_model_data import TEST_OBJECTS REV = { "message": b"something cool", @@ -327,3 +328,45 @@ # we also only subscribed to the standard prefix, since there is no priviled prefix # on the kafka broker assert client.subscription == [kafka_prefix + ".revision"] + + +def test_client_with_deserializer( + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str +): + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test producer", + "acks": "all", + } + ) + + # Fill Kafka + revisions = cast(List[Revision], TEST_OBJECTS["revision"]) + for rev in revisions: + producer.produce( + topic=kafka_prefix + ".revision", + key=rev.id, + value=value_to_kafka(rev.to_dict()), + ) + producer.flush() + + def custom_deserializer(object_type, msg): + assert object_type == "revision" + obj = kafka_to_value(msg) + # filter the first revision + if obj["id"] == revisions[0].id: + return None + return Revision.from_dict(obj) + + client = JournalClient( + brokers=[kafka_server], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=1, + value_deserializer=custom_deserializer, + ) + worker_fn = MagicMock() + client.process(worker_fn) + # check that the first Revision has not been passed to worker_fn + worker_fn.assert_called_once_with({"revision": revisions[1:]})