diff --git a/PKG-INFO b/PKG-INFO
index 1b4f654..1da9607 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,76 +1,76 @@
 Metadata-Version: 2.1
 Name: swh.journal
-Version: 0.8.0
+Version: 0.9.0
 Summary: Software Heritage Journal utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DJNL/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
 License-File: LICENSE
 License-File: AUTHORS

 swh-journal
 ===========

 Persistent logger of changes to the archive, with publish-subscribe support.

 See the
 [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal)
 for more details.

 # Local test

 As a prerequisite, you need a kafka installation path. The following target
 will take care of this:

 ```
 make install
 ```

 Then, provided you are in the right virtual environment as described in the
 [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup):

 ```
 pytest
 ```

 or:

 ```
 tox
 ```

 # Running

 ## publisher

 Command:

 ```
 $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \
     publisher
 ```

 # Auto-completion

 To enable completion, add the following to your
 ~/.virtualenvs/swh/bin/postactivate:

 ```
 eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)"
 ```
diff --git a/docs/example-journal-client.py b/docs/example-journal-client.py
new file mode 100644
index 0000000..ee5ffd1
--- /dev/null
+++ b/docs/example-journal-client.py
@@ -0,0 +1,37 @@
+import pprint
+
+
+def process_objects(all_objects):
+    """Worker function handling incoming objects"""
+    for (object_type, objects) in all_objects.items():
+        for object_ in objects:
+            print(f"New {object_type} object:")
+            pprint.pprint(object_)
+            print()
+
+
+def main():
+    from swh.journal.client import get_journal_client
+
+    # Usually read from a config file:
+    config = {
+        "brokers": ["localhost:9092"],
+        "group_id": "my-consumer-group",
+        "auto_offset_reset": "earliest",
+    }
+
+    # Initialize the client
+    client = get_journal_client(
+        "kafka", object_types=["revision", "release"], privileged=True, **config
+    )
+
+    try:
+        # Run the client forever
+        client.process(process_objects)
+    except KeyboardInterrupt:
+        print("Called Ctrl-C, exiting.")
+        exit(0)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/docs/index.rst b/docs/index.rst
index 8be5717..d837f25 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,15 +1,13 @@
 .. _swh-journal:

 Software Heritage - Journal
 ===========================

 Persistent logger of changes to the archive, with publish-subscribe support.

-Reference Documentation
------------------------
-
 .. toctree::
    :maxdepth: 2

+   journal-clients
    /apidoc/swh.journal
diff --git a/docs/journal-clients.rst b/docs/journal-clients.rst
new file mode 100644
index 0000000..3441ebb
--- /dev/null
+++ b/docs/journal-clients.rst
@@ -0,0 +1,80 @@
+.. _journal_clients:
+
+Software Heritage Journal clients
+=================================
+
+Journal clients are processes that read data from the |swh| Journal,
+in order to efficiently process all existing objects, and process new objects
+as they come.
+
+Some journal clients, such as :ref:`swh-dataset <swh-dataset>`, only read
+existing objects and stop when they are done.
+
+Other journal clients, such as the :ref:`mirror <mirror>`, are expected to
+read constantly from the journal.
+
+They can run in parallel, and the :mod:`swh.journal.client` module provides an
+abstraction handling all the setup, so an actual client consists of a single
+function that takes :mod:`model objects <swh.model.model>` as parameters.
+
+For example, a very simple journal client that prints all revisions and releases
+to the console can be implemented like this:
+
+.. literalinclude:: example-journal-client.py
+
+
+Parallelization
+---------------
+
+A single journal client, like the one above, is sequential.
+It can however run concurrently by running the same program multiple times.
+Kafka will coordinate the processes so the load is shared across them.
+
+Authentication
+--------------
+
+In production, journal clients need credentials to access the journal.
+Once you have credentials, they can be configured by adding this to the
+``config``::
+
+    config = {
+        "sasl.mechanism": "SCRAM-SHA-512",
+        "security.protocol": "SASL_SSL",
+        "sasl.username": "<username>",
+        "sasl.password": "<password>",
+    }
+
+There are two types of client: privileged and unprivileged.
+The former has access to all the data; the latter gets redacted authorship
+information, for privacy reasons:
+the ``name`` and ``email`` fields of the ``author`` and ``committer`` attributes
+of release and revision objects are blank, and their ``fullname`` is a SHA256 hash
+of their actual fullname.
+The ``privileged`` parameter to ``get_journal_client`` must be set accordingly.
+
+Order guarantees and replaying
+------------------------------
+
+The journal client shares the ordering guarantees of Kafka.
+The short version is that you should not assume any order unless specified
+otherwise in the `Kafka documentation <https://kafka.apache.org/documentation/>`__,
+nor that two related objects are sent to the same process.
+
+We call "replay" any workflow that involves a journal client writing all (or most)
+objects to a new database.
+This can be either continuous (in effect, this produces a mirror database),
+or one-off.
+
+Either way, particular attention should be given to this lax ordering, as replaying
+produces databases that are (temporarily) inconsistent, because some objects may
+point to objects that have not been replayed yet.
+
+For one-off replays, this can be mostly solved by processing objects
+in reverse topological order:
+as contents don't reference any object,
+directories only reference contents and directories,
+revisions only reference directories, etc.;
+this means that replayers can first process all contents, then all directories,
+then all revisions.
+This keeps the number of inconsistencies relatively small.
+
+For continuous replays, replayed databases are eventually consistent.
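For reference, here is how the pieces of the documentation above fit together: an authenticated client combining the credentials ``config`` with the ``privileged`` flag. This is a minimal sketch, not part of the diff; the broker address, group id, and credentials are placeholders.

```python
from swh.journal.client import get_journal_client

# Placeholders: substitute your actual broker address, consumer group,
# and the credentials you were given.
config = {
    "brokers": ["journal.example.org:9093"],
    "group_id": "my-consumer-group",
    "auto_offset_reset": "earliest",
    "sasl.mechanism": "SCRAM-SHA-512",
    "security.protocol": "SASL_SSL",
    "sasl.username": "<username>",
    "sasl.password": "<password>",
}

# privileged=False: authorship fields of revisions and releases will be
# anonymized, as described in the Authentication section above.
client = get_journal_client(
    "kafka", object_types=["revision", "release"], privileged=False, **config
)


def process_objects(all_objects):
    # Objects arrive as a dict mapping object type to a list of objects.
    for object_type, objects in all_objects.items():
        print(f"received {len(objects)} {object_type} objects")


# Runs until interrupted, processing new objects as they arrive.
client.process(process_objects)
```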
diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO
index 1b4f654..1da9607 100644
--- a/swh.journal.egg-info/PKG-INFO
+++ b/swh.journal.egg-info/PKG-INFO
@@ -1,76 +1,76 @@
 Metadata-Version: 2.1
 Name: swh.journal
-Version: 0.8.0
+Version: 0.9.0
 Summary: Software Heritage Journal utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DJNL/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
 License-File: LICENSE
 License-File: AUTHORS

 swh-journal
 ===========

 Persistent logger of changes to the archive, with publish-subscribe support.

 See the
 [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal)
 for more details.

 # Local test

 As a prerequisite, you need a kafka installation path. The following target
 will take care of this:

 ```
 make install
 ```

 Then, provided you are in the right virtual environment as described in the
 [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup):

 ```
 pytest
 ```

 or:

 ```
 tox
 ```

 # Running

 ## publisher

 Command:

 ```
 $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \
     publisher
 ```

 # Auto-completion

 To enable completion, add the following to your
 ~/.virtualenvs/swh/bin/postactivate:

 ```
 eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)"
 ```
diff --git a/swh.journal.egg-info/SOURCES.txt b/swh.journal.egg-info/SOURCES.txt
index 15665ce..1e46df5 100644
--- a/swh.journal.egg-info/SOURCES.txt
+++ b/swh.journal.egg-info/SOURCES.txt
@@ -1,49 +1,51 @@
 .gitignore
 .pre-commit-config.yaml
 AUTHORS
 CODE_OF_CONDUCT.md
 CONTRIBUTORS
 LICENSE
 MANIFEST.in
 Makefile
 README.md
 mypy.ini
 pyproject.toml
 pytest.ini
 requirements-swh.txt
 requirements-test.txt
 requirements.txt
 setup.cfg
 setup.py
 tox.ini
 docs/.gitignore
 docs/Makefile
 docs/conf.py
+docs/example-journal-client.py
 docs/index.rst
+docs/journal-clients.rst
 docs/_static/.placeholder
 docs/_templates/.placeholder
 swh/__init__.py
 swh.journal.egg-info/PKG-INFO
 swh.journal.egg-info/SOURCES.txt
 swh.journal.egg-info/dependency_links.txt
 swh.journal.egg-info/entry_points.txt
 swh.journal.egg-info/requires.txt
 swh.journal.egg-info/top_level.txt
 swh/journal/__init__.py
 swh/journal/client.py
 swh/journal/py.typed
 swh/journal/pytest_plugin.py
 swh/journal/serializers.py
 swh/journal/tests/__init__.py
 swh/journal/tests/journal_data.py
 swh/journal/tests/log4j.properties
 swh/journal/tests/test_client.py
 swh/journal/tests/test_inmemory.py
 swh/journal/tests/test_kafka_writer.py
 swh/journal/tests/test_pytest_plugin.py
 swh/journal/tests/test_serializers.py
 swh/journal/tests/test_stream.py
 swh/journal/writer/__init__.py
 swh/journal/writer/inmemory.py
 swh/journal/writer/kafka.py
 swh/journal/writer/stream.py
\ No newline at end of file
diff --git a/swh/journal/client.py b/swh/journal/client.py
index 704a2fe..ff6f0ef 100644
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,306 +1,323 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import defaultdict
 import logging
 import os
 import time
-from typing import Any, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

 from confluent_kafka import Consumer, KafkaError, KafkaException

 from swh.journal import DEFAULT_PREFIX

 from .serializers import kafka_to_value

 logger = logging.getLogger(__name__)
 rdkafka_logger = logging.getLogger(__name__ + ".rdkafka")

 # Accepted offset reset policies
 ACCEPTED_OFFSET_RESET = ["earliest", "latest"]

 # Errors that Kafka raises too often and that are not useful; we lower their
 # log level to DEBUG instead of INFO.
 _SPAMMY_ERRORS = [
     KafkaError._NO_OFFSET,
 ]


 def get_journal_client(cls: str, **kwargs: Any):
     """Factory function to instantiate a journal client object.

     Currently, only the "kafka" journal client is supported.
     """
     if cls == "kafka":
         return JournalClient(**kwargs)
     raise ValueError("Unknown journal client class `%s`" % cls)


 def _error_cb(error):
     if error.fatal():
         raise KafkaException(error)
     if error.code() in _SPAMMY_ERRORS:
         logger.debug("Received non-fatal kafka error: %s", error)
     else:
         logger.info("Received non-fatal kafka error: %s", error)


 def _on_commit(error, partitions):
     if error is not None:
         _error_cb(error)


 class JournalClient:
     """A base client for the Software Heritage journal.

     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
     object type using a specific topic under that prefix.

     If the `prefix` argument is None (default value), it will take the
     default value `'swh.journal.objects'`.

     Clients subscribe to events specific to each object type as listed in the
     `object_types` argument (if unset, defaults to all existing kafka topics
     under the prefix).

     Clients can be sharded by setting the `group_id` to a common
     value across instances. The journal will share the message
     throughput across the nodes sharing the same group_id.

     Messages are processed by the `worker_fn` callback passed to the `process`
     method, in batches of maximum `batch_size` messages (defaults to 200).

+    The objects passed to the `worker_fn` callback are the result of the kafka
+    message converted by the `value_deserializer` function. By default (if this
+    argument is not given), it will produce dicts (using the `kafka_to_value`
+    function). The signature of the function is:
+
+    `value_deserializer(object_type: str, kafka_msg: bytes) -> Any`
+
+    If the value returned by `value_deserializer` is None, it is ignored and
+    not passed to the `worker_fn` function.
+
     If set, the processing stops after processing `stop_after_objects` messages
     in total.

     `stop_on_eof` stops the processing when the client has reached
     the end of each partition in turn.

     `auto_offset_reset` sets the behavior of the client when the consumer group
     initializes: `'earliest'` (the default) processes all objects since the
     inception of the topics; `'latest'` processes only objects published after
     the consumer group was created.

     Any other named argument is passed directly to KafkaConsumer().
""" def __init__( self, brokers: Union[str, List[str]], group_id: str, prefix: Optional[str] = None, object_types: Optional[List[str]] = None, privileged: bool = False, stop_after_objects: Optional[int] = None, batch_size: int = 200, process_timeout: Optional[float] = None, auto_offset_reset: str = "earliest", stop_on_eof: bool = False, + value_deserializer: Optional[Callable[[str, bytes], Any]] = None, **kwargs, ): if prefix is None: prefix = DEFAULT_PREFIX if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( "Option 'auto_offset_reset' only accept %s, not %s" % (ACCEPTED_OFFSET_RESET, auto_offset_reset) ) if batch_size <= 0: raise ValueError("Option 'batch_size' needs to be positive") - - self.value_deserializer = kafka_to_value + if value_deserializer: + self.value_deserializer = value_deserializer + else: + self.value_deserializer = lambda _, value: kafka_to_value(value) if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) if debug_logging and "debug" not in kwargs: kwargs["debug"] = "consumer" # Static group instance id management group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID") if group_instance_id: kwargs["group.instance.id"] = group_instance_id if "group.instance.id" in kwargs: # When doing static consumer group membership, set a higher default # session timeout. The session timeout is the duration after which # the broker considers that a consumer has left the consumer group # for good, and triggers a rebalance. Considering our current # processing pattern, 10 minutes gives the consumer ample time to # restart before that happens. if "session.timeout.ms" not in kwargs: kwargs["session.timeout.ms"] = 10 * 60 * 1000 # 10 minutes if "session.timeout.ms" in kwargs: # When the session timeout is set, rdkafka requires the max poll # interval to be set to a higher value; the max poll interval is # rdkafka's way of figuring out whether the client's message # processing thread has stalled: when the max poll interval lapses # between two calls to consumer.poll(), rdkafka leaves the consumer # group and terminates the connection to the brokers. 
             #
             # We default to 1.5 times the session timeout
             if "max.poll.interval.ms" not in kwargs:
                 kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3

         consumer_settings = {
             **kwargs,
             "bootstrap.servers": ",".join(brokers),
             "auto.offset.reset": auto_offset_reset,
             "group.id": group_id,
             "on_commit": _on_commit,
             "error_cb": _error_cb,
             "enable.auto.commit": False,
             "logger": rdkafka_logger,
         }

         self.stop_on_eof = stop_on_eof
         if self.stop_on_eof:
             consumer_settings["enable.partition.eof"] = True

         logger.debug("Consumer settings: %s", consumer_settings)

         self.consumer = Consumer(consumer_settings)
         if privileged:
             privileged_prefix = f"{prefix}_privileged"
         else:
             # do not attempt to subscribe to privileged topics
             privileged_prefix = f"{prefix}"

         existing_topics = [
             topic
             for topic in self.consumer.list_topics(timeout=10).topics.keys()
             if (
                 topic.startswith(f"{prefix}.")
                 or topic.startswith(f"{privileged_prefix}.")
             )
         ]
         if not existing_topics:
             raise ValueError(
                 f"The prefix {prefix} does not match any existing topic "
                 "on the kafka broker"
             )

         if not object_types:
             object_types = list({topic.split(".")[-1] for topic in existing_topics})

         self.subscription = []
         unknown_types = []
         for object_type in object_types:
             topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}")
             for topic in topics:
                 if topic in existing_topics:
                     self.subscription.append(topic)
                     break
             else:
                 unknown_types.append(object_type)
         if unknown_types:
             raise ValueError(
                 f"Topic(s) for object types {','.join(unknown_types)} "
                 "are unknown on the kafka broker"
             )

         logger.debug(f"Upstream topics: {existing_topics}")
         self.subscribe()

         self.stop_after_objects = stop_after_objects
         self.process_timeout = process_timeout
         self.eof_reached: Set[Tuple[str, str]] = set()
         self.batch_size = batch_size

     def subscribe(self):
         """Subscribe to topics listed in self.subscription

         This can be overridden if you need, for instance, to manually assign
         partitions.
         """
         logger.debug(f"Subscribing to: {self.subscription}")
         self.consumer.subscribe(topics=self.subscription)

     def process(self, worker_fn):
         """Polls Kafka for a batch of messages, and calls the worker_fn
         with these messages.

         Args:
             worker_fn Callable[Dict[str, List[dict]]]: Function called with
                 the messages as argument.
         """
         start_time = time.monotonic()
         total_objects_processed = 0

         while True:
             # timeout for message poll
             timeout = 1.0

             elapsed = time.monotonic() - start_time
             if self.process_timeout:
                 # +0.01 to prevent busy-waiting on / spamming consumer.poll.
                 # consumer.consume() returns shortly before the given timeout
                 # expires (a matter of milliseconds), so after it returns a
                 # first time, it would then be called with a timeout in the
                 # order of milliseconds, therefore returning immediately, then
                 # be called again, etc.
                 if elapsed + 0.01 >= self.process_timeout:
                     break

                 timeout = self.process_timeout - elapsed

             batch_size = self.batch_size
             if self.stop_after_objects:
                 if total_objects_processed >= self.stop_after_objects:
                     break

                 # clamp batch size to avoid overrunning stop_after_objects
                 batch_size = min(
                     self.stop_after_objects - total_objects_processed, batch_size,
                 )

             messages = self.consumer.consume(timeout=timeout, num_messages=batch_size)
             if not messages:
                 continue

             batch_processed, at_eof = self.handle_messages(messages, worker_fn)
             total_objects_processed += batch_processed
             if at_eof:
                 break

         return total_objects_processed

     def handle_messages(self, messages, worker_fn):
         objects: Dict[str, List[Any]] = defaultdict(list)
         nb_processed = 0

         for message in messages:
             error = message.error()
             if error is not None:
                 if error.code() == KafkaError._PARTITION_EOF:
                     self.eof_reached.add((message.topic(), message.partition()))
                 else:
                     _error_cb(error)
                 continue
             if message.value() is None:
                 # ignore messages with no payload, these can be generated in tests
                 continue
             nb_processed += 1
             object_type = message.topic().split(".")[-1]
-            objects[object_type].append(self.deserialize_message(message))
+            deserialized_object = self.deserialize_message(
+                message, object_type=object_type
+            )
+            if deserialized_object is not None:
+                objects[object_type].append(deserialized_object)

         if objects:
             worker_fn(dict(objects))
-            self.consumer.commit()
+        self.consumer.commit()

         at_eof = self.stop_on_eof and all(
             (tp.topic, tp.partition) in self.eof_reached
             for tp in self.consumer.assignment()
         )

         return nb_processed, at_eof

-    def deserialize_message(self, message):
-        return self.value_deserializer(message.value())
+    def deserialize_message(self, message, object_type=None):
+        return self.value_deserializer(object_type, message.value())

     def close(self):
         self.consumer.close()
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
index 63f8646..55649c7 100644
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -1,329 +1,377 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-from typing import Dict, List
+from typing import Dict, List, cast
 from unittest.mock import MagicMock

 from confluent_kafka import Producer
 import pytest

 from swh.journal.client import JournalClient
-from swh.journal.serializers import key_to_kafka, value_to_kafka
-from swh.model.model import Content
+from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
+from swh.model.model import Content, Revision
+from swh.model.tests.swh_model_data import TEST_OBJECTS

 REV = {
     "message": b"something cool",
     "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"},
     "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None},
     "date": {
         "timestamp": {"seconds": 123456789, "microseconds": 123},
         "offset": 120,
         "negative_utc": False,
     },
     "committer_date": {
         "timestamp": {"seconds": 123123456, "microseconds": 0},
         "offset": 0,
         "negative_utc": False,
     },
     "type": "git",
     "directory": (
         b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
         b"\x01\x02\x03\x04\x05"
     ),
     "synthetic": False,
     "metadata": None,
     "parents": [],
     "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c",
 }


 def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
     producer = Producer(
{ "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [REV]}) def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=None, stop_on_eof=True, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [REV]}) @pytest.mark.parametrize("batch_size", [1, 5, 100]) def test_client_batch_size( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int, ): num_objects = 2 * batch_size + 1 assert num_objects < 256, "Too many objects, generation will fail" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) contents = [Content.from_data(bytes([i])) for i in range(num_objects)] # Fill Kafka for content in contents: producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(content.sha1), value=value_to_kafka(content.to_dict()), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=num_objects, batch_size=batch_size, ) collected_output: List[Dict] = [] def worker_fn(objects): received = objects["content"] assert len(received) <= batch_size collected_output.extend(received) client.process(worker_fn) expected_output = [content.to_dict() for content in contents] assert len(collected_output) == len(expected_output) for output in collected_output: assert output in expected_output @pytest.fixture() def kafka_producer(kafka_prefix: str, kafka_server_base: str): producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( topic=kafka_prefix + ".something", key=key_to_kafka(b"key1"), value=value_to_kafka("value1"), ) producer.produce( topic=kafka_prefix + ".else", key=key_to_kafka(b"key1"), value=value_to_kafka("value2"), ) producer.flush() return producer def test_client_subscribe_all( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): client = JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_after_objects=2, ) assert set(client.subscription) == { f"{kafka_prefix}.something", f"{kafka_prefix}.else", } worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with( {"something": ["value1"], "else": ["value2"],} ) def test_client_subscribe_one_topic( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): client = JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_after_objects=1, object_types=["else"], ) assert client.subscription == [f"{kafka_prefix}.else"] worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"else": ["value2"]}) def test_client_subscribe_absent_topic( kafka_producer: Producer, kafka_prefix: 
     kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
             brokers=[kafka_server_base],
             group_id="whatever",
             prefix=kafka_prefix,
             stop_after_objects=1,
             object_types=["really"],
         )


 def test_client_subscribe_absent_prefix(
     kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
             brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
             stop_after_objects=1,
         )
     with pytest.raises(ValueError):
         JournalClient(
             brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
             stop_after_objects=1,
             object_types=["else"],
         )


 def test_client_subscriptions_with_anonymized_topics(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
 ):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server_base,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     # Fill Kafka with revision object on both the regular prefix (normally for
     # anonymized objects in this case) and privileged one
     producer.produce(
         topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
     )
     producer.produce(
         topic=kafka_prefix + "_privileged.revision",
         key=REV["id"],
         value=value_to_kafka(REV),
     )
     producer.flush()

     # without privileged "channels" activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=1,
         privileged=False,
     )

     # we only subscribed to "standard" topics
     assert client.subscription == [kafka_prefix + ".revision"]

     # with privileged "channels" activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=1,
         privileged=True,
     )

     # we only subscribed to "privileged" topics
     assert client.subscription == [kafka_prefix + "_privileged.revision"]


 def test_client_subscriptions_without_anonymized_topics(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
 ):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server_base,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     # Fill Kafka with revision objects only on the standard prefix
     producer.produce(
         topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
     )
     producer.flush()

     # without privileged channel activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=1,
         privileged=False,
     )

     # we only subscribed to the standard prefix
     assert client.subscription == [kafka_prefix + ".revision"]

     # with privileged channel activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=1,
         privileged=True,
     )

     # we also only subscribed to the standard prefix, since there is no
     # privileged prefix on the kafka broker
     assert client.subscription == [kafka_prefix + ".revision"]
+
+
+def test_client_with_deserializer(
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
+):
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test producer",
+            "acks": "all",
+        }
+    )
+
+    # Fill Kafka
+    revisions = cast(List[Revision], TEST_OBJECTS["revision"])
+    for rev in revisions:
+        producer.produce(
+            topic=kafka_prefix + ".revision",
+            key=rev.id,
+            value=value_to_kafka(rev.to_dict()),
+        )
+    producer.flush()
+
+    def custom_deserializer(object_type, msg):
+        assert object_type == "revision"
+        obj = kafka_to_value(msg)
+        # filter out the first revision
+        if obj["id"] == revisions[0].id:
+            return None
+        return Revision.from_dict(obj)
+
+    client = JournalClient(
+        brokers=[kafka_server],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=1,
+        value_deserializer=custom_deserializer,
+    )
+    worker_fn = MagicMock()
+    client.process(worker_fn)
+
+    # a commit seems to be needed to prevent a race condition where worker_fn
+    # has not been called yet by the time we check it (the exact cause is
+    # unclear)
+    client.consumer.commit()
+
+    # Check the first revision has not been passed to worker_fn
+    worker_fn.assert_called_once_with({"revision": revisions[1:]})
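As a usage note, the `value_deserializer` hook exercised by the test above is not limited to filtering: it also lets a client receive model objects instead of dicts. Below is a minimal sketch of that pattern, built on the same API this diff introduces; the broker address and group id are placeholders.

```python
from swh.journal.client import get_journal_client
from swh.journal.serializers import kafka_to_value
from swh.model.model import Release, Revision

# Model class for each object type we subscribe to.
MODEL_CLASSES = {"revision": Revision, "release": Release}


def deserializer(object_type, kafka_msg):
    # Convert the raw kafka message into a model object; returning None
    # here would drop the message before it reaches worker_fn.
    return MODEL_CLASSES[object_type].from_dict(kafka_to_value(kafka_msg))


client = get_journal_client(
    "kafka",
    brokers=["localhost:9092"],  # placeholder
    group_id="model-objects-consumer",  # placeholder
    object_types=["revision", "release"],
    value_deserializer=deserializer,
    stop_on_eof=True,  # stop once every partition has been drained
)


def worker_fn(all_objects):
    # Each list now contains Revision/Release instances, not dicts.
    for object_type, objects in all_objects.items():
        print(f"{object_type}: received {len(objects)} model objects")


client.process(worker_fn)
```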