D6565: Pass the object_type to JournalClient.value_serializer()
D6565.id23910.diff
diff --git a/docs/journal-clients.rst b/docs/journal-clients.rst
--- a/docs/journal-clients.rst
+++ b/docs/journal-clients.rst
@@ -6,12 +6,16 @@
Journal clients are processes that read data from the |swh| Journal,
in order to efficiently process all existing objects, and process new objects
as they come.
+
Some journal clients, such as :ref:`swh-dataset <swh-dataset>` only read
existing objects and stop when they are done.
-They can run in parallel, and the :mod:`swh.journal.client` module
-provides an abstraction handling all the setup, so actual clients are actually
-a single function that takes :mod:`model objects <swh.model.model>` as parameters.
+Other journal clients, such as the :ref:`mirror <swh-storage>`, are expected to
+read continuously from the journal.
+
+Journal clients can run in parallel, and the :mod:`swh.journal.client` module
+provides an abstraction handling all the setup, so an actual client consists of
+a single function that takes :mod:`model objects <swh.model.model>` as parameters.
For example, a very simple journal client that prints all revisions and releases
to the console can be implemented like this:
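[Editor's note: the example itself follows in journal-clients.rst but falls
outside this hunk's context. A minimal sketch of such a client, assuming a
local Kafka broker and using only constructor arguments visible in this diff:]

from swh.journal.client import JournalClient

def print_objects(all_objects):
    # all_objects maps an object type (e.g. "revision") to a batch of
    # deserialized objects; without a custom value_deserializer these are dicts.
    for object_type, objects in all_objects.items():
        if object_type in ("revision", "release"):
            for obj in objects:
                print(object_type, obj)

client = JournalClient(
    brokers=["localhost:9092"],    # assumption: local Kafka broker
    group_id="print-client",       # placeholder consumer group
    prefix="swh.journal.objects",  # assumed standard topic prefix
    stop_on_eof=True,              # stop once existing objects are exhausted
)
client.process(print_objects)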
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -7,7 +7,7 @@
import logging
import os
import time
-from typing import Any, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from confluent_kafka import Consumer, KafkaError, KafkaException
@@ -73,6 +73,16 @@
Messages are processed by the `worker_fn` callback passed to the `process`
method, in batches of maximum `batch_size` messages (defaults to 200).
+ The objects passed to the `worker_fn` callback are the result of the Kafka
+ message converted by the `value_deserializer` function. By default (if this
+ argument is not given), it produces dicts (using the `kafka_to_value`
+ function). The signature of the function is:
+
+ `value_deserializer(object_type: str, kafka_msg: bytes) -> Any`
+
+ If the value returned by `value_deserializer` is None, it is ignored and
+ not passed to the `worker_fn` function.
+
If set, the processing stops after processing `stop_after_objects` messages
in total.
@@ -99,6 +109,7 @@
process_timeout: Optional[float] = None,
auto_offset_reset: str = "earliest",
stop_on_eof: bool = False,
+ value_deserializer: Optional[Callable[[str, bytes], Any]] = None,
**kwargs,
):
if prefix is None:
@@ -111,8 +122,10 @@
if batch_size <= 0:
raise ValueError("Option 'batch_size' needs to be positive")
-
- self.value_deserializer = kafka_to_value
+ if value_deserializer:
+ self.value_deserializer = value_deserializer
+ else:
+ self.value_deserializer = lambda _, value: kafka_to_value(value)
if isinstance(brokers, str):
brokers = [brokers]
@@ -286,7 +299,11 @@
continue
nb_processed += 1
object_type = message.topic().split(".")[-1]
- objects[object_type].append(self.deserialize_message(message))
+ deserialized_object = self.deserialize_message(
+ message, object_type=object_type
+ )
+ if deserialized_object is not None:
+ objects[object_type].append(deserialized_object)
if objects:
worker_fn(dict(objects))
@@ -299,8 +316,8 @@
return nb_processed, at_eof
- def deserialize_message(self, message):
- return self.value_deserializer(message.value())
+ def deserialize_message(self, message, object_type=None):
+ return self.value_deserializer(object_type, message.value())
def close(self):
self.consumer.close()
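[Editor's note: to illustrate the hook added above, here is a sketch of a
custom `value_deserializer` matching the documented signature. The `to_model`
name and the `MODEL_CLASSES` mapping are hypothetical, and only two object
types are handled:]

from typing import Any, Optional

from swh.journal.serializers import kafka_to_value
from swh.model.model import Release, Revision

# Hypothetical mapping from object type to model class (only two shown).
MODEL_CLASSES = {"revision": Revision, "release": Release}

def to_model(object_type: str, kafka_msg: bytes) -> Optional[Any]:
    # Decode the raw Kafka message into a dict first.
    obj = kafka_to_value(kafka_msg)
    cls = MODEL_CLASSES.get(object_type)
    if cls is None:
        # Returning None makes the client drop the message instead of
        # passing it to worker_fn, per the deserialize_message change above.
        return None
    return cls.from_dict(obj)

[Passing this as `value_deserializer` makes `worker_fn` receive model objects
instead of dicts, with messages of unhandled types filtered out.]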
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -3,15 +3,16 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Dict, List
+from typing import Dict, List, cast
from unittest.mock import MagicMock
from confluent_kafka import Producer
import pytest
from swh.journal.client import JournalClient
-from swh.journal.serializers import key_to_kafka, value_to_kafka
-from swh.model.model import Content
+from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
+from swh.model.model import Content, Revision
+from swh.model.tests.swh_model_data import TEST_OBJECTS
REV = {
"message": b"something cool",
@@ -327,3 +328,45 @@
# we also only subscribed to the standard prefix, since there is no privileged prefix
# on the kafka broker
assert client.subscription == [kafka_prefix + ".revision"]
+
+
+def test_client_with_deserializer(
+ kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
+):
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test producer",
+ "acks": "all",
+ }
+ )
+
+ # Fill Kafka
+ revisions = cast(List[Revision], TEST_OBJECTS["revision"])
+ for rev in revisions:
+ producer.produce(
+ topic=kafka_prefix + ".revision",
+ key=rev.id,
+ value=value_to_kafka(rev.to_dict()),
+ )
+ producer.flush()
+
+ def custom_deserializer(object_type, msg):
+ assert object_type == "revision"
+ obj = kafka_to_value(msg)
+ # filter the first revision
+ if obj["id"] == revisions[0].id:
+ return None
+ return Revision.from_dict(obj)
+
+ client = JournalClient(
+ brokers=[kafka_server],
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ stop_after_objects=1,
+ value_deserializer=custom_deserializer,
+ )
+ worker_fn = MagicMock()
+ client.process(worker_fn)
+ # check that the first Revision has not been passed to worker_fn
+ worker_fn.assert_called_once_with({"revision": revisions[1:]})
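[Editor's note: beyond the test, wiring such a deserializer into a client
could look like the sketch below; the broker address, consumer group, and the
`to_model` helper from the previous sketch are assumptions:]

from swh.journal.client import JournalClient

def handle(all_objects):
    # Batches of model objects, keyed by object type; None results from the
    # deserializer never reach this callback.
    for object_type, objects in all_objects.items():
        print(f"processing {len(objects)} {object_type} objects")

client = JournalClient(
    brokers=["kafka.example.org:9092"],  # placeholder broker
    group_id="example-client",           # placeholder consumer group
    prefix="swh.journal.objects",        # assumed standard topic prefix
    value_deserializer=to_model,         # hypothetical helper sketched above
)
client.process(handle)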