diff --git a/docs/journal-clients.rst b/docs/journal-clients.rst index 65e8f3f..3441ebb 100644 --- a/docs/journal-clients.rst +++ b/docs/journal-clients.rst @@ -1,76 +1,80 @@ .. _journal_clients: Software Heritage Journal clients ================================= Journal client are processes that read data from the |swh| Journal, in order to efficiently process all existing objects, and process new objects as they come. + Some journal clients, such as :ref:`swh-dataset ` only read existing objects and stop when they are done. -They can run in parallel, and the :mod:`swh.journal.client` module -provides an abstraction handling all the setup, so actual clients are actually -a single function that takes :mod:`model objects ` as parameters. +Other journal clients, such as the :ref:`mirror ` are expected to +read constantly from the journal. + +They can run in parallel, and the :mod:`swh.journal.client` module provides an +abstraction handling all the setup, so actual clients only consists in a single +function that takes :mod:`model objects ` as parameters. For example, a very simple journal client that prints all revisions and releases to the console can be implemented like this: .. literalinclude:: example-journal-client.py Parallelization --------------- A single journal client, like the one above, is sequential. It can however run concurrently by running the same program multiple times. Kafka will coordinate the processes so the load is shared across processes. Authentication -------------- In production, journal clients need credentials to access the journal. Once you have credentials, they can be configured by adding this to the ``config``:: config = { "sasl.mechanism": "SCRAM-SHA-512", "security.protocol": "SASL_SSL", "sasl.username": "", "sasl.password": "", } There are two types of client: privileged and unprivileged. The former has access to all the data, the latter gets redacted authorship information, for privacy reasons. Instead, the ``name`` and ``email`` fields of ``author`` and ``committer`` attributes of release and revision objects are blank, and their ``fullname`` is a SHA256 hash of their actual fullname. The ``privileged`` parameter to ``get_journal_client`` must be set accordingly. Order guarantees and replaying ------------------------------ The journal client shares the ordering guarantees of Kafka. The short version is that you should not assume any order unless specified otherwise in the `Kafka documentation `__, nor that two related objects are sent to the same process. We call "replay" any workflow that involves a journal client writing all (or most) objects to a new database. This can be either continuous (in effect, this produces a mirror database), or one-off. Either way, particular attention should be given to this lax ordering, as replaying produces databases that are (temporarily) inconsistent, because some objects may point to objects that are not replayed yet. For one-off replays, this can be mostly solved by processing objects in reverse topologic order: as contents don't reference any object, directories only reference contents and directories, revisions only reference directories, etc. ; this means that replayers can first process all revisions, then all directories, then all contents. This keeps the number of inconsistencies relatively small. For continuous replays, replayed databases are eventually consistent. diff --git a/swh/journal/client.py b/swh/journal/client.py index 6ad0780..ff6f0ef 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,306 +1,323 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import logging import os import time -from typing import Any, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from confluent_kafka import Consumer, KafkaError, KafkaException from swh.journal import DEFAULT_PREFIX from .serializers import kafka_to_value logger = logging.getLogger(__name__) rdkafka_logger = logging.getLogger(__name__ + ".rdkafka") # Only accepted offset reset policy accepted ACCEPTED_OFFSET_RESET = ["earliest", "latest"] # Errors that Kafka raises too often and are not useful; therefore they # we lower their log level to DEBUG instead of INFO. _SPAMMY_ERRORS = [ KafkaError._NO_OFFSET, ] def get_journal_client(cls: str, **kwargs: Any): """Factory function to instantiate a journal client object. Currently, only the "kafka" journal client is supported. """ if cls == "kafka": return JournalClient(**kwargs) raise ValueError("Unknown journal client class `%s`" % cls) def _error_cb(error): if error.fatal(): raise KafkaException(error) if error.code() in _SPAMMY_ERRORS: logger.debug("Received non-fatal kafka error: %s", error) else: logger.info("Received non-fatal kafka error: %s", error) def _on_commit(error, partitions): if error is not None: _error_cb(error) class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the `prefix` argument is None (default value), it will take the default value `'swh.journal.objects'`. Clients subscribe to events specific to each object type as listed in the `object_types` argument (if unset, defaults to all existing kafka topic under the prefix). Clients can be sharded by setting the `group_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id. Messages are processed by the `worker_fn` callback passed to the `process` method, in batches of maximum `batch_size` messages (defaults to 200). + The objects passed to the `worker_fn` callback are the result of the kafka + message converted by the `value_deserializer` function. By default (if this + argument is not given), it will produce dicts (using the `kafka_to_value` + function). This signature of the function is: + + `value_deserializer(object_type: str, kafka_msg: bytes) -> Any` + + If the value returned by `value_deserializer` is None, it is ignored and + not passed the `worker_fn` function. + If set, the processing stops after processing `stop_after_objects` messages in total. `stop_on_eof` stops the processing when the client has reached the end of each partition in turn. `auto_offset_reset` sets the behavior of the client when the consumer group initializes: `'earliest'` (the default) processes all objects since the inception of the topics; `''` Any other named argument is passed directly to KafkaConsumer(). """ def __init__( self, brokers: Union[str, List[str]], group_id: str, prefix: Optional[str] = None, object_types: Optional[List[str]] = None, privileged: bool = False, stop_after_objects: Optional[int] = None, batch_size: int = 200, process_timeout: Optional[float] = None, auto_offset_reset: str = "earliest", stop_on_eof: bool = False, + value_deserializer: Optional[Callable[[str, bytes], Any]] = None, **kwargs, ): if prefix is None: prefix = DEFAULT_PREFIX if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( "Option 'auto_offset_reset' only accept %s, not %s" % (ACCEPTED_OFFSET_RESET, auto_offset_reset) ) if batch_size <= 0: raise ValueError("Option 'batch_size' needs to be positive") - - self.value_deserializer = kafka_to_value + if value_deserializer: + self.value_deserializer = value_deserializer + else: + self.value_deserializer = lambda _, value: kafka_to_value(value) if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) if debug_logging and "debug" not in kwargs: kwargs["debug"] = "consumer" # Static group instance id management group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID") if group_instance_id: kwargs["group.instance.id"] = group_instance_id if "group.instance.id" in kwargs: # When doing static consumer group membership, set a higher default # session timeout. The session timeout is the duration after which # the broker considers that a consumer has left the consumer group # for good, and triggers a rebalance. Considering our current # processing pattern, 10 minutes gives the consumer ample time to # restart before that happens. if "session.timeout.ms" not in kwargs: kwargs["session.timeout.ms"] = 10 * 60 * 1000 # 10 minutes if "session.timeout.ms" in kwargs: # When the session timeout is set, rdkafka requires the max poll # interval to be set to a higher value; the max poll interval is # rdkafka's way of figuring out whether the client's message # processing thread has stalled: when the max poll interval lapses # between two calls to consumer.poll(), rdkafka leaves the consumer # group and terminates the connection to the brokers. # # We default to 1.5 times the session timeout if "max.poll.interval.ms" not in kwargs: kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3 consumer_settings = { **kwargs, "bootstrap.servers": ",".join(brokers), "auto.offset.reset": auto_offset_reset, "group.id": group_id, "on_commit": _on_commit, "error_cb": _error_cb, "enable.auto.commit": False, "logger": rdkafka_logger, } self.stop_on_eof = stop_on_eof if self.stop_on_eof: consumer_settings["enable.partition.eof"] = True logger.debug("Consumer settings: %s", consumer_settings) self.consumer = Consumer(consumer_settings) if privileged: privileged_prefix = f"{prefix}_privileged" else: # do not attempt to subscribe to privileged topics privileged_prefix = f"{prefix}" existing_topics = [ topic for topic in self.consumer.list_topics(timeout=10).topics.keys() if ( topic.startswith(f"{prefix}.") or topic.startswith(f"{privileged_prefix}.") ) ] if not existing_topics: raise ValueError( f"The prefix {prefix} does not match any existing topic " "on the kafka broker" ) if not object_types: object_types = list({topic.split(".")[-1] for topic in existing_topics}) self.subscription = [] unknown_types = [] for object_type in object_types: topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}") for topic in topics: if topic in existing_topics: self.subscription.append(topic) break else: unknown_types.append(object_type) if unknown_types: raise ValueError( f"Topic(s) for object types {','.join(unknown_types)} " "are unknown on the kafka broker" ) logger.debug(f"Upstream topics: {existing_topics}") self.subscribe() self.stop_after_objects = stop_after_objects self.process_timeout = process_timeout self.eof_reached: Set[Tuple[str, str]] = set() self.batch_size = batch_size def subscribe(self): """Subscribe to topics listed in self.subscription This can be overridden if you need, for instance, to manually assign partitions. """ logger.debug(f"Subscribing to: {self.subscription}") self.consumer.subscribe(topics=self.subscription) def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. """ start_time = time.monotonic() total_objects_processed = 0 while True: # timeout for message poll timeout = 1.0 elapsed = time.monotonic() - start_time if self.process_timeout: # +0.01 to prevent busy-waiting on / spamming consumer.poll. # consumer.consume() returns shortly before X expired # (a matter of milliseconds), so after it returns a first # time, it would then be called with a timeout in the order # of milliseconds, therefore returning immediately, then be # called again, etc. if elapsed + 0.01 >= self.process_timeout: break timeout = self.process_timeout - elapsed batch_size = self.batch_size if self.stop_after_objects: if total_objects_processed >= self.stop_after_objects: break # clamp batch size to avoid overrunning stop_after_objects batch_size = min( self.stop_after_objects - total_objects_processed, batch_size, ) messages = self.consumer.consume(timeout=timeout, num_messages=batch_size) if not messages: continue batch_processed, at_eof = self.handle_messages(messages, worker_fn) total_objects_processed += batch_processed if at_eof: break return total_objects_processed def handle_messages(self, messages, worker_fn): objects: Dict[str, List[Any]] = defaultdict(list) nb_processed = 0 for message in messages: error = message.error() if error is not None: if error.code() == KafkaError._PARTITION_EOF: self.eof_reached.add((message.topic(), message.partition())) else: _error_cb(error) continue if message.value() is None: # ignore message with no payload, these can be generated in tests continue nb_processed += 1 object_type = message.topic().split(".")[-1] - objects[object_type].append(self.deserialize_message(message)) + deserialized_object = self.deserialize_message( + message, object_type=object_type + ) + if deserialized_object is not None: + objects[object_type].append(deserialized_object) if objects: worker_fn(dict(objects)) self.consumer.commit() at_eof = self.stop_on_eof and all( (tp.topic, tp.partition) in self.eof_reached for tp in self.consumer.assignment() ) return nb_processed, at_eof - def deserialize_message(self, message): - return self.value_deserializer(message.value()) + def deserialize_message(self, message, object_type=None): + return self.value_deserializer(object_type, message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index 63f8646..98a9baa 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,329 +1,372 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Dict, List +from typing import Dict, List, cast from unittest.mock import MagicMock from confluent_kafka import Producer import pytest from swh.journal.client import JournalClient -from swh.journal.serializers import key_to_kafka, value_to_kafka -from swh.model.model import Content +from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka +from swh.model.model import Content, Revision +from swh.model.tests.swh_model_data import TEST_OBJECTS REV = { "message": b"something cool", "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"}, "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None}, "date": { "timestamp": {"seconds": 123456789, "microseconds": 123}, "offset": 120, "negative_utc": False, }, "committer_date": { "timestamp": {"seconds": 123123456, "microseconds": 0}, "offset": 0, "negative_utc": False, }, "type": "git", "directory": ( b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" b"\x01\x02\x03\x04\x05" ), "synthetic": False, "metadata": None, "parents": [], "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c", } def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [REV]}) def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=None, stop_on_eof=True, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [REV]}) @pytest.mark.parametrize("batch_size", [1, 5, 100]) def test_client_batch_size( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int, ): num_objects = 2 * batch_size + 1 assert num_objects < 256, "Too many objects, generation will fail" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) contents = [Content.from_data(bytes([i])) for i in range(num_objects)] # Fill Kafka for content in contents: producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(content.sha1), value=value_to_kafka(content.to_dict()), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=num_objects, batch_size=batch_size, ) collected_output: List[Dict] = [] def worker_fn(objects): received = objects["content"] assert len(received) <= batch_size collected_output.extend(received) client.process(worker_fn) expected_output = [content.to_dict() for content in contents] assert len(collected_output) == len(expected_output) for output in collected_output: assert output in expected_output @pytest.fixture() def kafka_producer(kafka_prefix: str, kafka_server_base: str): producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( topic=kafka_prefix + ".something", key=key_to_kafka(b"key1"), value=value_to_kafka("value1"), ) producer.produce( topic=kafka_prefix + ".else", key=key_to_kafka(b"key1"), value=value_to_kafka("value2"), ) producer.flush() return producer def test_client_subscribe_all( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): client = JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_after_objects=2, ) assert set(client.subscription) == { f"{kafka_prefix}.something", f"{kafka_prefix}.else", } worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with( {"something": ["value1"], "else": ["value2"],} ) def test_client_subscribe_one_topic( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): client = JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_after_objects=1, object_types=["else"], ) assert client.subscription == [f"{kafka_prefix}.else"] worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"else": ["value2"]}) def test_client_subscribe_absent_topic( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_after_objects=1, object_types=["really"], ) def test_client_subscribe_absent_prefix( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix="wrong.prefix", stop_after_objects=1, ) with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix="wrong.prefix", stop_after_objects=1, object_types=["else"], ) def test_client_subscriptions_with_anonymized_topics( kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str ): producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "test producer", "acks": "all", } ) # Fill Kafka with revision object on both the regular prefix (normally for # anonymized objects in this case) and privileged one producer.produce( topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), ) producer.produce( topic=kafka_prefix + "_privileged.revision", key=REV["id"], value=value_to_kafka(REV), ) producer.flush() # without privileged "channels" activated on the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, privileged=False, ) # we only subscribed to "standard" topics assert client.subscription == [kafka_prefix + ".revision"] # with privileged "channels" activated on the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, privileged=True, ) # we only subscribed to "privileged" topics assert client.subscription == [kafka_prefix + "_privileged.revision"] def test_client_subscriptions_without_anonymized_topics( kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str ): producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "test producer", "acks": "all", } ) # Fill Kafka with revision objects only on the standard prefix producer.produce( topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), ) producer.flush() # without privileged channel activated on the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, privileged=False, ) # we only subscribed to the standard prefix assert client.subscription == [kafka_prefix + ".revision"] # with privileged channel activated on the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, privileged=True, ) # we also only subscribed to the standard prefix, since there is no priviled prefix # on the kafka broker assert client.subscription == [kafka_prefix + ".revision"] + + +def test_client_with_deserializer( + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str +): + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test producer", + "acks": "all", + } + ) + + # Fill Kafka + revisions = cast(List[Revision], TEST_OBJECTS["revision"]) + for rev in revisions: + producer.produce( + topic=kafka_prefix + ".revision", + key=rev.id, + value=value_to_kafka(rev.to_dict()), + ) + producer.flush() + + def custom_deserializer(object_type, msg): + assert object_type == "revision" + obj = kafka_to_value(msg) + # filter the first revision + if obj["id"] == revisions[0].id: + return None + return Revision.from_dict(obj) + + client = JournalClient( + brokers=[kafka_server], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=1, + value_deserializer=custom_deserializer, + ) + worker_fn = MagicMock() + client.process(worker_fn) + # check that the first Revision has not been passed to worker_fn + worker_fn.assert_called_once_with({"revision": revisions[1:]})