diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 0000000..cf9ce60 --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1,5 @@ +# Enable black +c48ca0dc97327b726fad46e5fb29f3965490d89e + +# python: Reformat code with black 22.3.0 +1250404d220adbc2ab0e7256b2104d734b23b60c diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 24811ab..1c95e3d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,49 +1,40 @@ repos: -- repo: https://github.com/pre-commit/pre-commit-hooks - rev: v2.4.0 - hooks: - - id: trailing-whitespace - - id: check-json - - id: check-yaml + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.1.0 + hooks: + - id: trailing-whitespace + - id: check-json + - id: check-yaml -- repo: https://gitlab.com/pycqa/flake8 - rev: 3.8.4 - hooks: - - id: flake8 + - repo: https://gitlab.com/pycqa/flake8 + rev: 4.0.1 + hooks: + - id: flake8 + additional_dependencies: [flake8-bugbear==22.3.23] -- repo: https://github.com/codespell-project/codespell - rev: v1.16.0 - hooks: - - id: codespell + - repo: https://github.com/codespell-project/codespell + rev: v2.1.0 + hooks: + - id: codespell + name: Check source code spelling + stages: [commit] -- repo: local - hooks: - - id: mypy - name: mypy - entry: mypy - args: [swh] - pass_filenames: false - language: system - types: [python] + - repo: local + hooks: + - id: mypy + name: mypy + entry: mypy + args: [swh] + pass_filenames: false + language: system + types: [python] -- repo: https://github.com/PyCQA/isort - rev: 5.5.2 - hooks: - - id: isort - -- repo: https://github.com/python/black - rev: 19.10b0 - hooks: - - id: black - -# unfortunately, we are far from being able to enable this... -# - repo: https://github.com/PyCQA/pydocstyle.git -# rev: 4.0.0 -# hooks: -# - id: pydocstyle -# name: pydocstyle -# description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. -# entry: pydocstyle --convention=google -# language: python -# types: [python] + - repo: https://github.com/PyCQA/isort + rev: 5.10.1 + hooks: + - id: isort + - repo: https://github.com/python/black + rev: 22.3.0 + hooks: + - id: black diff --git a/PKG-INFO b/PKG-INFO index 108b45f..891c2c8 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,76 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 1.0.0 +Version: 1.1.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr -License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ -Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing License-File: LICENSE License-File: AUTHORS swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. 
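Note on the `.git-blame-ignore-revs` file added above: git does not pick this file up on its own; it is typically wired in with `git config blame.ignoreRevsFile .git-blame-ignore-revs` (or per invocation with `git blame --ignore-revs-file .git-blame-ignore-revs`), so that the two black reformatting commits listed there are skipped when attributing lines.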
# Local test As a pre-requisite, you need a kafka installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` - - diff --git a/docs/journal-clients.rst b/docs/journal-clients.rst index 3441ebb..80b9462 100644 --- a/docs/journal-clients.rst +++ b/docs/journal-clients.rst @@ -1,80 +1,82 @@ .. _journal_clients: Software Heritage Journal clients ================================= Journal clients are processes that read data from the |swh| Journal, in order to efficiently process all existing objects, and process new objects as they come. Some journal clients, such as :ref:`swh-dataset `, only read existing objects and stop when they are done. Other journal clients, such as the :ref:`mirror `, are expected to read constantly from the journal. They can run in parallel, and the :mod:`swh.journal.client` module provides an abstraction handling all the setup, so actual clients only consist of a single function that takes :mod:`model objects ` as parameters. For example, a very simple journal client that prints all revisions and releases to the console can be implemented like this: .. literalinclude:: example-journal-client.py Parallelization --------------- A single journal client, like the one above, is sequential. It can however run concurrently by running the same program multiple times. Kafka will coordinate the processes so the load is shared across processes. +.. _journal-client-authentication: + Authentication -------------- In production, journal clients need credentials to access the journal. Once you have credentials, they can be configured by adding this to the ``config``:: config = { "sasl.mechanism": "SCRAM-SHA-512", "security.protocol": "SASL_SSL", "sasl.username": "", "sasl.password": "", } There are two types of client: privileged and unprivileged. The former has access to all the data, the latter gets redacted authorship information, for privacy reasons. Instead, the ``name`` and ``email`` fields of ``author`` and ``committer`` attributes of release and revision objects are blank, and their ``fullname`` is a SHA256 hash of their actual fullname. The ``privileged`` parameter to ``get_journal_client`` must be set accordingly. Order guarantees and replaying ------------------------------ The journal client shares the ordering guarantees of Kafka. The short version is that you should not assume any order unless specified otherwise in the `Kafka documentation `__, nor that two related objects are sent to the same process. We call "replay" any workflow that involves a journal client writing all (or most) objects to a new database. This can be either continuous (in effect, this produces a mirror database), or one-off. Either way, particular attention should be given to this lax ordering, as replaying produces databases that are (temporarily) inconsistent, because some objects may point to objects that are not replayed yet.
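(The ``example-journal-client.py`` file referenced by the ``literalinclude`` earlier in this document is not shown in the patch. The following is only a rough sketch of such a client, built on the ``get_journal_client`` API described in this section; the broker address and group id are placeholders to adapt, and production credentials are passed as extra keyword arguments as shown in the Authentication section above.)

```python
from swh.journal.client import get_journal_client


def process(all_objects):
    # all_objects maps an object type ("revision", "release", ...) to the
    # batch of deserialized objects (plain dicts by default)
    for object_type, objects in all_objects.items():
        for obj in objects:
            print(object_type, obj)


client = get_journal_client(
    cls="kafka",
    brokers=["broker1.journal.example.org:9093"],  # placeholder broker
    group_id="my-journal-client",                  # placeholder consumer group
    prefix="swh.journal.objects",
    object_types=["revision", "release"],
    privileged=False,
    # in production, add the sasl.* / security.protocol settings shown above
)
client.process(process)
```

Running several instances of this program with the same ``group_id`` spreads the load across processes, as described in the Parallelization section above.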
For one-off replays, this lax ordering can mostly be worked around by processing objects in reverse topological order: as contents don't reference any object, directories only reference contents and directories, revisions only reference directories, etc.; this means that replayers can first process all contents, then all directories, then all revisions. This keeps the number of inconsistencies relatively small. For continuous replays, replayed databases are eventually consistent. diff --git a/pytest.ini b/pytest.ini index afa4cf3..ddc1b32 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,4 @@ [pytest] -norecursedirs = docs +norecursedirs = build docs + +asyncio_mode = strict diff --git a/setup.cfg b/setup.cfg index 1d722c2..f65ba0a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,8 +1,9 @@ [flake8] -ignore = E203,E231,W503 +select = C,E,F,W,B950 +ignore = E203,E231,E501,W503 max-line-length = 88 [egg_info] tag_build = tag_date = 0 diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index 108b45f..891c2c8 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,76 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 1.0.0 +Version: 1.1.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr -License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ -Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing License-File: LICENSE License-File: AUTHORS swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kafka installation path.
The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` - - diff --git a/swh.journal.egg-info/SOURCES.txt b/swh.journal.egg-info/SOURCES.txt index 1e46df5..8be9710 100644 --- a/swh.journal.egg-info/SOURCES.txt +++ b/swh.journal.egg-info/SOURCES.txt @@ -1,51 +1,53 @@ +.git-blame-ignore-revs .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile README.md mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/conf.py docs/example-journal-client.py docs/index.rst docs/journal-clients.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.journal.egg-info/PKG-INFO swh.journal.egg-info/SOURCES.txt swh.journal.egg-info/dependency_links.txt swh.journal.egg-info/entry_points.txt swh.journal.egg-info/requires.txt swh.journal.egg-info/top_level.txt swh/journal/__init__.py swh/journal/client.py swh/journal/py.typed swh/journal/pytest_plugin.py swh/journal/serializers.py swh/journal/tests/__init__.py swh/journal/tests/journal_data.py swh/journal/tests/log4j.properties swh/journal/tests/test_client.py swh/journal/tests/test_inmemory.py swh/journal/tests/test_kafka_writer.py swh/journal/tests/test_pytest_plugin.py swh/journal/tests/test_serializers.py swh/journal/tests/test_stream.py swh/journal/writer/__init__.py swh/journal/writer/inmemory.py +swh/journal/writer/interface.py swh/journal/writer/kafka.py swh/journal/writer/stream.py \ No newline at end of file diff --git a/swh.journal.egg-info/entry_points.txt b/swh.journal.egg-info/entry_points.txt index 467e9f5..6e787c5 100644 --- a/swh.journal.egg-info/entry_points.txt +++ b/swh.journal.egg-info/entry_points.txt @@ -1,4 +1,2 @@ - - [pytest11] - pytest_swh_journal = swh.journal.pytest_plugin - \ No newline at end of file +[pytest11] +pytest_swh_journal = swh.journal.pytest_plugin diff --git a/swh/journal/client.py b/swh/journal/client.py index 028f94b..7550e22 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,355 +1,370 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from importlib import import_module +from itertools import cycle import logging import os from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from confluent_kafka import Consumer, KafkaError, KafkaException from swh.core.statsd import statsd from swh.journal import DEFAULT_PREFIX from .serializers import kafka_to_value logger = logging.getLogger(__name__) rdkafka_logger = logging.getLogger(__name__ + ".rdkafka") # Only accepted offset reset policy accepted ACCEPTED_OFFSET_RESET = ["earliest", "latest"] # Errors that Kafka raises too often and are not useful; therefore they # we lower their log level to DEBUG instead of INFO. 
_SPAMMY_ERRORS = [ KafkaError._NO_OFFSET, ] JOURNAL_MESSAGE_NUMBER_METRIC = "swh_journal_client_handle_message_total" JOURNAL_STATUS_METRIC = "swh_journal_client_status" def get_journal_client(cls: str, **kwargs: Any): """Factory function to instantiate a journal client object. Currently, only the "kafka" journal client is supported. """ if cls == "kafka": if "stats_cb" in kwargs: stats_cb = kwargs["stats_cb"] if isinstance(stats_cb, str): try: module_path, func_name = stats_cb.split(":") except ValueError: raise ValueError( "Invalid stats_cb configuration option: " "it should be a string like 'path.to.module:function'" ) try: module = import_module(module_path, package=__package__) except ModuleNotFoundError: raise ValueError( "Invalid stats_cb configuration option: " f"module {module_path} not found" ) try: kwargs["stats_cb"] = getattr(module, func_name) except AttributeError: raise ValueError( "Invalid stats_cb configuration option: " f"function {func_name} not found in module {module_path}" ) return JournalClient(**kwargs) raise ValueError("Unknown journal client class `%s`" % cls) def _error_cb(error): if error.fatal(): raise KafkaException(error) if error.code() in _SPAMMY_ERRORS: logger.debug("Received non-fatal kafka error: %s", error) else: logger.info("Received non-fatal kafka error: %s", error) def _on_commit(error, partitions): if error is not None: _error_cb(error) class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the `prefix` argument is None (default value), it will take the default value `'swh.journal.objects'`. Clients subscribe to events specific to each object type as listed in the `object_types` argument (if unset, defaults to all existing kafka topic under the prefix). Clients can be sharded by setting the `group_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id. Messages are processed by the `worker_fn` callback passed to the `process` method, in batches of maximum `batch_size` messages (defaults to 200). The objects passed to the `worker_fn` callback are the result of the kafka message converted by the `value_deserializer` function. By default (if this argument is not given), it will produce dicts (using the `kafka_to_value` function). This signature of the function is: `value_deserializer(object_type: str, kafka_msg: bytes) -> Any` If the value returned by `value_deserializer` is None, it is ignored and not passed the `worker_fn` function. If set, the processing stops after processing `stop_after_objects` messages in total. `stop_on_eof` stops the processing when the client has reached the end of each partition in turn. `auto_offset_reset` sets the behavior of the client when the consumer group initializes: `'earliest'` (the default) processes all objects since the inception of the topics; `''` Any other named argument is passed directly to KafkaConsumer(). 
""" def __init__( self, brokers: Union[str, List[str]], group_id: str, prefix: Optional[str] = None, object_types: Optional[List[str]] = None, privileged: bool = False, stop_after_objects: Optional[int] = None, batch_size: int = 200, process_timeout: Optional[float] = None, auto_offset_reset: str = "earliest", stop_on_eof: bool = False, value_deserializer: Optional[Callable[[str, bytes], Any]] = None, **kwargs, ): if prefix is None: prefix = DEFAULT_PREFIX if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( "Option 'auto_offset_reset' only accept %s, not %s" % (ACCEPTED_OFFSET_RESET, auto_offset_reset) ) if batch_size <= 0: raise ValueError("Option 'batch_size' needs to be positive") if value_deserializer: self.value_deserializer = value_deserializer else: self.value_deserializer = lambda _, value: kafka_to_value(value) if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) if debug_logging and "debug" not in kwargs: kwargs["debug"] = "consumer" # Static group instance id management group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID") if group_instance_id: kwargs["group.instance.id"] = group_instance_id if "group.instance.id" in kwargs: # When doing static consumer group membership, set a higher default # session timeout. The session timeout is the duration after which # the broker considers that a consumer has left the consumer group # for good, and triggers a rebalance. Considering our current # processing pattern, 10 minutes gives the consumer ample time to # restart before that happens. if "session.timeout.ms" not in kwargs: kwargs["session.timeout.ms"] = 10 * 60 * 1000 # 10 minutes if "session.timeout.ms" in kwargs: # When the session timeout is set, rdkafka requires the max poll # interval to be set to a higher value; the max poll interval is # rdkafka's way of figuring out whether the client's message # processing thread has stalled: when the max poll interval lapses # between two calls to consumer.poll(), rdkafka leaves the consumer # group and terminates the connection to the brokers. 
# # We default to 1.5 times the session timeout if "max.poll.interval.ms" not in kwargs: kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3 consumer_settings = { **kwargs, "bootstrap.servers": ",".join(brokers), "auto.offset.reset": auto_offset_reset, "group.id": group_id, "on_commit": _on_commit, "error_cb": _error_cb, "enable.auto.commit": False, "logger": rdkafka_logger, } self.stop_on_eof = stop_on_eof if self.stop_on_eof: consumer_settings["enable.partition.eof"] = True logger.debug("Consumer settings: %s", consumer_settings) self.consumer = Consumer(consumer_settings) if privileged: privileged_prefix = f"{prefix}_privileged" else: # do not attempt to subscribe to privileged topics privileged_prefix = f"{prefix}" existing_topics = [ topic for topic in self.consumer.list_topics(timeout=10).topics.keys() if ( topic.startswith(f"{prefix}.") or topic.startswith(f"{privileged_prefix}.") ) ] if not existing_topics: raise ValueError( f"The prefix {prefix} does not match any existing topic " "on the kafka broker" ) if not object_types: object_types = list({topic.split(".")[-1] for topic in existing_topics}) self.subscription = [] unknown_types = [] for object_type in object_types: topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}") for topic in topics: if topic in existing_topics: self.subscription.append(topic) break else: unknown_types.append(object_type) if unknown_types: raise ValueError( f"Topic(s) for object types {','.join(unknown_types)} " "are unknown on the kafka broker" ) logger.debug(f"Upstream topics: {existing_topics}") self.subscribe() self.stop_after_objects = stop_after_objects self.eof_reached: Set[Tuple[str, str]] = set() self.batch_size = batch_size if process_timeout is not None: raise DeprecationWarning( "'process_timeout' argument is not supported anymore by " "JournalClient; please remove it from your configuration.", ) def subscribe(self): """Subscribe to topics listed in self.subscription This can be overridden if you need, for instance, to manually assign partitions. """ logger.debug(f"Subscribing to: {self.subscription}") self.consumer.subscribe(topics=self.subscription) def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. """ total_objects_processed = 0 # timeout for message poll timeout = 1.0 with statsd.status_gauge( JOURNAL_STATUS_METRIC, statuses=["idle", "processing", "waiting"] ) as set_status: set_status("idle") while True: batch_size = self.batch_size if self.stop_after_objects: if total_objects_processed >= self.stop_after_objects: break # clamp batch size to avoid overrunning stop_after_objects batch_size = min( - self.stop_after_objects - total_objects_processed, batch_size, + self.stop_after_objects - total_objects_processed, + batch_size, ) - set_status("waiting") - while True: + for i in cycle(reversed(range(10))): messages = self.consumer.consume( timeout=timeout, num_messages=batch_size ) if messages: break - set_status("processing") - batch_processed, at_eof = self.handle_messages(messages, worker_fn) + # do check for an EOF condition iff we already consumed + # messages, otherwise we could detect an EOF condition + # before messages had a chance to reach us (e.g. 
in tests) + if total_objects_processed > 0 and self.stop_on_eof and i == 0: + at_eof = all( + (tp.topic, tp.partition) in self.eof_reached + for tp in self.consumer.assignment() + ) + if at_eof: + break + if messages: + set_status("processing") + batch_processed, at_eof = self.handle_messages(messages, worker_fn) + + set_status("idle") + # report the number of handled messages + statsd.increment( + JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed + ) + total_objects_processed += batch_processed - set_status("idle") - # report the number of handled messages - statsd.increment(JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed) - total_objects_processed += batch_processed if at_eof: break return total_objects_processed def handle_messages(self, messages, worker_fn): objects: Dict[str, List[Any]] = defaultdict(list) nb_processed = 0 for message in messages: error = message.error() if error is not None: if error.code() == KafkaError._PARTITION_EOF: self.eof_reached.add((message.topic(), message.partition())) else: _error_cb(error) continue if message.value() is None: # ignore message with no payload, these can be generated in tests continue nb_processed += 1 object_type = message.topic().split(".")[-1] deserialized_object = self.deserialize_message( message, object_type=object_type ) if deserialized_object is not None: objects[object_type].append(deserialized_object) if objects: worker_fn(dict(objects)) self.consumer.commit() at_eof = self.stop_on_eof and all( (tp.topic, tp.partition) in self.eof_reached for tp in self.consumer.assignment() ) return nb_processed, at_eof def deserialize_message(self, message, object_type=None): return self.value_deserializer(object_type, message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py index eb82b69..3476d4e 100644 --- a/swh/journal/pytest_plugin.py +++ b/swh/journal/pytest_plugin.py @@ -1,261 +1,257 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import random import string from typing import Any, Collection, Dict, Iterator, Optional import attr from confluent_kafka import Consumer, KafkaException, Producer from confluent_kafka.admin import AdminClient import pytest from swh.journal.serializers import kafka_to_key, kafka_to_value, pprint_key from swh.model.tests.swh_model_data import TEST_OBJECTS def ensure_lists(value: Any) -> Any: """ >>> ensure_lists(["foo", 42]) ['foo', 42] >>> ensure_lists(("foo", 42)) ['foo', 42] >>> ensure_lists({"a": ["foo", 42]}) {'a': ['foo', 42]} >>> ensure_lists({"a": ("foo", 42)}) {'a': ['foo', 42]} """ if isinstance(value, (tuple, list)): return list(map(ensure_lists, value)) elif isinstance(value, dict): return dict(ensure_lists(list(value.items()))) else: return value def consume_messages(consumer, kafka_prefix, expected_messages): """Consume expected_messages from the consumer; Sort them all into a consumed_objects dict""" consumed_messages = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: raise ValueError( "Timed out fetching messages from kafka. 
" f"Only {fetched_messages}/{expected_messages} fetched" ) msg = consumer.poll(timeout=0.1) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 topic = msg.topic() assert topic.startswith(f"{kafka_prefix}.") or topic.startswith( f"{kafka_prefix}_privileged." ), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) return consumed_messages def assert_all_objects_consumed( consumed_messages: Dict, exclude: Optional[Collection] = None ): """Check whether all objects from TEST_OBJECTS have been consumed `exclude` can be a list of object types for which we do not want to compare the values (eg. for anonymized object). """ for object_type, known_objects in TEST_OBJECTS.items(): known_keys = [obj.unique_key() for obj in known_objects] if not consumed_messages[object_type]: return (received_keys, received_values) = zip(*consumed_messages[object_type]) if object_type in ("content", "skipped_content"): for value in received_values: value.pop("ctime", None) if object_type == "content": known_objects = [attr.evolve(o, data=None) for o in known_objects] for key in known_keys: assert key in received_keys, ( f"expected {object_type} key {pprint_key(key)} " "absent from consumed messages" ) if exclude and object_type in exclude: continue for value in known_objects: expected_value = value.to_dict() if value.object_type in ("content", "skipped_content"): expected_value.pop("ctime", None) assert ensure_lists(expected_value) in received_values, ( f"expected {object_type} value {value!r} is " "absent from consumed messages" ) @pytest.fixture(scope="function") def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope="function") def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix @pytest.fixture(scope="function") def object_types(): """Set of object types to precreate topics for.""" return set(TEST_OBJECTS.keys()) @pytest.fixture(scope="function") def privileged_object_types(): """Set of object types to precreate privileged topics for.""" return {"revision", "release"} @pytest.fixture(scope="function") def kafka_server( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ) -> str: """A kafka server with existing topics Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type from the ``object_types`` list. Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with object_type from the ``privileged_object_types`` list. """ topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [ f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types ] # unfortunately, the Mock broker does not support the CreatTopic admin API, so we # have to create topics using a Producer. producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "bootstrap producer", "acks": "all", } ) for topic in topics: producer.produce(topic=topic, value=None) for i in range(10): if producer.flush(0.1) == 0: break return kafka_server_base @pytest.fixture(scope="session") def kafka_server_base() -> Iterator[str]: """Create a mock kafka cluster suitable for tests. 
Yield a connection string. Note: this is a generator to keep the mock broker alive during the whole test session. see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h """ admin = AdminClient({"test.mock.num.brokers": "1"}) metadata = admin.list_topics() brokers = [str(broker) for broker in metadata.brokers.values()] assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" broker_connstr, broker_id = brokers[0].split("/") yield broker_connstr TEST_CONFIG = { "consumer_id": "swh.journal.consumer", "stop_on_eof": True, "storage": {"cls": "memory", "args": {}}, } @pytest.fixture def test_config( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): - """Test configuration needed for producer/consumer - - """ + """Test configuration needed for producer/consumer""" return { **TEST_CONFIG, "object_types": object_types, "privileged_object_types": privileged_object_types, "brokers": [kafka_server_base], "prefix": kafka_prefix, } @pytest.fixture def consumer( kafka_server: str, test_config: Dict, kafka_consumer_group: str ) -> Consumer: - """Get a connected Kafka consumer. - - """ + """Get a connected Kafka consumer.""" consumer = Consumer( { "bootstrap.servers": kafka_server, "auto.offset.reset": "earliest", "enable.auto.commit": True, "group.id": kafka_consumer_group, } ) prefix = test_config["prefix"] kafka_topics = [ f"{prefix}.{object_type}" for object_type in test_config["object_types"] ] + [ f"{prefix}_privileged.{object_type}" for object_type in test_config["privileged_object_types"] ] consumer.subscribe(kafka_topics) yield consumer # Explicitly perform the commit operation on the consumer before closing it # to avoid possible hang since confluent-kafka v1.6.0 consumer.commit() consumer.close() diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py index 274ef46..06c60a7 100644 --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -1,113 +1,126 @@ -# Copyright (C) 2016-2021 The Software Heritage developers +# Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from enum import Enum -from typing import Any, Union +from typing import Any, BinaryIO, Union import msgpack from swh.model.model import KeyType class MsgpackExtTypeCodes(Enum): LONG_INT = 1 LONG_NEG_INT = 2 # this as been copied from swh.core.api.serializer # TODO refactor swh.core to make this function available def _msgpack_encode_longint(value): # needed because msgpack will not handle long integers with more than 64 bits # which we unfortunately happen to have to deal with from time to time if value > 0: code = MsgpackExtTypeCodes.LONG_INT.value else: code = MsgpackExtTypeCodes.LONG_NEG_INT.value value = -value length, rem = divmod(value.bit_length(), 8) if rem: length += 1 return msgpack.ExtType(code, int.to_bytes(value, length, "big")) def msgpack_ext_encode_types(obj): if isinstance(obj, int): return _msgpack_encode_longint(obj) return obj def msgpack_ext_hook(code, data): if code == MsgpackExtTypeCodes.LONG_INT.value: return int.from_bytes(data, "big") if code == MsgpackExtTypeCodes.LONG_NEG_INT.value: return -int.from_bytes(data, "big") raise ValueError("Unknown msgpack extended code %s" % code) # for BW compat def decode_types_bw(obj): if set(obj.keys()) == {b"d", b"swhtype"} and 
obj[b"swhtype"] == "datetime": return datetime.datetime.fromisoformat(obj[b"d"]) return obj def stringify_key_item(k: str, v: Union[str, bytes]) -> str: """Turn the item of a dict key into a string""" if isinstance(v, str): return v if k == "url": return v.decode("utf-8") return v.hex() def pprint_key(key: KeyType) -> str: """Pretty-print a kafka key""" if isinstance(key, dict): return "{%s}" % ", ".join( f"{k}: {stringify_key_item(k, v)}" for k, v in key.items() ) elif isinstance(key, bytes): return key.hex() else: return key def key_to_kafka(key: KeyType) -> bytes: """Serialize a key, possibly a dict, in a predictable way""" p = msgpack.Packer(use_bin_type=True) if isinstance(key, dict): return p.pack_map_pairs(sorted(key.items())) else: return p.pack(key) def kafka_to_key(kafka_key: bytes) -> KeyType: """Deserialize a key""" return msgpack.loads(kafka_key, raw=False) def value_to_kafka(value: Any) -> bytes: """Serialize some data for storage in kafka""" return msgpack.packb( value, use_bin_type=True, datetime=True, # encode datetime as msgpack.Timestamp default=msgpack_ext_encode_types, ) def kafka_to_value(kafka_value: bytes) -> Any: """Deserialize some data stored in kafka""" return msgpack.unpackb( kafka_value, raw=False, object_hook=decode_types_bw, ext_hook=msgpack_ext_hook, strict_map_key=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) ) + + +def kafka_stream_to_value(file_like: BinaryIO) -> msgpack.Unpacker: + """Return a deserializer for data stored in kafka""" + return msgpack.Unpacker( + file_like, + raw=False, + object_hook=decode_types_bw, + ext_hook=msgpack_ext_hook, + strict_map_key=False, + use_list=False, + timestamp=3, # convert Timestamp in datetime objects (tz UTC) + ) diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index 82d6979..3d032d5 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,395 +1,409 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List, cast from unittest.mock import MagicMock from confluent_kafka import Producer import pytest from swh.journal.client import JournalClient from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka from swh.model.model import Content, Revision from swh.model.tests.swh_model_data import TEST_OBJECTS REV = { "message": b"something cool", "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"}, "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None}, "date": { "timestamp": {"seconds": 123456789, "microseconds": 123}, "offset": 120, "negative_utc": False, }, "committer_date": { "timestamp": {"seconds": 123123456, "microseconds": 0}, "offset": 0, "negative_utc": False, }, "type": "git", "directory": ( b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" b"\x01\x02\x03\x04\x05" ), "synthetic": False, "metadata": None, "parents": [], "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c", } def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( - topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), + topic=kafka_prefix + 
".revision", + key=REV["id"], + value=value_to_kafka(REV), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [REV]}) @pytest.mark.parametrize("count", [1, 2]) def test_client_stop_after_objects( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, count: int ): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka revisions = cast(List[Revision], TEST_OBJECTS["revision"]) for rev in revisions: producer.produce( topic=kafka_prefix + ".revision", key=rev.id, value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=False, stop_after_objects=count, ) worker_fn = MagicMock() client.process(worker_fn) # this code below is not pretty, but needed since we have to deal with # dicts (so no set) which can have values that are list vs tuple, and we do # not know for sure how many calls of the worker_fn will happen during the # consumption of the topic... worker_fn.assert_called() revs = [] # list of (unique) rev dicts we got from the client for call in worker_fn.call_args_list: callrevs = call[0][0]["revision"] for rev in callrevs: assert Revision.from_dict(rev) in revisions if rev not in revs: revs.append(rev) assert len(revs) == count @pytest.mark.parametrize("batch_size", [1, 5, 100]) def test_client_batch_size( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: str, + batch_size: int, ): num_objects = 2 * batch_size + 1 assert num_objects < 256, "Too many objects, generation will fail" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) contents = [Content.from_data(bytes([i])) for i in range(num_objects)] # Fill Kafka for content in contents: producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(content.sha1), value=value_to_kafka(content.to_dict()), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, batch_size=batch_size, ) collected_output: List[Dict] = [] def worker_fn(objects): received = objects["content"] assert len(received) <= batch_size collected_output.extend(received) client.process(worker_fn) expected_output = [content.to_dict() for content in contents] assert len(collected_output) == len(expected_output) for output in collected_output: assert output in expected_output @pytest.fixture() def kafka_producer(kafka_prefix: str, kafka_server_base: str): producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( topic=kafka_prefix + ".something", key=key_to_kafka(b"key1"), value=value_to_kafka("value1"), ) producer.produce( topic=kafka_prefix + ".else", key=key_to_kafka(b"key1"), value=value_to_kafka("value2"), ) producer.flush() return producer def test_client_subscribe_all( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): client = JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_on_eof=True, ) assert set(client.subscription) == { f"{kafka_prefix}.something", f"{kafka_prefix}.else", } worker_fn = MagicMock() 
client.process(worker_fn) worker_fn.assert_called_once_with( - {"something": ["value1"], "else": ["value2"],} + { + "something": ["value1"], + "else": ["value2"], + } ) def test_client_subscribe_one_topic( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): client = JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_on_eof=True, object_types=["else"], ) assert client.subscription == [f"{kafka_prefix}.else"] worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"else": ["value2"]}) def test_client_subscribe_absent_topic( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_on_eof=True, object_types=["really"], ) def test_client_subscribe_absent_prefix( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix="wrong.prefix", stop_on_eof=True, ) with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix="wrong.prefix", stop_on_eof=True, object_types=["else"], ) def test_client_subscriptions_with_anonymized_topics( kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str ): producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "test producer", "acks": "all", } ) # Fill Kafka with revision object on both the regular prefix (normally for # anonymized objects in this case) and privileged one producer.produce( - topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), + topic=kafka_prefix + ".revision", + key=REV["id"], + value=value_to_kafka(REV), ) producer.produce( topic=kafka_prefix + "_privileged.revision", key=REV["id"], value=value_to_kafka(REV), ) producer.flush() # without privileged "channels" activated on the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, privileged=False, ) # we only subscribed to "standard" topics assert client.subscription == [kafka_prefix + ".revision"] # with privileged "channels" activated on the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, privileged=True, ) # we only subscribed to "privileged" topics assert client.subscription == [kafka_prefix + "_privileged.revision"] def test_client_subscriptions_without_anonymized_topics( kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str ): producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "test producer", "acks": "all", } ) # Fill Kafka with revision objects only on the standard prefix producer.produce( - topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), + topic=kafka_prefix + ".revision", + key=REV["id"], + value=value_to_kafka(REV), ) producer.flush() # without privileged channel activated on the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, privileged=False, ) # we only subscribed to the standard prefix assert client.subscription == [kafka_prefix + ".revision"] # with privileged channel activated on the client side client = JournalClient( brokers=[kafka_server_base], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, privileged=True, ) # we also only 
subscribed to the standard prefix, since there is no priviled prefix # on the kafka broker assert client.subscription == [kafka_prefix + ".revision"] def test_client_with_deserializer( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str ): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka revisions = cast(List[Revision], TEST_OBJECTS["revision"]) for rev in revisions: producer.produce( topic=kafka_prefix + ".revision", key=rev.id, value=value_to_kafka(rev.to_dict()), ) producer.flush() def custom_deserializer(object_type, msg): assert object_type == "revision" obj = kafka_to_value(msg) # filter the first revision if obj["id"] == revisions[0].id: return None return Revision.from_dict(obj) client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, value_deserializer=custom_deserializer, ) worker_fn = MagicMock() client.process(worker_fn) # a commit seems to be needed to prevent some race condition situation # where the worker_fn has not yet been called at this point (not sure how) client.consumer.commit() # Check the first revision has not been passed to worker_fn - worker_fn.assert_called_once_with({"revision": revisions[1:]}) + processed_revisions = set(worker_fn.call_args[0][0]["revision"]) + assert revisions[0] not in processed_revisions + assert all(rev in processed_revisions for rev in revisions[1:]) diff --git a/swh/journal/tests/test_inmemory.py b/swh/journal/tests/test_inmemory.py index c414a9b..c1fa5f8 100644 --- a/swh/journal/tests/test_inmemory.py +++ b/swh/journal/tests/test_inmemory.py @@ -1,48 +1,53 @@ +# Copyright (C) 2019-2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + import pytest from swh.journal.writer import model_object_dict_sanitizer from swh.journal.writer.inmemory import InMemoryJournalWriter from swh.model.model import BaseModel from swh.model.tests.swh_model_data import TEST_OBJECTS -def test_write_additions_with_test_objects(): - writer = InMemoryJournalWriter[BaseModel]( - value_sanitizer=model_object_dict_sanitizer +def test_write_additions_anonymized(): + writer = InMemoryJournalWriter( + value_sanitizer=model_object_dict_sanitizer, anonymize=True ) expected = [] + priv_expected = [] for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) for object in objects: - expected.append((object_type, object)) + if object.anonymize(): + expected.append((object_type, object.anonymize())) + priv_expected.append((object_type, object)) + else: + expected.append((object_type, object)) - assert list(writer.privileged_objects) == [] + assert set(priv_expected) == set(writer.privileged_objects) assert set(expected) == set(writer.objects) -def test_write_additions_with_privileged_test_objects(): - writer = InMemoryJournalWriter[BaseModel]( - value_sanitizer=model_object_dict_sanitizer - ) - - expected = [] +def test_write_additions(): + writer = InMemoryJournalWriter(value_sanitizer=model_object_dict_sanitizer) + expected = set() for object_type, objects in TEST_OBJECTS.items(): - writer.write_additions(object_type, objects, True) + writer.write_additions(object_type, objects) for object in objects: - expected.append((object_type, object)) + expected.add((object_type, object)) - assert list(writer.objects) == [] - 
assert set(expected) == set(writer.privileged_objects) + assert not set(writer.privileged_objects) + assert expected == set(writer.objects) def test_write_addition_errors_without_unique_key(): - writer = InMemoryJournalWriter[BaseModel]( - value_sanitizer=model_object_dict_sanitizer - ) + writer = InMemoryJournalWriter(value_sanitizer=model_object_dict_sanitizer) with pytest.raises(NotImplementedError): writer.write_addition("BaseModel", BaseModel()) diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index 241e767..e19b3b6 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,248 +1,248 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from typing import Iterable from confluent_kafka import Consumer, Producer import pytest from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages from swh.journal.writer import model_object_dict_sanitizer from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter from swh.model.model import BaseModel, Directory, Release, Revision from swh.model.tests.swh_model_data import TEST_OBJECTS def test_kafka_writer( kafka_prefix: str, kafka_server: str, consumer: Consumer, privileged_object_types: Iterable[str], ): - writer = KafkaJournalWriter[BaseModel]( + writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, anonymize=False, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) for key, obj_dict in consumed_messages["revision"]: obj = Revision.from_dict(obj_dict) for person in (obj.author, obj.committer): assert not ( len(person.fullname) == 32 and person.name is None and person.email is None ) for key, obj_dict in consumed_messages["release"]: obj = Release.from_dict(obj_dict) # author is optional for release if obj.author is None: continue for person in (obj.author,): assert not ( len(person.fullname) == 32 and person.name is None and person.email is None ) def test_kafka_writer_anonymized( kafka_prefix: str, kafka_server: str, consumer: Consumer, privileged_object_types: Iterable[str], ): - writer = KafkaJournalWriter[BaseModel]( + writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, anonymize=True, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) if object_type in privileged_object_types: expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages, exclude=["revision", "release"]) for key, obj_dict in consumed_messages["revision"]: obj = Revision.from_dict(obj_dict) for person in (obj.author, obj.committer): assert ( len(person.fullname) == 32 and person.name is None and person.email is None ) for key, obj_dict in consumed_messages["release"]: obj = Release.from_dict(obj_dict) # author is optional for release if 
obj.author is None: continue for person in (obj.author,): assert ( len(person.fullname) == 32 and person.name is None and person.email is None ) def test_write_delivery_failure(kafka_prefix: str, kafka_server: str): class MockKafkaError: """A mocked kafka error""" def str(self): return "Mocked Kafka Error" def name(self): return "SWH_MOCK_ERROR" class KafkaJournalWriterFailDelivery(KafkaJournalWriter): """A journal writer which always fails delivering messages""" def _on_delivery(self, error, message): """Replace the inbound error with a fake delivery error""" super()._on_delivery(MockKafkaError(), message) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriterFailDelivery( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, ) empty_dir = Directory(entries=()) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert "Failed deliveries" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_MOCK_ERROR" def test_write_delivery_timeout(kafka_prefix: str, kafka_server: str): produced = [] class MockProducer(Producer): """A kafka producer which pretends to produce messages, but never sends any delivery acknowledgements""" def produce(self, **kwargs): produced.append(kwargs) - writer = KafkaJournalWriter[BaseModel]( + writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, flush_timeout=1, producer_class=MockProducer, ) empty_dir = Directory(entries=()) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert len(produced) == 1 assert "timeout" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_FLUSH_TIMEOUT" class MockBufferErrorProducer(Producer): """A Kafka producer that returns a BufferError on the `n_buffererrors` first calls to produce.""" def __init__(self, *args, **kwargs): self.n_buffererrors = kwargs.pop("n_bufferrors", 0) self.produce_calls = 0 super().__init__(*args, **kwargs) def produce(self, **kwargs): self.produce_calls += 1 if self.produce_calls <= self.n_buffererrors: raise BufferError("Local: Queue full") self.produce_calls = 0 return super().produce(**kwargs) def test_write_BufferError_retry(kafka_prefix: str, kafka_server: str, caplog): - writer = KafkaJournalWriter[BaseModel]( + writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, flush_timeout=1, producer_class=MockBufferErrorProducer, ) writer.producer.n_buffererrors = 4 empty_dir = Directory(entries=()) caplog.set_level(logging.DEBUG, "swh.journal.writer.kafka") writer.write_addition("directory", empty_dir) records = [] for record in caplog.records: if "BufferError" in record.getMessage(): records.append(record) assert len(records) == writer.producer.n_buffererrors def test_write_BufferError_give_up(kafka_prefix: str, kafka_server: str, caplog): - writer = KafkaJournalWriter[BaseModel]( + writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, flush_timeout=1, producer_class=MockBufferErrorProducer, ) 
writer.producer.n_buffererrors = 5 empty_dir = Directory(entries=()) with pytest.raises(KafkaDeliveryError): writer.write_addition("directory", empty_dir) def test_write_addition_errors_without_unique_key(kafka_prefix: str, kafka_server: str): - writer = KafkaJournalWriter[BaseModel]( + writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, ) with pytest.raises(NotImplementedError): writer.write_addition("BaseModel", BaseModel()) diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py index ece39ba..b5966da 100644 --- a/swh/journal/tests/test_pytest_plugin.py +++ b/swh/journal/tests/test_pytest_plugin.py @@ -1,73 +1,76 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Iterator from confluent_kafka.admin import AdminClient def test_kafka_server(kafka_server_base: str): ip, port_str = kafka_server_base.split(":") assert ip == "127.0.0.1" assert int(port_str) admin = AdminClient({"bootstrap.servers": kafka_server_base}) topics = admin.list_topics() assert len(topics.brokers) == 1 def test_kafka_server_with_topics( kafka_server: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): admin = AdminClient({"bootstrap.servers": kafka_server}) # check unprivileged topics are present topics = { topic for topic in admin.list_topics().topics if topic.startswith(f"{kafka_prefix}.") } assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types} # check privileged topics are present topics = { topic for topic in admin.list_topics().topics if topic.startswith(f"{kafka_prefix}_privileged.") } assert topics == { f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types } def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str): assert test_config == { "consumer_id": "swh.journal.consumer", "stop_on_eof": True, "storage": {"cls": "memory", "args": {}}, "object_types": { "content", "directory", "extid", "metadata_authority", "metadata_fetcher", "origin", "origin_visit", "origin_visit_status", "raw_extrinsic_metadata", "release", "revision", "snapshot", "skipped_content", }, - "privileged_object_types": {"release", "revision",}, + "privileged_object_types": { + "release", + "revision", + }, "brokers": [kafka_server_base], "prefix": kafka_prefix, } diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py index b94825f..5dde7bd 100644 --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -1,115 +1,119 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import OrderedDict from datetime import datetime, timedelta, timezone import itertools from typing import Iterable import pytest from swh.journal import serializers from swh.model.tests.swh_model_data import TEST_OBJECTS def test_key_to_kafka_repeatable(): """Check the kafka key encoding is repeatable""" base_dict = { "a": "foo", "b": "bar", "c": "baz", } key = serializers.key_to_kafka(base_dict) for dict_keys in itertools.permutations(base_dict): d = 
OrderedDict() for k in dict_keys: d[k] = base_dict[k] assert key == serializers.key_to_kafka(d) def test_pprint_key(): """Test whether get_key works on all our objects""" for object_type, objects in TEST_OBJECTS.items(): for obj in objects: key = obj.unique_key() pprinted_key = serializers.pprint_key(key) assert isinstance(pprinted_key, str) if isinstance(key, dict): assert pprinted_key[0], pprinted_key[-1] == "{}" for dict_key in key.keys(): assert f"{dict_key}:" in pprinted_key if isinstance(key, bytes): assert pprinted_key == key.hex() def test_kafka_to_key(): - """Standard back and forth serialization with keys - - """ + """Standard back and forth serialization with keys""" # All KeyType(s) keys: Iterable[serializers.KeyType] = [ - {"a": "foo", "b": "bar", "c": "baz",}, - {"a": b"foobarbaz",}, + { + "a": "foo", + "b": "bar", + "c": "baz", + }, + { + "a": b"foobarbaz", + }, b"foo", ] for object_type, objects in TEST_OBJECTS.items(): for obj in objects: key = obj.unique_key() keys.append(key) for key in keys: ktk = serializers.key_to_kafka(key) v = serializers.kafka_to_key(ktk) assert v == key # limits of supported int values by msgpack -MININT = -(2 ** 63) -MAXINT = 2 ** 64 - 1 +MININT = -(2**63) +MAXINT = 2**64 - 1 intvalues = [ MININT * 2, MININT - 1, MININT, MININT + 1, -10, 0, 10, MAXINT - 1, MAXINT, MAXINT + 1, MAXINT * 2, ] @pytest.mark.parametrize("value", intvalues) def test_encode_int(value): assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value datevalues = [ datetime.now(tz=timezone.utc), datetime.now(tz=timezone(timedelta(hours=-23, minutes=-59))), datetime.now(tz=timezone(timedelta(hours=23, minutes=59))), datetime(1, 1, 1, 1, 1, tzinfo=timezone.utc), datetime(2100, 1, 1, 1, 1, tzinfo=timezone.utc), ] @pytest.mark.parametrize("value", datevalues) def test_encode_datetime(value): assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value @pytest.mark.parametrize("value", datevalues) def test_encode_datetime_bw(value): bwdate = {b"swhtype": "datetime", b"d": value.isoformat()} assert serializers.kafka_to_value(serializers.value_to_kafka(bwdate)) == value diff --git a/swh/journal/tests/test_stream.py b/swh/journal/tests/test_stream.py index c9bfc90..fb74505 100644 --- a/swh/journal/tests/test_stream.py +++ b/swh/journal/tests/test_stream.py @@ -1,47 +1,79 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import io +from typing import Dict, List, Tuple -import msgpack - -from swh.journal.serializers import msgpack_ext_hook +from swh.journal.serializers import kafka_stream_to_value from swh.journal.writer import get_journal_writer, model_object_dict_sanitizer +from swh.journal.writer.interface import JournalWriterInterface from swh.model.tests.swh_model_data import TEST_OBJECTS -def test_write_additions_with_test_objects(): - outs = io.BytesIO() - - writer = get_journal_writer( - cls="stream", value_sanitizer=model_object_dict_sanitizer, output_stream=outs, - ) +def fill_writer(writer: JournalWriterInterface) -> List[Tuple[str, Dict]]: expected = [] - - n = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) for object in objects: objd = object.to_dict() if object_type == "content": objd.pop("data") expected.append((object_type, objd)) - 
n += len(objects) + writer.flush() + return expected + + +def test_stream_journal_writer_stream(): + outs = io.BytesIO() + + writer = get_journal_writer( + cls="stream", + value_sanitizer=model_object_dict_sanitizer, + output_stream=outs, + ) + expected = fill_writer(writer) outs.seek(0, 0) - unpacker = msgpack.Unpacker( - outs, - raw=False, - ext_hook=msgpack_ext_hook, - strict_map_key=False, - use_list=False, - timestamp=3, # convert Timestamp in datetime objects (tz UTC) + unpacker = kafka_stream_to_value(outs) + for i, (objtype, objd) in enumerate(unpacker, start=1): + assert (objtype, objd) in expected + assert len(expected) == i + + +def test_stream_journal_writer_filename(tmp_path): + out_fname = str(tmp_path / "journal.msgpack") + + writer = get_journal_writer( + cls="stream", + value_sanitizer=model_object_dict_sanitizer, + output_stream=out_fname, ) + expected = fill_writer(writer) + + with open(out_fname, "rb") as outs: + unpacker = kafka_stream_to_value(outs) + for i, (objtype, objd) in enumerate(unpacker, start=1): + assert (objtype, objd) in expected + assert len(expected) == i + + +def test_stream_journal_writer_stdout(capfdbinary): + writer = get_journal_writer( + cls="stream", + value_sanitizer=model_object_dict_sanitizer, + output_stream="-", + ) + expected = fill_writer(writer) + + captured = capfdbinary.readouterr() + assert captured.err == b"" + outs = io.BytesIO(captured.out) + unpacker = kafka_stream_to_value(outs) for i, (objtype, objd) in enumerate(unpacker, start=1): assert (objtype, objd) in expected assert len(expected) == i diff --git a/swh/journal/writer/__init__.py b/swh/journal/writer/__init__.py index 662fa80..bf82eab 100644 --- a/swh/journal/writer/__init__.py +++ b/swh/journal/writer/__init__.py @@ -1,62 +1,66 @@ -# Copyright (C) 2019-2021 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Optional, TypeVar +import os +import sys +from typing import Any, BinaryIO, Dict, Type import warnings -from typing_extensions import Protocol - -from swh.model.model import KeyType - -TSelf = TypeVar("TSelf") - - -class ValueProtocol(Protocol): - def anonymize(self: TSelf) -> Optional[TSelf]: - ... - - def unique_key(self) -> KeyType: - ... - - def to_dict(self) -> Dict[str, Any]: - ... 
+from .interface import JournalWriterInterface def model_object_dict_sanitizer( object_type: str, object_dict: Dict[str, Any] ) -> Dict[str, str]: object_dict = object_dict.copy() if object_type == "content": object_dict.pop("data", None) return object_dict -def get_journal_writer(cls, **kwargs): +def get_journal_writer(cls, **kwargs) -> JournalWriterInterface: + if "args" in kwargs: warnings.warn( 'Explicit "args" key is deprecated, use keys directly instead.', DeprecationWarning, ) kwargs = kwargs["args"] kwargs.setdefault("value_sanitizer", model_object_dict_sanitizer) if cls == "inmemory": # FIXME: Remove inmemory in due time warnings.warn( "cls = 'inmemory' is deprecated, use 'memory' instead", DeprecationWarning ) cls = "memory" + + JournalWriter: Type[JournalWriterInterface] if cls == "memory": - from .inmemory import InMemoryJournalWriter as JournalWriter + from .inmemory import InMemoryJournalWriter + + JournalWriter = InMemoryJournalWriter elif cls == "kafka": - from .kafka import KafkaJournalWriter as JournalWriter + from .kafka import KafkaJournalWriter + + JournalWriter = KafkaJournalWriter elif cls == "stream": - from .stream import StreamJournalWriter as JournalWriter + from .stream import StreamJournalWriter + + JournalWriter = StreamJournalWriter assert "output_stream" in kwargs + outstream: BinaryIO + if kwargs["output_stream"] in ("-", b"-"): + outstream = os.fdopen(sys.stdout.fileno(), "wb", closefd=False) + elif isinstance(kwargs["output_stream"], (str, bytes)): + outstream = open(kwargs["output_stream"], "wb") + else: + outstream = kwargs["output_stream"] + kwargs["output_stream"] = outstream else: raise ValueError("Unknown journal writer class `%s`" % cls) return JournalWriter(**kwargs) diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py index 69f63f8..909ff46 100644 --- a/swh/journal/writer/inmemory.py +++ b/swh/journal/writer/inmemory.py @@ -1,45 +1,48 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from multiprocessing import Manager -from typing import Any, Callable, Dict, Generic, List, Tuple, TypeVar +from typing import Any, Callable, Dict, Iterable, List, Tuple -from . import ValueProtocol +from .interface import ValueProtocol logger = logging.getLogger(__name__) -TValue = TypeVar("TValue", bound=ValueProtocol) - - -class InMemoryJournalWriter(Generic[TValue]): - objects: List[Tuple[str, TValue]] - privileged_objects: List[Tuple[str, TValue]] +class InMemoryJournalWriter: + objects: List[Tuple[str, ValueProtocol]] + privileged_objects: List[Tuple[str, ValueProtocol]] def __init__( - self, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]] + self, + value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]], + anonymize: bool = False, ): # Share the list of objects across processes, for RemoteAPI tests. 
self.manager = Manager() self.objects = self.manager.list() self.privileged_objects = self.manager.list() + self.anonymize = anonymize - def write_addition( - self, object_type: str, object_: TValue, privileged: bool = False - ) -> None: + def write_addition(self, object_type: str, object_: ValueProtocol) -> None: object_.unique_key() # Check this does not error, to mimic the kafka writer - if privileged: + anon_object_ = None + if self.anonymize: + anon_object_ = object_.anonymize() + if anon_object_ is not None: self.privileged_objects.append((object_type, object_)) + self.objects.append((object_type, anon_object_)) else: self.objects.append((object_type, object_)) - write_update = write_addition - def write_additions( - self, object_type: str, objects: List[TValue], privileged: bool = False + self, object_type: str, objects: Iterable[ValueProtocol] ) -> None: for object_ in objects: - self.write_addition(object_type, object_, privileged) + self.write_addition(object_type, object_) + + def flush(self) -> None: + pass diff --git a/swh/journal/writer/interface.py b/swh/journal/writer/interface.py new file mode 100644 index 0000000..c357137 --- /dev/null +++ b/swh/journal/writer/interface.py @@ -0,0 +1,40 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, Iterable, Optional, TypeVar + +from typing_extensions import Protocol, runtime_checkable + +from swh.model.model import KeyType + +TSelf = TypeVar("TSelf") + + +class ValueProtocol(Protocol): + def anonymize(self: TSelf) -> Optional[TSelf]: + ... + + def unique_key(self) -> KeyType: + ... + + def to_dict(self) -> Dict[str, Any]: + ... + + +@runtime_checkable +class JournalWriterInterface(Protocol): + def write_addition(self, object_type: str, object_: ValueProtocol) -> None: + """Add a SWH object of type object_type in the journal.""" + ... + + def write_additions( + self, object_type: str, objects: Iterable[ValueProtocol] + ) -> None: + """Add a list of SWH objects of type object_type in the journal.""" + ... + + def flush(self) -> None: + """Flush the pending object additions in the backend, if any.""" + ... diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py index 2434146..0fbb3ae 100644 --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -1,273 +1,261 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import time -from typing import ( - Any, - Callable, - Dict, - Generic, - Iterable, - List, - NamedTuple, - Optional, - Type, - TypeVar, -) +from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Type from confluent_kafka import KafkaException, Producer from swh.journal.serializers import KeyType, key_to_kafka, pprint_key, value_to_kafka -from . 
import ValueProtocol +from .interface import ValueProtocol logger = logging.getLogger(__name__) class DeliveryTag(NamedTuple): """Unique tag allowing us to check for a message delivery""" topic: str kafka_key: bytes class DeliveryFailureInfo(NamedTuple): """Verbose information for failed deliveries""" object_type: str key: KeyType message: str code: str def get_object_type(topic: str) -> str: """Get the object type from a topic string""" return topic.rsplit(".", 1)[-1] class KafkaDeliveryError(Exception): """Delivery failed on some kafka messages.""" def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]): self.message = message self.delivery_failures = list(delivery_failures) def pretty_failures(self) -> str: return ", ".join( f"{f.object_type} {pprint_key(f.key)} ({f.message})" for f in self.delivery_failures ) def __str__(self): return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" -TValue = TypeVar("TValue", bound=ValueProtocol) - - -class KafkaJournalWriter(Generic[TValue]): - """This class is used to write serialized versions of value objects to a - series of Kafka topics. The type parameter `TValue`, which must implement the - `ValueProtocol`, is the type of values this writer will write. - Typically, `TValue` will be `swh.model.model.BaseModel`. +class KafkaJournalWriter: + """This class is used to write serialized versions of value objects to a series + of Kafka topics. Each value object must implement the `ValueProtocol`; + in practice, value objects are typically instances of + `swh.model.model.BaseModel`. Topics used to send objects representations are built from a ``prefix`` plus the type of the object: ``{prefix}.{object_type}`` Objects can be sent as is, or can be anonymized. The anonymization feature, when activated, will write anonymized versions of value objects in the main topic, and stock (non-anonymized) objects will be sent to a dedicated (privileged) set of topics: ``{prefix}_privileged.{object_type}`` The anonymization of a value object is the result of calling its ``anonymize()`` method. An object is considered anonymizable if this method returns a (non-None) value. Args: brokers: list of broker addresses and ports. prefix: the prefix used to build the topic names for objects. client_id: the id of the writer sent to kafka. value_sanitizer: a function that takes the object type and the dict representation of an object as argument, and returns an other dict that should be actually stored in the journal (eg. removing keys that do no belong there) producer_config: extra configuration keys passed to the `Producer`. flush_timeout: timeout, in seconds, after which the `flush` operation will fail if some message deliveries are still pending. producer_class: override for the kafka producer class. anonymize: if True, activate the anonymization feature.
""" def __init__( self, brokers: Iterable[str], prefix: str, client_id: str, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]], producer_config: Optional[Dict] = None, flush_timeout: float = 120, producer_class: Type[Producer] = Producer, anonymize: bool = False, ): self._prefix = prefix self._prefix_privileged = f"{self._prefix}_privileged" self.anonymize = anonymize if not producer_config: producer_config = {} if "message.max.bytes" not in producer_config: producer_config = { "message.max.bytes": 100 * 1024 * 1024, **producer_config, } self.producer = producer_class( { "bootstrap.servers": ",".join(brokers), "client.id": client_id, "on_delivery": self._on_delivery, "error_cb": self._error_cb, "logger": logger, "acks": "all", **producer_config, } ) # Delivery management self.flush_timeout = flush_timeout # delivery tag -> original object "key" mapping self.deliveries_pending: Dict[DeliveryTag, KeyType] = {} # List of (object_type, key, error_msg, error_name) for failed deliveries self.delivery_failures: List[DeliveryFailureInfo] = [] self.value_sanitizer = value_sanitizer def _error_cb(self, error): if error.fatal(): raise KafkaException(error) logger.info("Received non-fatal kafka error: %s", error) def _on_delivery(self, error, message): (topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key()) sent_key = self.deliveries_pending.pop(delivery_tag, None) if error is not None: self.delivery_failures.append( DeliveryFailureInfo( get_object_type(topic), sent_key, error.str(), error.name() ) ) def send(self, topic: str, key: KeyType, value): kafka_key = key_to_kafka(key) max_attempts = 5 last_exception: Optional[Exception] = None for attempt in range(max_attempts): try: self.producer.produce( - topic=topic, key=kafka_key, value=value_to_kafka(value), + topic=topic, + key=kafka_key, + value=value_to_kafka(value), ) except BufferError as e: last_exception = e wait = 1 + 3 * attempt if logger.isEnabledFor(logging.DEBUG): # pprint_key is expensive logger.debug( "BufferError producing %s %s; waiting for %ss", get_object_type(topic), pprint_key(kafka_key), wait, ) self.producer.poll(wait) else: self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key return # We reach this point if all delivery attempts have failed self.delivery_failures.append( DeliveryFailureInfo( get_object_type(topic), key, str(last_exception), "SWH_BUFFER_ERROR" ) ) def delivery_error(self, message) -> KafkaDeliveryError: """Get all failed deliveries, and clear them""" ret = self.delivery_failures self.delivery_failures = [] while self.deliveries_pending: delivery_tag, orig_key = self.deliveries_pending.popitem() (topic, kafka_key) = delivery_tag ret.append( DeliveryFailureInfo( get_object_type(topic), orig_key, "No delivery before flush() timeout", "SWH_FLUSH_TIMEOUT", ) ) return KafkaDeliveryError(message, ret) - def flush(self): + def flush(self) -> None: start = time.monotonic() self.producer.flush(self.flush_timeout) while self.deliveries_pending: if time.monotonic() - start > self.flush_timeout: break self.producer.poll(0.1) if self.deliveries_pending: # Delivery timeout raise self.delivery_error( "flush() exceeded timeout (%ss)" % self.flush_timeout, ) elif self.delivery_failures: raise self.delivery_error("Failed deliveries after flush()") - def _write_addition(self, object_type: str, object_: TValue) -> None: + def _write_addition(self, object_type: str, object_: ValueProtocol) -> None: """Write a single object to the journal""" key = object_.unique_key() if self.anonymize: 
anon_object_ = object_.anonymize() if anon_object_: # can be either None, or an anonymized object # if the object is anonymizable, send the non-anonymized version in the # privileged channel topic = f"{self._prefix_privileged}.{object_type}" dict_ = self.value_sanitizer(object_type, object_.to_dict()) logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_) object_ = anon_object_ topic = f"{self._prefix}.{object_type}" dict_ = self.value_sanitizer(object_type, object_.to_dict()) logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_) - def write_addition(self, object_type: str, object_: TValue) -> None: + def write_addition(self, object_type: str, object_: ValueProtocol) -> None: """Write a single object to the journal""" self._write_addition(object_type, object_) self.flush() - write_update = write_addition - - def write_additions(self, object_type: str, objects: Iterable[TValue]) -> None: + def write_additions( + self, object_type: str, objects: Iterable[ValueProtocol] + ) -> None: """Write a set of objects to the journal""" for object_ in objects: self._write_addition(object_type, object_) self.flush() diff --git a/swh/journal/writer/stream.py b/swh/journal/writer/stream.py index 202e13c..f796295 100644 --- a/swh/journal/writer/stream.py +++ b/swh/journal/writer/stream.py @@ -1,47 +1,43 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging -from typing import Any, BinaryIO, Callable, Dict, Generic, List, TypeVar +from typing import Any, BinaryIO, Callable, Dict, Iterable from swh.journal.serializers import value_to_kafka -from . import ValueProtocol +from .interface import ValueProtocol logger = logging.getLogger(__name__) -TValue = TypeVar("TValue", bound=ValueProtocol) - - -class StreamJournalWriter(Generic[TValue]): +class StreamJournalWriter: """A simple JournalWriter which serializes objects in a stream Might be used to serialize a storage in a file to generate a test dataset. """ def __init__( self, output_stream: BinaryIO, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]], ): # Share the list of objects across processes, for RemoteAPI tests. 
self.output = output_stream self.value_sanitizer = value_sanitizer - def write_addition( - self, object_type: str, object_: TValue, privileged: bool = False - ) -> None: + def write_addition(self, object_type: str, object_: ValueProtocol) -> None: object_.unique_key() # Check this does not error, to mimic the kafka writer dict_ = self.value_sanitizer(object_type, object_.to_dict()) self.output.write(value_to_kafka((object_type, dict_))) - write_update = write_addition - def write_additions( - self, object_type: str, objects: List[TValue], privileged: bool = False + self, object_type: str, objects: Iterable[ValueProtocol] ) -> None: for object_ in objects: - self.write_addition(object_type, object_, privileged) + self.write_addition(object_type, object_) + + def flush(self) -> None: + self.output.flush() diff --git a/tox.ini b/tox.ini index 4f23cf2..23f61fe 100644 --- a/tox.ini +++ b/tox.ini @@ -1,75 +1,76 @@ [tox] envlist=black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov dev: pdbpp commands = pytest --cov={envsitepackagesdir}/swh/journal \ {envsitepackagesdir}/swh/journal \ --cov-branch \ --doctest-modules {posargs} [testenv:black] skip_install = true deps = - black==19.10b0 + black==22.3.0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = git+https://github.com/PyCQA/pyflakes.git - flake8 + flake8==4.0.1 + flake8-bugbear==22.3.23 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy==0.920 commands = mypy swh # build documentation outside swh-environment using the current # git HEAD of swh-docs, is executed on CI for each diff to prevent # breaking doc build [testenv:sphinx] whitelist_externals = make usedevelop = true extras = testing deps = # fetch and install swh-docs in develop mode -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs # build documentation only inside swh-environment using local state # of swh-docs package [testenv:sphinx-dev] whitelist_externals = make usedevelop = true extras = testing deps = # install swh-docs in develop mode -e ../swh-docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs
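A minimal usage sketch of the reworked writer factory (not part of the diff above): `get_journal_writer` now resolves the stream writer's `output_stream` argument itself, accepting an open binary stream, a filename, or `"-"` for stdout, and every writer satisfies `JournalWriterInterface` (`write_addition`, `write_additions`, `flush`). Apart from the APIs visible in this patch, it assumes `swh.model.model.Origin` as a convenient model object:

```
import io

from swh.journal.serializers import kafka_stream_to_value
from swh.journal.writer import get_journal_writer, model_object_dict_sanitizer
from swh.model.model import Origin

out = io.BytesIO()
writer = get_journal_writer(
    cls="stream",
    value_sanitizer=model_object_dict_sanitizer,
    output_stream=out,  # a filename or "-" (stdout) would also work
)
writer.write_additions("origin", [Origin(url="https://example.com/repo.git")])
writer.flush()

# The stream now holds msgpack-encoded (object_type, object_dict) pairs,
# decodable with the same helper the tests above use.
out.seek(0)
for object_type, object_dict in kafka_stream_to_value(out):
    print(object_type, object_dict)
```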
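A second sketch, covering the anonymization switch added to the in-memory writer: with `anonymize=True`, an object whose `anonymize()` returns a non-None value is stored anonymized in `objects` while its stock version goes to `privileged_objects`, mirroring the kafka writer's privileged topics. It assumes `TEST_OBJECTS` is keyed by object type strings (as the tests in this patch use it) and that the sample release objects are anonymizable:

```
from swh.journal.writer import get_journal_writer, model_object_dict_sanitizer
from swh.model.tests.swh_model_data import TEST_OBJECTS

writer = get_journal_writer(
    cls="memory",
    value_sanitizer=model_object_dict_sanitizer,
    anonymize=True,
)
writer.write_additions("release", TEST_OBJECTS["release"])
writer.flush()

# The factory's return type is JournalWriterInterface, but the concrete memory
# writer still exposes its two lists; under the assumption above, anonymized
# copies are in writer.objects and the originals in writer.privileged_objects.
assert writer.privileged_objects
```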