D6565.id23910.diff
diff --git a/docs/journal-clients.rst b/docs/journal-clients.rst
--- a/docs/journal-clients.rst
+++ b/docs/journal-clients.rst
@@ -6,12 +6,16 @@
Journal clients are processes that read data from the |swh| Journal,
in order to efficiently process all existing objects, and to process new
objects as they arrive.
+
Some journal clients, such as :ref:`swh-dataset <swh-dataset>`, only read
existing objects and stop when they are done.
-They can run in parallel, and the :mod:`swh.journal.client` module
-provides an abstraction handling all the setup, so actual clients are actually
-a single function that takes :mod:`model objects <swh.model.model>` as parameters.
+Other journal clients, such as the :ref:`mirror <swh-storage>`, are expected
+to read from the journal continuously.
+
+They can run in parallel, and the :mod:`swh.journal.client` module provides an
+abstraction handling all the setup, so an actual client consists of a single
+function that takes :mod:`model objects <swh.model.model>` as parameters.
For example, a very simple journal client that prints all revisions and releases
to the console can be implemented like this:
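
A minimal sketch of such a client follows (an illustration only, not the
example from the documentation file; the broker address and consumer group
are placeholders):

    from swh.journal.client import JournalClient

    def print_objects(all_objects):
        # all_objects maps an object type ("revision", "release", ...) to
        # the batch of objects of that type (dicts by default)
        for object_type, objects in all_objects.items():
            for obj in objects:
                print(object_type, obj)

    client = JournalClient(
        brokers=["localhost:9092"],         # placeholder kafka broker
        group_id="example-consumer-group",  # placeholder consumer group
        prefix="swh.journal.objects",
        object_types=["revision", "release"],
    )
    client.process(print_objects)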
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -7,7 +7,7 @@
import logging
import os
import time
-from typing import Any, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from confluent_kafka import Consumer, KafkaError, KafkaException
@@ -73,6 +73,16 @@
    Messages are processed by the `worker_fn` callback passed to the `process`
    method, in batches of maximum `batch_size` messages (defaults to 200).

+    The objects passed to the `worker_fn` callback are the kafka messages
+    converted by the `value_deserializer` function. By default (if this
+    argument is not given), it will produce dicts (using the `kafka_to_value`
+    function). The signature of the function is:
+
+        `value_deserializer(object_type: str, kafka_msg: bytes) -> Any`
+
+    If the value returned by `value_deserializer` is None, it is ignored and
+    not passed to the `worker_fn` callback.
+
    If set, the processing stops after processing `stop_after_objects` messages
    in total.
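
As an illustration of this contract (a sketch, not part of this diff), a
`value_deserializer` that builds model objects and drops everything else
could look like:

    from typing import Optional

    from swh.journal.serializers import kafka_to_value
    from swh.model.model import Revision

    def revision_deserializer(
        object_type: str, kafka_msg: bytes
    ) -> Optional[Revision]:
        # decode the raw kafka message into a dict first
        obj = kafka_to_value(kafka_msg)
        if object_type != "revision":
            # returning None makes the client drop the message before worker_fn
            return None
        return Revision.from_dict(obj)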
@@ -99,6 +109,7 @@
        process_timeout: Optional[float] = None,
        auto_offset_reset: str = "earliest",
        stop_on_eof: bool = False,
+        value_deserializer: Optional[Callable[[str, bytes], Any]] = None,
        **kwargs,
    ):
        if prefix is None:
@@ -111,8 +122,10 @@
        if batch_size <= 0:
            raise ValueError("Option 'batch_size' needs to be positive")
-
-        self.value_deserializer = kafka_to_value
+        if value_deserializer:
+            self.value_deserializer = value_deserializer
+        else:
+            self.value_deserializer = lambda _, value: kafka_to_value(value)

        if isinstance(brokers, str):
            brokers = [brokers]
@@ -286,7 +299,11 @@
                continue
            nb_processed += 1
            object_type = message.topic().split(".")[-1]
-            objects[object_type].append(self.deserialize_message(message))
+            deserialized_object = self.deserialize_message(
+                message, object_type=object_type
+            )
+            if deserialized_object is not None:
+                objects[object_type].append(deserialized_object)

        if objects:
            worker_fn(dict(objects))
@@ -299,8 +316,8 @@
        return nb_processed, at_eof

-    def deserialize_message(self, message):
-        return self.value_deserializer(message.value())
+    def deserialize_message(self, message, object_type=None):
+        return self.value_deserializer(object_type, message.value())

    def close(self):
        self.consumer.close()
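
Wired together (again a sketch; connection settings are placeholders and the
filtering predicate is hypothetical), the new argument is passed at
construction time and filtered-out messages never reach `worker_fn`:

    from swh.journal.client import JournalClient
    from swh.journal.serializers import kafka_to_value
    from swh.model.model import Release

    def release_deserializer(object_type, kafka_msg):
        obj = kafka_to_value(kafka_msg)
        # hypothetical filter: drop releases that carry no message
        if obj.get("message") is None:
            return None
        return Release.from_dict(obj)

    client = JournalClient(
        brokers=["localhost:9092"],   # placeholder kafka broker
        group_id="example-group",     # placeholder consumer group
        prefix="swh.journal.objects",
        object_types=["release"],
        stop_after_objects=100,
        value_deserializer=release_deserializer,
    )
    client.process(lambda objects: print(objects.get("release", [])))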
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -3,15 +3,16 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

-from typing import Dict, List
+from typing import Dict, List, cast
from unittest.mock import MagicMock

from confluent_kafka import Producer
import pytest

from swh.journal.client import JournalClient
-from swh.journal.serializers import key_to_kafka, value_to_kafka
-from swh.model.model import Content
+from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
+from swh.model.model import Content, Revision
+from swh.model.tests.swh_model_data import TEST_OBJECTS

REV = {
    "message": b"something cool",
@@ -327,3 +328,45 @@
    # we also only subscribed to the standard prefix, since there is no
    # privileged prefix on the kafka broker
    assert client.subscription == [kafka_prefix + ".revision"]
+
+
+def test_client_with_deserializer(
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
+):
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test producer",
+            "acks": "all",
+        }
+    )
+
+    # Fill Kafka
+    revisions = cast(List[Revision], TEST_OBJECTS["revision"])
+    for rev in revisions:
+        producer.produce(
+            topic=kafka_prefix + ".revision",
+            key=rev.id,
+            value=value_to_kafka(rev.to_dict()),
+        )
+    producer.flush()
+
+    def custom_deserializer(object_type, msg):
+        assert object_type == "revision"
+        obj = kafka_to_value(msg)
+        # filter out the first revision
+        if obj["id"] == revisions[0].id:
+            return None
+        return Revision.from_dict(obj)
+
+    client = JournalClient(
+        brokers=[kafka_server],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=1,
+        value_deserializer=custom_deserializer,
+    )
+    worker_fn = MagicMock()
+    client.process(worker_fn)
+    # check that the first Revision has not been passed to worker_fn
+    worker_fn.assert_called_once_with({"revision": revisions[1:]})
