diff --git a/swh/journal/client.py b/swh/journal/client.py index 028f94b..a8c4ef5 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,355 +1,356 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from importlib import import_module import logging import os from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from confluent_kafka import Consumer, KafkaError, KafkaException from swh.core.statsd import statsd from swh.journal import DEFAULT_PREFIX from .serializers import kafka_to_value logger = logging.getLogger(__name__) rdkafka_logger = logging.getLogger(__name__ + ".rdkafka") # Only accepted offset reset policy accepted ACCEPTED_OFFSET_RESET = ["earliest", "latest"] # Errors that Kafka raises too often and are not useful; therefore they # we lower their log level to DEBUG instead of INFO. _SPAMMY_ERRORS = [ KafkaError._NO_OFFSET, ] JOURNAL_MESSAGE_NUMBER_METRIC = "swh_journal_client_handle_message_total" JOURNAL_STATUS_METRIC = "swh_journal_client_status" def get_journal_client(cls: str, **kwargs: Any): """Factory function to instantiate a journal client object. Currently, only the "kafka" journal client is supported. """ if cls == "kafka": if "stats_cb" in kwargs: stats_cb = kwargs["stats_cb"] if isinstance(stats_cb, str): try: module_path, func_name = stats_cb.split(":") except ValueError: raise ValueError( "Invalid stats_cb configuration option: " "it should be a string like 'path.to.module:function'" ) try: module = import_module(module_path, package=__package__) except ModuleNotFoundError: raise ValueError( "Invalid stats_cb configuration option: " f"module {module_path} not found" ) try: kwargs["stats_cb"] = getattr(module, func_name) except AttributeError: raise ValueError( "Invalid stats_cb configuration option: " f"function {func_name} not found in module {module_path}" ) return JournalClient(**kwargs) raise ValueError("Unknown journal client class `%s`" % cls) def _error_cb(error): if error.fatal(): raise KafkaException(error) if error.code() in _SPAMMY_ERRORS: logger.debug("Received non-fatal kafka error: %s", error) else: logger.info("Received non-fatal kafka error: %s", error) def _on_commit(error, partitions): if error is not None: _error_cb(error) class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the `prefix` argument is None (default value), it will take the default value `'swh.journal.objects'`. Clients subscribe to events specific to each object type as listed in the `object_types` argument (if unset, defaults to all existing kafka topic under the prefix). Clients can be sharded by setting the `group_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id. Messages are processed by the `worker_fn` callback passed to the `process` method, in batches of maximum `batch_size` messages (defaults to 200). The objects passed to the `worker_fn` callback are the result of the kafka message converted by the `value_deserializer` function. 
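    For example, a minimal consumer might look like the following hedged
    sketch (the broker address, group id and object type are placeholders
    chosen for illustration)::

        client = JournalClient(
            brokers=["broker1.example.org:9092"],
            group_id="my-consumer-group",
            prefix="swh.journal.objects",
            object_types=["revision"],
            stop_on_eof=True,
        )

        def worker_fn(objects):
            # objects maps each object type to the list of deserialized values
            for revision in objects.get("revision", []):
                print(revision["id"].hex())

        client.process(worker_fn)

    No `value_deserializer` is passed in this sketch.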
By default (if this argument is not given), it will produce dicts (using the `kafka_to_value` function). The signature of this function is: `value_deserializer(object_type: str, kafka_msg: bytes) -> Any` If the value returned by `value_deserializer` is None, it is ignored and not passed to the `worker_fn` function. If `stop_after_objects` is set, the processing stops after that many messages have been processed in total. `stop_on_eof` stops the processing when the client has reached the end of each partition in turn. `auto_offset_reset` sets the behavior of the client when the consumer group initializes: `'earliest'` (the default) processes all objects since the inception of the topics; `'latest'` only processes objects published after the consumer group was created. Any other named argument is passed directly to KafkaConsumer(). """ def __init__( self, brokers: Union[str, List[str]], group_id: str, prefix: Optional[str] = None, object_types: Optional[List[str]] = None, privileged: bool = False, stop_after_objects: Optional[int] = None, batch_size: int = 200, process_timeout: Optional[float] = None, auto_offset_reset: str = "earliest", stop_on_eof: bool = False, value_deserializer: Optional[Callable[[str, bytes], Any]] = None, **kwargs, ): if prefix is None: prefix = DEFAULT_PREFIX if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( "Option 'auto_offset_reset' only accepts %s, not %s" % (ACCEPTED_OFFSET_RESET, auto_offset_reset) ) if batch_size <= 0: raise ValueError("Option 'batch_size' needs to be positive") if value_deserializer: self.value_deserializer = value_deserializer else: self.value_deserializer = lambda _, value: kafka_to_value(value) if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) if debug_logging and "debug" not in kwargs: kwargs["debug"] = "consumer" # Static group instance id management group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID") if group_instance_id: kwargs["group.instance.id"] = group_instance_id if "group.instance.id" in kwargs: # When doing static consumer group membership, set a higher default # session timeout. The session timeout is the duration after which # the broker considers that a consumer has left the consumer group # for good, and triggers a rebalance. Considering our current # processing pattern, 10 minutes gives the consumer ample time to # restart before that happens. if "session.timeout.ms" not in kwargs: kwargs["session.timeout.ms"] = 10 * 60 * 1000 # 10 minutes if "session.timeout.ms" in kwargs: # When the session timeout is set, rdkafka requires the max poll # interval to be set to a higher value; the max poll interval is # rdkafka's way of figuring out whether the client's message # processing thread has stalled: when the max poll interval lapses # between two calls to consumer.poll(), rdkafka leaves the consumer # group and terminates the connection to the brokers.
# # We default to 1.5 times the session timeout if "max.poll.interval.ms" not in kwargs: kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3 consumer_settings = { **kwargs, "bootstrap.servers": ",".join(brokers), "auto.offset.reset": auto_offset_reset, "group.id": group_id, "on_commit": _on_commit, "error_cb": _error_cb, "enable.auto.commit": False, "logger": rdkafka_logger, } self.stop_on_eof = stop_on_eof if self.stop_on_eof: consumer_settings["enable.partition.eof"] = True logger.debug("Consumer settings: %s", consumer_settings) self.consumer = Consumer(consumer_settings) if privileged: privileged_prefix = f"{prefix}_privileged" else: # do not attempt to subscribe to privileged topics privileged_prefix = f"{prefix}" existing_topics = [ topic for topic in self.consumer.list_topics(timeout=10).topics.keys() if ( topic.startswith(f"{prefix}.") or topic.startswith(f"{privileged_prefix}.") ) ] if not existing_topics: raise ValueError( f"The prefix {prefix} does not match any existing topic " "on the kafka broker" ) if not object_types: object_types = list({topic.split(".")[-1] for topic in existing_topics}) self.subscription = [] unknown_types = [] for object_type in object_types: topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}") for topic in topics: if topic in existing_topics: self.subscription.append(topic) break else: unknown_types.append(object_type) if unknown_types: raise ValueError( f"Topic(s) for object types {','.join(unknown_types)} " "are unknown on the kafka broker" ) logger.debug(f"Upstream topics: {existing_topics}") self.subscribe() self.stop_after_objects = stop_after_objects self.eof_reached: Set[Tuple[str, str]] = set() self.batch_size = batch_size if process_timeout is not None: raise DeprecationWarning( "'process_timeout' argument is not supported anymore by " "JournalClient; please remove it from your configuration.", ) def subscribe(self): """Subscribe to topics listed in self.subscription This can be overridden if you need, for instance, to manually assign partitions. """ logger.debug(f"Subscribing to: {self.subscription}") self.consumer.subscribe(topics=self.subscription) def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. 
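        For example, with the default deserializer the callback receives plain
        dicts; a hedged sketch (`client` stands for an already-configured
        JournalClient, and the object types are illustrative)::

            def worker_fn(objects):
                for object_type, values in objects.items():
                    # e.g. object_type == "revision", values == list of dicts
                    print(object_type, len(values))

            total = client.process(worker_fn)  # number of objects handled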
""" total_objects_processed = 0 # timeout for message poll timeout = 1.0 with statsd.status_gauge( JOURNAL_STATUS_METRIC, statuses=["idle", "processing", "waiting"] ) as set_status: set_status("idle") while True: batch_size = self.batch_size if self.stop_after_objects: if total_objects_processed >= self.stop_after_objects: break # clamp batch size to avoid overrunning stop_after_objects batch_size = min( - self.stop_after_objects - total_objects_processed, batch_size, + self.stop_after_objects - total_objects_processed, + batch_size, ) set_status("waiting") while True: messages = self.consumer.consume( timeout=timeout, num_messages=batch_size ) if messages: break set_status("processing") batch_processed, at_eof = self.handle_messages(messages, worker_fn) set_status("idle") # report the number of handled messages statsd.increment(JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed) total_objects_processed += batch_processed if at_eof: break return total_objects_processed def handle_messages(self, messages, worker_fn): objects: Dict[str, List[Any]] = defaultdict(list) nb_processed = 0 for message in messages: error = message.error() if error is not None: if error.code() == KafkaError._PARTITION_EOF: self.eof_reached.add((message.topic(), message.partition())) else: _error_cb(error) continue if message.value() is None: # ignore message with no payload, these can be generated in tests continue nb_processed += 1 object_type = message.topic().split(".")[-1] deserialized_object = self.deserialize_message( message, object_type=object_type ) if deserialized_object is not None: objects[object_type].append(deserialized_object) if objects: worker_fn(dict(objects)) self.consumer.commit() at_eof = self.stop_on_eof and all( (tp.topic, tp.partition) in self.eof_reached for tp in self.consumer.assignment() ) return nb_processed, at_eof def deserialize_message(self, message, object_type=None): return self.value_deserializer(object_type, message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py index eb82b69..3476d4e 100644 --- a/swh/journal/pytest_plugin.py +++ b/swh/journal/pytest_plugin.py @@ -1,261 +1,257 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import random import string from typing import Any, Collection, Dict, Iterator, Optional import attr from confluent_kafka import Consumer, KafkaException, Producer from confluent_kafka.admin import AdminClient import pytest from swh.journal.serializers import kafka_to_key, kafka_to_value, pprint_key from swh.model.tests.swh_model_data import TEST_OBJECTS def ensure_lists(value: Any) -> Any: """ >>> ensure_lists(["foo", 42]) ['foo', 42] >>> ensure_lists(("foo", 42)) ['foo', 42] >>> ensure_lists({"a": ["foo", 42]}) {'a': ['foo', 42]} >>> ensure_lists({"a": ("foo", 42)}) {'a': ['foo', 42]} """ if isinstance(value, (tuple, list)): return list(map(ensure_lists, value)) elif isinstance(value, dict): return dict(ensure_lists(list(value.items()))) else: return value def consume_messages(consumer, kafka_prefix, expected_messages): """Consume expected_messages from the consumer; Sort them all into a consumed_objects dict""" consumed_messages = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if 
retries_left == 0: raise ValueError( "Timed out fetching messages from kafka. " f"Only {fetched_messages}/{expected_messages} fetched" ) msg = consumer.poll(timeout=0.1) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 topic = msg.topic() assert topic.startswith(f"{kafka_prefix}.") or topic.startswith( f"{kafka_prefix}_privileged." ), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) return consumed_messages def assert_all_objects_consumed( consumed_messages: Dict, exclude: Optional[Collection] = None ): """Check whether all objects from TEST_OBJECTS have been consumed `exclude` can be a list of object types for which we do not want to compare the values (eg. for anonymized object). """ for object_type, known_objects in TEST_OBJECTS.items(): known_keys = [obj.unique_key() for obj in known_objects] if not consumed_messages[object_type]: return (received_keys, received_values) = zip(*consumed_messages[object_type]) if object_type in ("content", "skipped_content"): for value in received_values: value.pop("ctime", None) if object_type == "content": known_objects = [attr.evolve(o, data=None) for o in known_objects] for key in known_keys: assert key in received_keys, ( f"expected {object_type} key {pprint_key(key)} " "absent from consumed messages" ) if exclude and object_type in exclude: continue for value in known_objects: expected_value = value.to_dict() if value.object_type in ("content", "skipped_content"): expected_value.pop("ctime", None) assert ensure_lists(expected_value) in received_values, ( f"expected {object_type} value {value!r} is " "absent from consumed messages" ) @pytest.fixture(scope="function") def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope="function") def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix @pytest.fixture(scope="function") def object_types(): """Set of object types to precreate topics for.""" return set(TEST_OBJECTS.keys()) @pytest.fixture(scope="function") def privileged_object_types(): """Set of object types to precreate privileged topics for.""" return {"revision", "release"} @pytest.fixture(scope="function") def kafka_server( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ) -> str: """A kafka server with existing topics Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type from the ``object_types`` list. Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with object_type from the ``privileged_object_types`` list. """ topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [ f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types ] # unfortunately, the Mock broker does not support the CreatTopic admin API, so we # have to create topics using a Producer. 
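    # Against a real broker, the topics could instead be created explicitly; a
    # hedged sketch of that alternative (not usable with the mock broker, and
    # shown for illustration only) would be:
    #
    #     from confluent_kafka.admin import NewTopic
    #
    #     admin = AdminClient({"bootstrap.servers": kafka_server_base})
    #     futures = admin.create_topics(
    #         [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]
    #     )
    #     for future in futures.values():
    #         future.result()  # raises if the topic creation failed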
producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "bootstrap producer", "acks": "all", } ) for topic in topics: producer.produce(topic=topic, value=None) for i in range(10): if producer.flush(0.1) == 0: break return kafka_server_base @pytest.fixture(scope="session") def kafka_server_base() -> Iterator[str]: """Create a mock kafka cluster suitable for tests. Yield a connection string. Note: this is a generator to keep the mock broker alive during the whole test session. see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h """ admin = AdminClient({"test.mock.num.brokers": "1"}) metadata = admin.list_topics() brokers = [str(broker) for broker in metadata.brokers.values()] assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" broker_connstr, broker_id = brokers[0].split("/") yield broker_connstr TEST_CONFIG = { "consumer_id": "swh.journal.consumer", "stop_on_eof": True, "storage": {"cls": "memory", "args": {}}, } @pytest.fixture def test_config( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): - """Test configuration needed for producer/consumer - - """ + """Test configuration needed for producer/consumer""" return { **TEST_CONFIG, "object_types": object_types, "privileged_object_types": privileged_object_types, "brokers": [kafka_server_base], "prefix": kafka_prefix, } @pytest.fixture def consumer( kafka_server: str, test_config: Dict, kafka_consumer_group: str ) -> Consumer: - """Get a connected Kafka consumer. - - """ + """Get a connected Kafka consumer.""" consumer = Consumer( { "bootstrap.servers": kafka_server, "auto.offset.reset": "earliest", "enable.auto.commit": True, "group.id": kafka_consumer_group, } ) prefix = test_config["prefix"] kafka_topics = [ f"{prefix}.{object_type}" for object_type in test_config["object_types"] ] + [ f"{prefix}_privileged.{object_type}" for object_type in test_config["privileged_object_types"] ] consumer.subscribe(kafka_topics) yield consumer # Explicitly perform the commit operation on the consumer before closing it # to avoid possible hang since confluent-kafka v1.6.0 consumer.commit() consumer.close() diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index 007e652..3d032d5 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,397 +1,409 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List, cast from unittest.mock import MagicMock from confluent_kafka import Producer import pytest from swh.journal.client import JournalClient from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka from swh.model.model import Content, Revision from swh.model.tests.swh_model_data import TEST_OBJECTS REV = { "message": b"something cool", "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"}, "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None}, "date": { "timestamp": {"seconds": 123456789, "microseconds": 123}, "offset": 120, "negative_utc": False, }, "committer_date": { "timestamp": {"seconds": 123123456, "microseconds": 0}, "offset": 0, "negative_utc": False, }, "type": "git", "directory": ( b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" 
b"\x01\x02\x03\x04\x05" ), "synthetic": False, "metadata": None, "parents": [], "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c", } def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( - topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), + topic=kafka_prefix + ".revision", + key=REV["id"], + value=value_to_kafka(REV), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [REV]}) @pytest.mark.parametrize("count", [1, 2]) def test_client_stop_after_objects( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, count: int ): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka revisions = cast(List[Revision], TEST_OBJECTS["revision"]) for rev in revisions: producer.produce( topic=kafka_prefix + ".revision", key=rev.id, value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=False, stop_after_objects=count, ) worker_fn = MagicMock() client.process(worker_fn) # this code below is not pretty, but needed since we have to deal with # dicts (so no set) which can have values that are list vs tuple, and we do # not know for sure how many calls of the worker_fn will happen during the # consumption of the topic... worker_fn.assert_called() revs = [] # list of (unique) rev dicts we got from the client for call in worker_fn.call_args_list: callrevs = call[0][0]["revision"] for rev in callrevs: assert Revision.from_dict(rev) in revisions if rev not in revs: revs.append(rev) assert len(revs) == count @pytest.mark.parametrize("batch_size", [1, 5, 100]) def test_client_batch_size( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: str, + batch_size: int, ): num_objects = 2 * batch_size + 1 assert num_objects < 256, "Too many objects, generation will fail" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) contents = [Content.from_data(bytes([i])) for i in range(num_objects)] # Fill Kafka for content in contents: producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(content.sha1), value=value_to_kafka(content.to_dict()), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, batch_size=batch_size, ) collected_output: List[Dict] = [] def worker_fn(objects): received = objects["content"] assert len(received) <= batch_size collected_output.extend(received) client.process(worker_fn) expected_output = [content.to_dict() for content in contents] assert len(collected_output) == len(expected_output) for output in collected_output: assert output in expected_output @pytest.fixture() def kafka_producer(kafka_prefix: str, kafka_server_base: str): producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( topic=kafka_prefix + ".something", key=key_to_kafka(b"key1"), value=value_to_kafka("value1"), ) 
producer.produce( topic=kafka_prefix + ".else", key=key_to_kafka(b"key1"), value=value_to_kafka("value2"), ) producer.flush() return producer def test_client_subscribe_all( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): client = JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_on_eof=True, ) assert set(client.subscription) == { f"{kafka_prefix}.something", f"{kafka_prefix}.else", } worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with( - {"something": ["value1"], "else": ["value2"],} + { + "something": ["value1"], + "else": ["value2"], + } ) def test_client_subscribe_one_topic( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): client = JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_on_eof=True, object_types=["else"], ) assert client.subscription == [f"{kafka_prefix}.else"] worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"else": ["value2"]}) def test_client_subscribe_absent_topic( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_on_eof=True, object_types=["really"], ) def test_client_subscribe_absent_prefix( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix="wrong.prefix", stop_on_eof=True, ) with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix="wrong.prefix", stop_on_eof=True, object_types=["else"], ) def test_client_subscriptions_with_anonymized_topics( kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str ): producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "test producer", "acks": "all", } ) # Fill Kafka with revision object on both the regular prefix (normally for # anonymized objects in this case) and privileged one producer.produce( - topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), + topic=kafka_prefix + ".revision", + key=REV["id"], + value=value_to_kafka(REV), ) producer.produce( topic=kafka_prefix + "_privileged.revision", key=REV["id"], value=value_to_kafka(REV), ) producer.flush() # without privileged "channels" activated on the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, privileged=False, ) # we only subscribed to "standard" topics assert client.subscription == [kafka_prefix + ".revision"] # with privileged "channels" activated on the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, privileged=True, ) # we only subscribed to "privileged" topics assert client.subscription == [kafka_prefix + "_privileged.revision"] def test_client_subscriptions_without_anonymized_topics( kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str ): producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "test producer", "acks": "all", } ) # Fill Kafka with revision objects only on the standard prefix producer.produce( - topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), + topic=kafka_prefix + ".revision", + key=REV["id"], + value=value_to_kafka(REV), ) producer.flush() # without privileged channel activated on 
the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, privileged=False, ) # we only subscribed to the standard prefix assert client.subscription == [kafka_prefix + ".revision"] # with privileged channel activated on the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, privileged=True, ) # we also only subscribed to the standard prefix, since there is no priviled prefix # on the kafka broker assert client.subscription == [kafka_prefix + ".revision"] def test_client_with_deserializer( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str ): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka revisions = cast(List[Revision], TEST_OBJECTS["revision"]) for rev in revisions: producer.produce( topic=kafka_prefix + ".revision", key=rev.id, value=value_to_kafka(rev.to_dict()), ) producer.flush() def custom_deserializer(object_type, msg): assert object_type == "revision" obj = kafka_to_value(msg) # filter the first revision if obj["id"] == revisions[0].id: return None return Revision.from_dict(obj) client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, value_deserializer=custom_deserializer, ) worker_fn = MagicMock() client.process(worker_fn) # a commit seems to be needed to prevent some race condition situation # where the worker_fn has not yet been called at this point (not sure how) client.consumer.commit() # Check the first revision has not been passed to worker_fn processed_revisions = set(worker_fn.call_args[0][0]["revision"]) assert revisions[0] not in processed_revisions assert all(rev in processed_revisions for rev in revisions[1:]) diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py index ece39ba..b5966da 100644 --- a/swh/journal/tests/test_pytest_plugin.py +++ b/swh/journal/tests/test_pytest_plugin.py @@ -1,73 +1,76 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Iterator from confluent_kafka.admin import AdminClient def test_kafka_server(kafka_server_base: str): ip, port_str = kafka_server_base.split(":") assert ip == "127.0.0.1" assert int(port_str) admin = AdminClient({"bootstrap.servers": kafka_server_base}) topics = admin.list_topics() assert len(topics.brokers) == 1 def test_kafka_server_with_topics( kafka_server: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): admin = AdminClient({"bootstrap.servers": kafka_server}) # check unprivileged topics are present topics = { topic for topic in admin.list_topics().topics if topic.startswith(f"{kafka_prefix}.") } assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types} # check privileged topics are present topics = { topic for topic in admin.list_topics().topics if topic.startswith(f"{kafka_prefix}_privileged.") } assert topics == { f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types } def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str): assert test_config == { "consumer_id": "swh.journal.consumer", "stop_on_eof": True, "storage": {"cls": "memory", "args": {}}, 
"object_types": { "content", "directory", "extid", "metadata_authority", "metadata_fetcher", "origin", "origin_visit", "origin_visit_status", "raw_extrinsic_metadata", "release", "revision", "snapshot", "skipped_content", }, - "privileged_object_types": {"release", "revision",}, + "privileged_object_types": { + "release", + "revision", + }, "brokers": [kafka_server_base], "prefix": kafka_prefix, } diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py index b94825f..5dde7bd 100644 --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -1,115 +1,119 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import OrderedDict from datetime import datetime, timedelta, timezone import itertools from typing import Iterable import pytest from swh.journal import serializers from swh.model.tests.swh_model_data import TEST_OBJECTS def test_key_to_kafka_repeatable(): """Check the kafka key encoding is repeatable""" base_dict = { "a": "foo", "b": "bar", "c": "baz", } key = serializers.key_to_kafka(base_dict) for dict_keys in itertools.permutations(base_dict): d = OrderedDict() for k in dict_keys: d[k] = base_dict[k] assert key == serializers.key_to_kafka(d) def test_pprint_key(): """Test whether get_key works on all our objects""" for object_type, objects in TEST_OBJECTS.items(): for obj in objects: key = obj.unique_key() pprinted_key = serializers.pprint_key(key) assert isinstance(pprinted_key, str) if isinstance(key, dict): assert pprinted_key[0], pprinted_key[-1] == "{}" for dict_key in key.keys(): assert f"{dict_key}:" in pprinted_key if isinstance(key, bytes): assert pprinted_key == key.hex() def test_kafka_to_key(): - """Standard back and forth serialization with keys - - """ + """Standard back and forth serialization with keys""" # All KeyType(s) keys: Iterable[serializers.KeyType] = [ - {"a": "foo", "b": "bar", "c": "baz",}, - {"a": b"foobarbaz",}, + { + "a": "foo", + "b": "bar", + "c": "baz", + }, + { + "a": b"foobarbaz", + }, b"foo", ] for object_type, objects in TEST_OBJECTS.items(): for obj in objects: key = obj.unique_key() keys.append(key) for key in keys: ktk = serializers.key_to_kafka(key) v = serializers.kafka_to_key(ktk) assert v == key # limits of supported int values by msgpack -MININT = -(2 ** 63) -MAXINT = 2 ** 64 - 1 +MININT = -(2**63) +MAXINT = 2**64 - 1 intvalues = [ MININT * 2, MININT - 1, MININT, MININT + 1, -10, 0, 10, MAXINT - 1, MAXINT, MAXINT + 1, MAXINT * 2, ] @pytest.mark.parametrize("value", intvalues) def test_encode_int(value): assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value datevalues = [ datetime.now(tz=timezone.utc), datetime.now(tz=timezone(timedelta(hours=-23, minutes=-59))), datetime.now(tz=timezone(timedelta(hours=23, minutes=59))), datetime(1, 1, 1, 1, 1, tzinfo=timezone.utc), datetime(2100, 1, 1, 1, 1, tzinfo=timezone.utc), ] @pytest.mark.parametrize("value", datevalues) def test_encode_datetime(value): assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value @pytest.mark.parametrize("value", datevalues) def test_encode_datetime_bw(value): bwdate = {b"swhtype": "datetime", b"d": value.isoformat()} assert serializers.kafka_to_value(serializers.value_to_kafka(bwdate)) == value diff --git a/swh/journal/tests/test_stream.py 
b/swh/journal/tests/test_stream.py index c9bfc90..b11b8c4 100644 --- a/swh/journal/tests/test_stream.py +++ b/swh/journal/tests/test_stream.py @@ -1,47 +1,49 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import io import msgpack from swh.journal.serializers import msgpack_ext_hook from swh.journal.writer import get_journal_writer, model_object_dict_sanitizer from swh.model.tests.swh_model_data import TEST_OBJECTS def test_write_additions_with_test_objects(): outs = io.BytesIO() writer = get_journal_writer( - cls="stream", value_sanitizer=model_object_dict_sanitizer, output_stream=outs, + cls="stream", + value_sanitizer=model_object_dict_sanitizer, + output_stream=outs, ) expected = [] n = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) for object in objects: objd = object.to_dict() if object_type == "content": objd.pop("data") expected.append((object_type, objd)) n += len(objects) outs.seek(0, 0) unpacker = msgpack.Unpacker( outs, raw=False, ext_hook=msgpack_ext_hook, strict_map_key=False, use_list=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) ) for i, (objtype, objd) in enumerate(unpacker, start=1): assert (objtype, objd) in expected assert len(expected) == i diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py index 2434146..35f12c5 100644 --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -1,273 +1,275 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import time from typing import ( Any, Callable, Dict, Generic, Iterable, List, NamedTuple, Optional, Type, TypeVar, ) from confluent_kafka import KafkaException, Producer from swh.journal.serializers import KeyType, key_to_kafka, pprint_key, value_to_kafka from . import ValueProtocol logger = logging.getLogger(__name__) class DeliveryTag(NamedTuple): """Unique tag allowing us to check for a message delivery""" topic: str kafka_key: bytes class DeliveryFailureInfo(NamedTuple): """Verbose information for failed deliveries""" object_type: str key: KeyType message: str code: str def get_object_type(topic: str) -> str: """Get the object type from a topic string""" return topic.rsplit(".", 1)[-1] class KafkaDeliveryError(Exception): """Delivery failed on some kafka messages.""" def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]): self.message = message self.delivery_failures = list(delivery_failures) def pretty_failures(self) -> str: return ", ".join( f"{f.object_type} {pprint_key(f.key)} ({f.message})" for f in self.delivery_failures ) def __str__(self): return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" TValue = TypeVar("TValue", bound=ValueProtocol) class KafkaJournalWriter(Generic[TValue]): """This class is used to write serialized versions of value objects to a series of Kafka topics. The type parameter `TValue`, which must implement the `ValueProtocol`, is the type of values this writer will write. Typically, `TValue` will be `swh.model.model.BaseModel`. 
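    A minimal usage sketch (hedged: the broker address, prefix and client id
    below are placeholders, and the ``value_sanitizer`` shown simply returns
    its input unchanged)::

        writer = KafkaJournalWriter(
            brokers=["broker1.example.org:9092"],
            prefix="swh.journal.objects",
            client_id="example-journal-writer",
            value_sanitizer=lambda object_type, value: value,
        )
        writer.write_addition("origin", origin)  # origin: a swh.model.model.Origin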
Topics used to send objects representations are built from a ``prefix`` plus the type of the object: ``{prefix}.{object_type}`` Objects can be sent as is, or can be anonymized. The anonymization feature, when activated, will write anonymized versions of value objects in the main topic, and stock (non-anonymized) objects will be sent to a dedicated (privileged) set of topics: ``{prefix}_privileged.{object_type}`` The anonymization of a value object is the result of calling its ``anonymize()`` method. An object is considered anonymizable if this method returns a (non-None) value. Args: brokers: list of broker addresses and ports. prefix: the prefix used to build the topic names for objects. client_id: the id of the writer sent to kafka. value_sanitizer: a function that takes the object type and the dict representation of an object as argument, and returns an other dict that should be actually stored in the journal (eg. removing keys that do no belong there) producer_config: extra configuration keys passed to the `Producer`. flush_timeout: timeout, in seconds, after which the `flush` operation will fail if some message deliveries are still pending. producer_class: override for the kafka producer class. anonymize: if True, activate the anonymization feature. """ def __init__( self, brokers: Iterable[str], prefix: str, client_id: str, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]], producer_config: Optional[Dict] = None, flush_timeout: float = 120, producer_class: Type[Producer] = Producer, anonymize: bool = False, ): self._prefix = prefix self._prefix_privileged = f"{self._prefix}_privileged" self.anonymize = anonymize if not producer_config: producer_config = {} if "message.max.bytes" not in producer_config: producer_config = { "message.max.bytes": 100 * 1024 * 1024, **producer_config, } self.producer = producer_class( { "bootstrap.servers": ",".join(brokers), "client.id": client_id, "on_delivery": self._on_delivery, "error_cb": self._error_cb, "logger": logger, "acks": "all", **producer_config, } ) # Delivery management self.flush_timeout = flush_timeout # delivery tag -> original object "key" mapping self.deliveries_pending: Dict[DeliveryTag, KeyType] = {} # List of (object_type, key, error_msg, error_name) for failed deliveries self.delivery_failures: List[DeliveryFailureInfo] = [] self.value_sanitizer = value_sanitizer def _error_cb(self, error): if error.fatal(): raise KafkaException(error) logger.info("Received non-fatal kafka error: %s", error) def _on_delivery(self, error, message): (topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key()) sent_key = self.deliveries_pending.pop(delivery_tag, None) if error is not None: self.delivery_failures.append( DeliveryFailureInfo( get_object_type(topic), sent_key, error.str(), error.name() ) ) def send(self, topic: str, key: KeyType, value): kafka_key = key_to_kafka(key) max_attempts = 5 last_exception: Optional[Exception] = None for attempt in range(max_attempts): try: self.producer.produce( - topic=topic, key=kafka_key, value=value_to_kafka(value), + topic=topic, + key=kafka_key, + value=value_to_kafka(value), ) except BufferError as e: last_exception = e wait = 1 + 3 * attempt if logger.isEnabledFor(logging.DEBUG): # pprint_key is expensive logger.debug( "BufferError producing %s %s; waiting for %ss", get_object_type(topic), pprint_key(kafka_key), wait, ) self.producer.poll(wait) else: self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key return # We reach this point if all delivery attempts have 
failed self.delivery_failures.append( DeliveryFailureInfo( get_object_type(topic), key, str(last_exception), "SWH_BUFFER_ERROR" ) ) def delivery_error(self, message) -> KafkaDeliveryError: """Get all failed deliveries, and clear them""" ret = self.delivery_failures self.delivery_failures = [] while self.deliveries_pending: delivery_tag, orig_key = self.deliveries_pending.popitem() (topic, kafka_key) = delivery_tag ret.append( DeliveryFailureInfo( get_object_type(topic), orig_key, "No delivery before flush() timeout", "SWH_FLUSH_TIMEOUT", ) ) return KafkaDeliveryError(message, ret) def flush(self): start = time.monotonic() self.producer.flush(self.flush_timeout) while self.deliveries_pending: if time.monotonic() - start > self.flush_timeout: break self.producer.poll(0.1) if self.deliveries_pending: # Delivery timeout raise self.delivery_error( "flush() exceeded timeout (%ss)" % self.flush_timeout, ) elif self.delivery_failures: raise self.delivery_error("Failed deliveries after flush()") def _write_addition(self, object_type: str, object_: TValue) -> None: """Write a single object to the journal""" key = object_.unique_key() if self.anonymize: anon_object_ = object_.anonymize() if anon_object_: # can be either None, or an anonymized object # if the object is anonymizable, send the non-anonymized version in the # privileged channel topic = f"{self._prefix_privileged}.{object_type}" dict_ = self.value_sanitizer(object_type, object_.to_dict()) logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_) object_ = anon_object_ topic = f"{self._prefix}.{object_type}" dict_ = self.value_sanitizer(object_type, object_.to_dict()) logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_) def write_addition(self, object_type: str, object_: TValue) -> None: """Write a single object to the journal""" self._write_addition(object_type, object_) self.flush() write_update = write_addition def write_additions(self, object_type: str, objects: Iterable[TValue]) -> None: """Write a set of objects to the journal""" for object_ in objects: self._write_addition(object_type, object_) self.flush()
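# A hedged, self-contained sketch of how this writer is typically driven; the
# broker address, prefix, client id and the Origin URL below are illustrative
# placeholders, and this helper is not part of the journal writer API.
def _example_usage() -> None:
    from swh.model.model import Origin

    writer = KafkaJournalWriter(
        brokers=["broker1.example.org:9092"],
        prefix="swh.journal.objects",
        client_id="example-journal-writer",
        value_sanitizer=lambda object_type, value: value,
    )
    origins = [Origin(url="https://example.org/repo.git")]
    try:
        # write_additions() sends the whole batch, then flushes; flush() raises
        # KafkaDeliveryError if some deliveries failed or timed out.
        writer.write_additions("origin", origins)
    except KafkaDeliveryError as e:
        logger.error("journal delivery failure: %s", e.pretty_failures())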