diff --git a/PKG-INFO b/PKG-INFO index 903c32d..226048f 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,70 +1,71 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.30 +Version: 0.0.31 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable +Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-test.txt b/requirements-test.txt index 7f2dc48..c727717 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,4 +1,3 @@ pytest swh.model >= 0.0.34 -pytest-kafka hypothesis diff --git a/setup.py b/setup.py index 78e49ef..c4dab38 100755 --- a/setup.py +++ b/setup.py @@ -1,71 +1,74 @@ #!/usr/bin/env python3 -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = "requirements-%s.txt" % name else: reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( name="swh.journal", description="Software Heritage Journal utilities", long_description=long_description, long_description_content_type="text/markdown", + python_requires=">=3.7", author="Software Heritage developers", author_email="swh-devel@inria.fr", url="https://forge.softwareheritage.org/diffusion/DJNL/", packages=find_packages(), scripts=[], entry_points=""" [console_scripts] swh-journal=swh.journal.cli:main [swh.cli.subcommands] 
journal=swh.journal.cli:cli + [pytest11] + pytest_swh_journal = swh.journal.pytest_plugin """, install_requires=parse_requirements() + parse_requirements("swh"), setup_requires=["vcversioner"], extras_require={"testing": parse_requirements("test")}, vcversioner={}, include_package_data=True, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ "Bug Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-journal", }, ) diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index 903c32d..226048f 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,70 +1,71 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.30 +Version: 0.0.31 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable +Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.journal.egg-info/SOURCES.txt b/swh.journal.egg-info/SOURCES.txt index 702febf..219568a 100644 --- a/swh.journal.egg-info/SOURCES.txt +++ b/swh.journal.egg-info/SOURCES.txt @@ -1,40 +1,42 @@ MANIFEST.in Makefile README.md pyproject.toml requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py version.txt swh/__init__.py swh.journal.egg-info/PKG-INFO swh.journal.egg-info/SOURCES.txt swh.journal.egg-info/dependency_links.txt swh.journal.egg-info/entry_points.txt swh.journal.egg-info/requires.txt swh.journal.egg-info/top_level.txt swh/journal/__init__.py swh/journal/backfill.py swh/journal/cli.py swh/journal/client.py swh/journal/direct_writer.py swh/journal/fixer.py swh/journal/py.typed +swh/journal/pytest_plugin.py swh/journal/replay.py swh/journal/serializers.py swh/journal/tests/__init__.py swh/journal/tests/conftest.py 
+swh/journal/tests/journal_data.py swh/journal/tests/log4j.properties swh/journal/tests/test_backfill.py swh/journal/tests/test_cli.py swh/journal/tests/test_client.py swh/journal/tests/test_kafka_writer.py swh/journal/tests/test_replay.py swh/journal/tests/test_serializers.py swh/journal/tests/test_write_replay.py swh/journal/tests/utils.py swh/journal/writer/__init__.py swh/journal/writer/inmemory.py swh/journal/writer/kafka.py \ No newline at end of file diff --git a/swh.journal.egg-info/entry_points.txt b/swh.journal.egg-info/entry_points.txt index ff07b75..e00a011 100644 --- a/swh.journal.egg-info/entry_points.txt +++ b/swh.journal.egg-info/entry_points.txt @@ -1,6 +1,8 @@ [console_scripts] swh-journal=swh.journal.cli:main [swh.cli.subcommands] journal=swh.journal.cli:cli + [pytest11] + pytest_swh_journal = swh.journal.pytest_plugin \ No newline at end of file diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index 4a6da92..276c16f 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ -1,13 +1,12 @@ confluent-kafka msgpack tenacity vcversioner swh.core[db,http]>=0.0.60 swh.model>=0.0.61 swh.storage>=0.0.181 [testing] pytest swh.model>=0.0.34 -pytest-kafka hypothesis diff --git a/swh/journal/cli.py b/swh/journal/cli.py index 8928137..d88bfa8 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,255 +1,265 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import logging import mmap import os +import warnings import click try: from systemd.daemon import notify except ImportError: notify = None from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.model.model import SHA1_SIZE from swh.storage import get_storage from swh.objstorage import get_objstorage -from swh.journal.client import JournalClient +from swh.journal.client import get_journal_client as get_client from swh.journal.replay import is_hash_in_bytearray from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content from swh.journal.backfill import JournalBackfiller @click.group(name="journal", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def cli(ctx, config_file): """Software Heritage Journal tools. The journal is a persistent logger of changes to the archive, with publish-subscribe support. 
""" if not config_file: config_file = os.environ.get("SWH_CONFIG_FILENAME") if config_file: if not os.path.exists(config_file): raise ValueError("%s does not exist" % config_file) conf = config.read(config_file) else: conf = {} ctx.ensure_object(dict) ctx.obj["config"] = conf def get_journal_client(ctx, **kwargs): - conf = ctx.obj["config"].get("journal", {}) - conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())}) - if not conf.get("brokers"): - ctx.fail("You must specify at least one kafka broker.") - if not isinstance(conf["brokers"], (list, tuple)): - conf["brokers"] = [conf["brokers"]] - return JournalClient(**conf) + conf = ctx.obj["config"].copy() + if "journal" in conf: + warnings.warn( + "Journal client configuration should now be under the " + "`journal_client` field and have a `cls` argument.", + DeprecationWarning, + ) + conf["journal_client"] = {"cls": "kafka", **conf.pop("journal")} + + client_conf = conf.get("journal_client").copy() + client_conf.update(kwargs) + + try: + return get_client(**client_conf) + except ValueError as exc: + ctx.fail(exc) @cli.command() @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to " "run forever.", ) @click.pass_context def replay(ctx, stop_after_objects): """Fill a Storage by reading a Journal. There can be several 'replayers' filling a Storage as long as they use the same `group-id`. """ conf = ctx.obj["config"] try: storage = get_storage(**conf.pop("storage")) except KeyError: ctx.fail("You must have a storage configured in your config file.") client = get_journal_client(ctx, stop_after_objects=stop_after_objects) worker_fn = functools.partial(process_replay_objects, storage=storage) if notify: notify("READY=1") try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() @cli.command() @click.argument("object_type") @click.option("--start-object", default=None) @click.option("--end-object", default=None) @click.option("--dry-run", is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller list objects from a Storage and produce journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to a downtime of this later). The configuration file requires the following entries: - brokers: a list of kafka endpoints (the journal) in which entries will be added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ conf = ctx.obj["config"] backfiller = JournalBackfiller(conf) if notify: notify("READY=1") try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run, ) except KeyboardInterrupt: if notify: notify("STOPPING=1") ctx.exit(0) @cli.command("content-replay") @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. 
Default is to " "run forever.", ) @click.option( "--exclude-sha1-file", default=None, type=click.File("rb"), help="File containing a sorted array of hashes to be excluded.", ) @click.option( "--check-dst/--no-check-dst", default=True, help="Check whether the destination contains the object before " "copying.", ) @click.pass_context def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): """Fill a destination Object Storage (typically a mirror) by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID` environment variable to use KIP-345 static group membership. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka nmessage has the status:visible set. `--exclude-sha1-file` may be used to exclude some hashes to speed-up the replay in case many of the contents are already in the destination objstorage. It must contain a concatenation of all (sha1) hashes, and it must be sorted. This file will not be fully loaded into memory at any given time, so it can be arbitrarily large. `--check-dst` sets whether the replayer should check in the destination ObjStorage before copying an object. You can turn that off if you know you're copying to an empty ObjStorage. """ conf = ctx.obj["config"] try: objstorage_src = get_objstorage(**conf.pop("objstorage_src")) except KeyError: ctx.fail("You must have a source objstorage configured in " "your config file.") try: objstorage_dst = get_objstorage(**conf.pop("objstorage_dst")) except KeyError: ctx.fail( "You must have a destination objstorage configured " "in your config file." ) if exclude_sha1_file: map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) if map_.size() % SHA1_SIZE != 0: ctx.fail( "--exclude-sha1 must link to a file whose size is an " "exact multiple of %d bytes." 
% SHA1_SIZE ) nb_excluded_hashes = int(map_.size() / SHA1_SIZE) def exclude_fn(obj): return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes) else: exclude_fn = None client = get_journal_client( ctx, stop_after_objects=stop_after_objects, object_types=("content",) ) worker_fn = functools.partial( process_replay_objects_content, src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn, check_dst=check_dst, ) if notify: notify("READY=1") try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() def main(): logging.basicConfig() return cli(auto_envvar_prefix="SWH_JOURNAL") if __name__ == "__main__": main() diff --git a/swh/journal/client.py b/swh/journal/client.py index 0a9d3d1..aa0d2c3 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,281 +1,291 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import logging import os import time from typing import Any, Dict, List, Optional, Set, Tuple, Union from confluent_kafka import Consumer, KafkaException, KafkaError from .serializers import kafka_to_value from swh.journal import DEFAULT_PREFIX logger = logging.getLogger(__name__) rdkafka_logger = logging.getLogger(__name__ + ".rdkafka") # Only accepted offset reset policy accepted ACCEPTED_OFFSET_RESET = ["earliest", "latest"] # Only accepted object types ACCEPTED_OBJECT_TYPES = [ "content", "directory", "revision", "release", "snapshot", "origin", "origin_visit", ] # Errors that Kafka raises too often and are not useful; therefore they # we lower their log level to DEBUG instead of INFO. _SPAMMY_ERRORS = [ KafkaError._NO_OFFSET, ] +def get_journal_client(cls: str, **kwargs: Any): + """Factory function to instantiate a journal client object. + + Currently, only the "kafka" journal client is supported. + """ + if cls == "kafka": + return JournalClient(**kwargs) + raise ValueError("Unknown journal client class `%s`" % cls) + + def _error_cb(error): if error.fatal(): raise KafkaException(error) if error.code() in _SPAMMY_ERRORS: logger.debug("Received non-fatal kafka error: %s", error) else: logger.info("Received non-fatal kafka error: %s", error) def _on_commit(error, partitions): if error is not None: _error_cb(error) class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the `prefix` argument is None (default value), it will take the default value `'swh.journal.objects'`. Clients subscribe to events specific to each object type as listed in the `object_types` argument (if unset, defaults to all accepted object types). Clients can be sharded by setting the `group_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id. Messages are processed by the `worker_fn` callback passed to the `process` method, in batches of maximum `batch_size` messages (defaults to 200). If set, the processing stops after processing `stop_after_objects` messages in total. `stop_on_eof` stops the processing when the client has reached the end of each partition in turn. 
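As a concrete illustration of the `get_journal_client` factory introduced above, here is a minimal sketch of driving the client directly from Python; the broker address, group id and object-type selection are placeholders, not values mandated by the library:

```python
from swh.journal.client import get_journal_client

# Placeholder connection settings -- adapt to the actual deployment.
client = get_journal_client(
    cls="kafka",                            # the only class currently supported
    brokers=["kafka1.example.org:9092"],    # hypothetical broker
    group_id="my-consumer-group",
    prefix="swh.journal.objects",
    object_types=["content", "origin"],     # subset of ACCEPTED_OBJECT_TYPES
    stop_after_objects=1000,                # omit to run forever
)

def worker_fn(all_objects):
    # all_objects maps an object type to the list of deserialized messages
    for object_type, objects in all_objects.items():
        print(f"{object_type}: {len(objects)} objects")

try:
    client.process(worker_fn)
finally:
    client.close()
```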
`auto_offset_reset` sets the behavior of the client when the consumer group initializes: `'earliest'` (the default) processes all objects since the inception of the topics; `''` Any other named argument is passed directly to KafkaConsumer(). """ def __init__( self, brokers: Union[str, List[str]], group_id: str, prefix: Optional[str] = None, object_types: Optional[List[str]] = None, stop_after_objects: Optional[int] = None, batch_size: int = 200, process_timeout: Optional[float] = None, auto_offset_reset: str = "earliest", stop_on_eof: bool = False, **kwargs, ): if prefix is None: prefix = DEFAULT_PREFIX if object_types is None: object_types = ACCEPTED_OBJECT_TYPES if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( "Option 'auto_offset_reset' only accept %s, not %s" % (ACCEPTED_OFFSET_RESET, auto_offset_reset) ) for object_type in object_types: if object_type not in ACCEPTED_OBJECT_TYPES: raise ValueError( "Option 'object_types' only accepts %s, not %s." % (ACCEPTED_OBJECT_TYPES, object_type) ) if batch_size <= 0: raise ValueError("Option 'batch_size' needs to be positive") self.value_deserializer = kafka_to_value if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) if debug_logging and "debug" not in kwargs: kwargs["debug"] = "consumer" # Static group instance id management group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID") if group_instance_id: kwargs["group.instance.id"] = group_instance_id if "group.instance.id" in kwargs: # When doing static consumer group membership, set a higher default # session timeout. The session timeout is the duration after which # the broker considers that a consumer has left the consumer group # for good, and triggers a rebalance. Considering our current # processing pattern, 10 minutes gives the consumer ample time to # restart before that happens. if "session.timeout.ms" not in kwargs: kwargs["session.timeout.ms"] = 10 * 60 * 1000 # 10 minutes if "session.timeout.ms" in kwargs: # When the session timeout is set, rdkafka requires the max poll # interval to be set to a higher value; the max poll interval is # rdkafka's way of figuring out whether the client's message # processing thread has stalled: when the max poll interval lapses # between two calls to consumer.poll(), rdkafka leaves the consumer # group and terminates the connection to the brokers. 
# # We default to 1.5 times the session timeout if "max.poll.interval.ms" not in kwargs: kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3 consumer_settings = { **kwargs, "bootstrap.servers": ",".join(brokers), "auto.offset.reset": auto_offset_reset, "group.id": group_id, "on_commit": _on_commit, "error_cb": _error_cb, "enable.auto.commit": False, "logger": rdkafka_logger, } self.stop_on_eof = stop_on_eof if self.stop_on_eof: consumer_settings["enable.partition.eof"] = True logger.debug("Consumer settings: %s", consumer_settings) self.consumer = Consumer(consumer_settings) self.subscription = [ "%s.%s" % (prefix, object_type) for object_type in object_types ] self.subscribe() self.stop_after_objects = stop_after_objects self.process_timeout = process_timeout self.eof_reached: Set[Tuple[str, str]] = set() self.batch_size = batch_size self._object_types = object_types def subscribe(self): logger.debug("Upstream topics: %s", self.consumer.list_topics(timeout=10)) logger.debug("Subscribing to: %s", self.subscription) self.consumer.subscribe(topics=self.subscription) def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. """ start_time = time.monotonic() total_objects_processed = 0 while True: # timeout for message poll timeout = 1.0 elapsed = time.monotonic() - start_time if self.process_timeout: # +0.01 to prevent busy-waiting on / spamming consumer.poll. # consumer.consume() returns shortly before X expired # (a matter of milliseconds), so after it returns a first # time, it would then be called with a timeout in the order # of milliseconds, therefore returning immediately, then be # called again, etc. if elapsed + 0.01 >= self.process_timeout: break timeout = self.process_timeout - elapsed batch_size = self.batch_size if self.stop_after_objects: if total_objects_processed >= self.stop_after_objects: break # clamp batch size to avoid overrunning stop_after_objects batch_size = min( self.stop_after_objects - total_objects_processed, batch_size, ) messages = self.consumer.consume(timeout=timeout, num_messages=batch_size) if not messages: continue batch_processed, at_eof = self.handle_messages(messages, worker_fn) total_objects_processed += batch_processed if at_eof: break return total_objects_processed def handle_messages(self, messages, worker_fn): objects: Dict[str, List[Any]] = defaultdict(list) nb_processed = 0 for message in messages: error = message.error() if error is not None: if error.code() == KafkaError._PARTITION_EOF: self.eof_reached.add((message.topic(), message.partition())) else: _error_cb(error) continue nb_processed += 1 object_type = message.topic().split(".")[-1] # Got a message from a topic we did not subscribe to. 
assert object_type in self._object_types, object_type objects[object_type].append(self.deserialize_message(message)) if objects: worker_fn(dict(objects)) self.consumer.commit() at_eof = self.stop_on_eof and all( (tp.topic, tp.partition) in self.eof_reached for tp in self.consumer.assignment() ) return nb_processed, at_eof def deserialize_message(self, message): return self.value_deserializer(message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py new file mode 100644 index 0000000..deea93d --- /dev/null +++ b/swh/journal/pytest_plugin.py @@ -0,0 +1,153 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import random +import string + +from typing import Dict, Iterator +from collections import defaultdict + +import pytest + +from confluent_kafka import Consumer, Producer, KafkaException + +from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value +from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS + + +def consume_messages(consumer, kafka_prefix, expected_messages): + """Consume expected_messages from the consumer; + Sort them all into a consumed_objects dict""" + consumed_messages = defaultdict(list) + + fetched_messages = 0 + retries_left = 1000 + + while fetched_messages < expected_messages: + if retries_left == 0: + raise ValueError("Timed out fetching messages from kafka") + + msg = consumer.poll(timeout=0.01) + + if not msg: + retries_left -= 1 + continue + + error = msg.error() + if error is not None: + if error.fatal(): + raise KafkaException(error) + retries_left -= 1 + continue + + fetched_messages += 1 + topic = msg.topic() + assert topic.startswith(kafka_prefix + "."), "Unexpected topic" + object_type = topic[len(kafka_prefix + ".") :] + + consumed_messages[object_type].append( + (kafka_to_key(msg.key()), kafka_to_value(msg.value())) + ) + + return consumed_messages + + +def assert_all_objects_consumed(consumed_messages): + """Check whether all objects from TEST_OBJECT_DICTS have been consumed""" + for object_type, known_values in TEST_OBJECT_DICTS.items(): + known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]] + + (received_keys, received_values) = zip(*consumed_messages[object_type]) + + if object_type == "origin_visit": + for value in received_values: + del value["visit"] + elif object_type == "content": + for value in received_values: + del value["ctime"] + + for key in known_keys: + assert key in received_keys + + for value in known_values: + assert value in received_values + + +@pytest.fixture(scope="function") +def kafka_prefix(): + """Pick a random prefix for kafka topics on each call""" + return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) + + +@pytest.fixture(scope="function") +def kafka_consumer_group(kafka_prefix: str): + """Pick a random consumer group for kafka consumers on each call""" + return "test-consumer-%s" % kafka_prefix + + +@pytest.fixture(scope="session") +def kafka_server() -> Iterator[str]: + p = Producer({"test.mock.num.brokers": "1"}) + + metadata = p.list_topics() + brokers = [str(broker) for broker in metadata.brokers.values()] + assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" 
+ + broker_connstr, broker_id = brokers[0].split("/") + ip, port_str = broker_connstr.split(":") + assert ip == "127.0.0.1" + assert int(port_str) + + yield broker_connstr + + p.flush() + + +TEST_CONFIG = { + "consumer_id": "swh.journal.consumer", + "object_types": TEST_OBJECT_DICTS.keys(), + "stop_after_objects": 1, # will read 1 object and stop + "storage": {"cls": "memory", "args": {}}, +} + + +@pytest.fixture +def test_config(kafka_server: str, kafka_prefix: str): + """Test configuration needed for producer/consumer + + """ + return { + **TEST_CONFIG, + "brokers": [kafka_server], + "prefix": kafka_prefix + ".swh.journal.objects", + } + + +@pytest.fixture +def consumer( + kafka_server: str, test_config: Dict, kafka_consumer_group: str, +) -> Consumer: + """Get a connected Kafka consumer. + + """ + consumer = Consumer( + { + "bootstrap.servers": kafka_server, + "auto.offset.reset": "earliest", + "enable.auto.commit": True, + "group.id": kafka_consumer_group, + } + ) + + kafka_topics = [ + "%s.%s" % (test_config["prefix"], object_type) + for object_type in test_config["object_types"] + ] + + consumer.subscribe(kafka_topics) + + yield consumer + + consumer.close() diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index e7169e2..ec2a3cf 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,358 +1,28 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime -import os -import pytest import logging -import random -import string - -from confluent_kafka import Consumer -from confluent_kafka.admin import AdminClient, ConfigResource from hypothesis.strategies import one_of -from subprocess import Popen -from typing import Any, Dict, Iterator, List, Tuple - -from pathlib import Path -from pytest_kafka import ( - make_zookeeper_process, - make_kafka_server, - KAFKA_SERVER_CONFIG_TEMPLATE, - ZOOKEEPER_CONFIG_TEMPLATE, -) from swh.model import hypothesis_strategies as strategies -from swh.model.hashutil import MultiHash, hash_to_bytes - -from swh.journal.serializers import ModelObject -from swh.journal.writer.kafka import OBJECT_TYPES +# for bw compat +from swh.journal.tests.journal_data import * # noqa logger = logging.getLogger(__name__) -CONTENTS = [ - {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",}, -] - -duplicate_content1 = { - "length": 4, - "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), - "sha1_git": b"another-foo", - "blake2s256": b"another-bar", - "sha256": b"another-baz", - "status": "visible", -} - -# Craft a sha1 collision -duplicate_content2 = duplicate_content1.copy() -sha1_array = bytearray(duplicate_content1["sha1_git"]) -sha1_array[0] += 1 -duplicate_content2["sha1_git"] = bytes(sha1_array) - - -DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] - - -COMMITTERS = [ - {"fullname": b"foo", "name": b"foo", "email": b"",}, - {"fullname": b"bar", "name": b"bar", "email": b"",}, -] - -DATES = [ - { - "timestamp": {"seconds": 1234567891, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, - { - "timestamp": {"seconds": 1234567892, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, -] - -REVISIONS = [ - { - "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), - 
"message": b"hello", - "date": DATES[0], - "committer": COMMITTERS[0], - "author": COMMITTERS[0], - "committer_date": DATES[0], - "type": "git", - "directory": b"\x01" * 20, - "synthetic": False, - "metadata": None, - "parents": [], - }, - { - "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"), - "message": b"hello again", - "date": DATES[1], - "committer": COMMITTERS[1], - "author": COMMITTERS[1], - "committer_date": DATES[1], - "type": "hg", - "directory": b"\x02" * 20, - "synthetic": False, - "metadata": None, - "parents": [], - }, -] - -RELEASES = [ - { - "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), - "name": b"v0.0.1", - "date": { - "timestamp": {"seconds": 1234567890, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, - "author": COMMITTERS[0], - "target_type": "revision", - "target": b"\x04" * 20, - "message": b"foo", - "synthetic": False, - }, -] - -ORIGINS = [ - {"url": "https://somewhere.org/den/fox",}, - {"url": "https://overtherainbow.org/fox/den",}, -] - -ORIGIN_VISITS = [ - { - "origin": ORIGINS[0]["url"], - "date": "2013-05-07 04:20:39.369271+00:00", - "snapshot": None, # TODO - "status": "ongoing", # TODO - "metadata": {"foo": "bar"}, - "type": "git", - }, - { - "origin": ORIGINS[0]["url"], - "date": "2018-11-27 17:20:39+00:00", - "snapshot": None, # TODO - "status": "ongoing", # TODO - "metadata": {"baz": "qux"}, - "type": "git", - }, -] - -TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = { - "content": CONTENTS, - "revision": REVISIONS, - "release": RELEASES, - "origin": ORIGINS, - "origin_visit": ORIGIN_VISITS, -} - -MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()} - -TEST_OBJECTS: Dict[str, List[ModelObject]] = {} - -for object_type, objects in TEST_OBJECT_DICTS.items(): - converted_objects: List[ModelObject] = [] - model = MODEL_OBJECTS[object_type] - - for (num, obj_d) in enumerate(objects): - if object_type == "origin_visit": - obj_d = {**obj_d, "visit": num} - elif object_type == "content": - obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()} - - converted_objects.append(model.from_dict(obj_d)) - - TEST_OBJECTS[object_type] = converted_objects - - -KAFKA_ROOT = os.environ.get("SWH_KAFKA_ROOT") -KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + "/kafka" -if not os.path.exists(KAFKA_ROOT): - msg = ( - "Development error: %s must exist and target an " - "existing kafka installation" % KAFKA_ROOT - ) - raise ValueError(msg) - -KAFKA_SCRIPTS = Path(KAFKA_ROOT) / "bin" - -KAFKA_BIN = str(KAFKA_SCRIPTS / "kafka-server-start.sh") -ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / "zookeeper-server-start.sh") - -ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + "\nadmin.enableServer=false\n" -KAFKA_CONFIG_TEMPLATE = KAFKA_SERVER_CONFIG_TEMPLATE + "\nmessage.max.bytes=104857600\n" - -# Those defines fixtures -zookeeper_proc = make_zookeeper_process( - ZOOKEEPER_BIN, zk_config_template=ZK_CONFIG_TEMPLATE, scope="session" -) -os.environ[ - "KAFKA_LOG4J_OPTS" -] = "-Dlog4j.configuration=file:%s/log4j.properties" % os.path.dirname(__file__) -session_kafka_server = make_kafka_server( - KAFKA_BIN, - "zookeeper_proc", - kafka_config_template=KAFKA_CONFIG_TEMPLATE, - scope="session", -) - -kafka_logger = logging.getLogger("kafka") -kafka_logger.setLevel(logging.WARN) - - -@pytest.fixture(scope="function") -def kafka_prefix(): - """Pick a random prefix for kafka topics on each call""" - return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) - - -@pytest.fixture(scope="function") -def 
kafka_consumer_group(kafka_prefix: str): - """Pick a random consumer group for kafka consumers on each call""" - return "test-consumer-%s" % kafka_prefix - - -@pytest.fixture(scope="session") -def kafka_admin_client(session_kafka_server: Tuple[Popen, int]) -> AdminClient: - return AdminClient({"bootstrap.servers": "localhost:%s" % session_kafka_server[1]}) - - -@pytest.fixture(scope="function") -def kafka_server_config_overrides() -> Dict[str, str]: - return {} - - -@pytest.fixture(scope="function") -def kafka_server( - session_kafka_server: Tuple[Popen, int], - kafka_admin_client: AdminClient, - kafka_server_config_overrides: Dict[str, str], -) -> Iterator[Tuple[Popen, int]]: - # No overrides, we can just return the original broker connection - if not kafka_server_config_overrides: - yield session_kafka_server - return - - # This is the minimal operation that the kafka_admin_client gives to - # retrieve the cluster metadata, which we need to get the numeric id of the - # broker spawned by pytest_kafka. - metadata = kafka_admin_client.list_topics("__consumer_offsets") - broker_ids = [str(broker) for broker in metadata.brokers.keys()] - assert len(broker_ids) == 1, "More than one broker found in the kafka cluster?!" - - # Pull the current broker configuration. describe_configs and alter_configs - # generate a dict containing one concurrent.future per queried - # ConfigResource, hence the use of .result() - broker = ConfigResource("broker", broker_ids[0]) - futures = kafka_admin_client.describe_configs([broker]) - original_config = futures[broker].result() - - # Gather the list of settings we need to change in the broker - # ConfigResource, and their original values in the to_restore dict - to_restore = {} - for key, new_value in kafka_server_config_overrides.items(): - if key not in original_config: - raise ValueError(f"Cannot override unknown configuration {key}") - orig_value = original_config[key].value - if orig_value == new_value: - continue - if original_config[key].is_read_only: - raise ValueError(f"Cannot override read-only configuration {key}") - - broker.set_config(key, new_value) - to_restore[key] = orig_value - - # to_restore will be empty if all the config "overrides" are equal to the - # original value. No need to wait for a config alteration if that's the - # case. The result() will raise a KafkaException if the settings change - # failed. - if to_restore: - futures = kafka_admin_client.alter_configs([broker]) - try: - futures[broker].result() - except Exception: - raise - - yield session_kafka_server - - # Now we can restore the old setting values. Again, the result() will raise - # a KafkaException if the settings change failed. 
- if to_restore: - for key, orig_value in to_restore.items(): - broker.set_config(key, orig_value) - - futures = kafka_admin_client.alter_configs([broker]) - try: - futures[broker].result() - except Exception: - raise - - -TEST_CONFIG = { - "consumer_id": "swh.journal.consumer", - "object_types": TEST_OBJECT_DICTS.keys(), - "stop_after_objects": 1, # will read 1 object and stop - "storage": {"cls": "memory", "args": {}}, -} - - -@pytest.fixture -def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str): - """Test configuration needed for producer/consumer - - """ - _, port = kafka_server - return { - **TEST_CONFIG, - "brokers": ["127.0.0.1:{}".format(port)], - "prefix": kafka_prefix + ".swh.journal.objects", - } - - -@pytest.fixture -def consumer( - kafka_server: Tuple[Popen, int], test_config: Dict, kafka_consumer_group: str, -) -> Consumer: - """Get a connected Kafka consumer. - - """ - _, kafka_port = kafka_server - consumer = Consumer( - { - "bootstrap.servers": "127.0.0.1:{}".format(kafka_port), - "auto.offset.reset": "earliest", - "enable.auto.commit": True, - "group.id": kafka_consumer_group, - } - ) - - kafka_topics = [ - "%s.%s" % (test_config["prefix"], object_type) - for object_type in test_config["object_types"] - ] - - consumer.subscribe(kafka_topics) - - yield consumer - - consumer.close() - def objects_d(): return one_of( strategies.origins().map(lambda x: ("origin", x.to_dict())), strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())), strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())), strategies.releases().map(lambda x: ("release", x.to_dict())), strategies.revisions().map(lambda x: ("revision", x.to_dict())), strategies.directories().map(lambda x: ("directory", x.to_dict())), strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())), strategies.present_contents().map(lambda x: ("content", x.to_dict())), ) diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py new file mode 100644 index 0000000..12d0c64 --- /dev/null +++ b/swh/journal/tests/journal_data.py @@ -0,0 +1,150 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime + +from typing import Any, Dict, List + +from swh.model.hashutil import MultiHash, hash_to_bytes +from swh.journal.serializers import ModelObject +from swh.journal.writer.kafka import OBJECT_TYPES + + +CONTENTS = [ + {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",}, +] + +duplicate_content1 = { + "length": 4, + "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), + "sha1_git": b"another-foo", + "blake2s256": b"another-bar", + "sha256": b"another-baz", + "status": "visible", +} + +# Craft a sha1 collision +duplicate_content2 = duplicate_content1.copy() +sha1_array = bytearray(duplicate_content1["sha1_git"]) +sha1_array[0] += 1 +duplicate_content2["sha1_git"] = bytes(sha1_array) + + +DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] + + +COMMITTERS = [ + {"fullname": b"foo", "name": b"foo", "email": b"",}, + {"fullname": b"bar", "name": b"bar", "email": b"",}, +] + +DATES = [ + { + "timestamp": {"seconds": 1234567891, "microseconds": 0,}, + "offset": 120, + "negative_utc": False, + }, + { + "timestamp": {"seconds": 1234567892, "microseconds": 0,}, + "offset": 120, + "negative_utc": False, + }, +] + 
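Because these fixtures now ship through the `[pytest11]` entry point, a package depending on swh.journal can use them without copying this conftest. The sketch below is illustrative only: the test module name, topic choice and assertions are made up, while the fixture and helper names (`kafka_server`, `kafka_prefix`, `consumer`, `consume_messages`) come from `swh/journal/pytest_plugin.py` above:

```python
# test_my_consumer.py -- hypothetical test in a package depending on swh.journal
from confluent_kafka import Producer

from swh.journal.pytest_plugin import consume_messages
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.journal.tests.journal_data import ORIGINS


def test_origin_roundtrip(kafka_server: str, kafka_prefix: str, consumer):
    # kafka_server, kafka_prefix and consumer are provided by the pytest plugin
    prefix = kafka_prefix + ".swh.journal.objects"
    producer = Producer({"bootstrap.servers": kafka_server, "acks": "all"})

    for origin in ORIGINS:
        producer.produce(
            topic=prefix + ".origin",
            key=key_to_kafka(origin["url"]),
            value=value_to_kafka(origin),
        )
    producer.flush()

    consumed = consume_messages(consumer, prefix, expected_messages=len(ORIGINS))
    received = [value for (_, value) in consumed["origin"]]
    for origin in ORIGINS:
        assert origin in received
```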
+REVISIONS = [ + { + "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), + "message": b"hello", + "date": DATES[0], + "committer": COMMITTERS[0], + "author": COMMITTERS[0], + "committer_date": DATES[0], + "type": "git", + "directory": b"\x01" * 20, + "synthetic": False, + "metadata": None, + "parents": [], + }, + { + "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"), + "message": b"hello again", + "date": DATES[1], + "committer": COMMITTERS[1], + "author": COMMITTERS[1], + "committer_date": DATES[1], + "type": "hg", + "directory": b"\x02" * 20, + "synthetic": False, + "metadata": None, + "parents": [], + }, +] + +RELEASES = [ + { + "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + "name": b"v0.0.1", + "date": { + "timestamp": {"seconds": 1234567890, "microseconds": 0,}, + "offset": 120, + "negative_utc": False, + }, + "author": COMMITTERS[0], + "target_type": "revision", + "target": b"\x04" * 20, + "message": b"foo", + "synthetic": False, + }, +] + +ORIGINS = [ + {"url": "https://somewhere.org/den/fox",}, + {"url": "https://overtherainbow.org/fox/den",}, +] + +ORIGIN_VISITS = [ + { + "origin": ORIGINS[0]["url"], + "date": "2013-05-07 04:20:39.369271+00:00", + "snapshot": None, # TODO + "status": "ongoing", # TODO + "metadata": {"foo": "bar"}, + "type": "git", + }, + { + "origin": ORIGINS[0]["url"], + "date": "2018-11-27 17:20:39+00:00", + "snapshot": None, # TODO + "status": "ongoing", # TODO + "metadata": {"baz": "qux"}, + "type": "git", + }, +] + +TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = { + "content": CONTENTS, + "revision": REVISIONS, + "release": RELEASES, + "origin": ORIGINS, + "origin_visit": ORIGIN_VISITS, +} + +MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()} + +TEST_OBJECTS: Dict[str, List[ModelObject]] = {} + +for object_type, objects in TEST_OBJECT_DICTS.items(): + converted_objects: List[ModelObject] = [] + model = MODEL_OBJECTS[object_type] + + for (num, obj_d) in enumerate(objects): + if object_type == "origin_visit": + obj_d = {**obj_d, "visit": num} + elif object_type == "content": + obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()} + + converted_objects.append(model.from_dict(obj_d)) + + TEST_OBJECTS[object_type] = converted_objects diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py index eb19869..b1924d8 100644 --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -1,631 +1,655 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter import copy import functools import logging import re import tempfile -from subprocess import Popen -from typing import Any, Dict, Tuple -from unittest.mock import patch +from typing import Any, Dict +from unittest.mock import patch, MagicMock from click.testing import CliRunner from confluent_kafka import Producer import pytest import yaml from swh.model.hashutil import hash_to_hex from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.storage import get_storage -from swh.journal.cli import cli +from swh.journal.cli import cli, get_journal_client from swh.journal.replay import CONTENT_REPLAY_RETRIES from swh.journal.serializers import key_to_kafka, value_to_kafka logger = logging.getLogger(__name__) CLI_CONFIG = { "storage": {"cls": "memory",}, "objstorage_src": {"cls": "mocked", 
"name": "src",}, "objstorage_dst": {"cls": "mocked", "name": "dst",}, } @pytest.fixture def storage(): """An swh-storage object that gets injected into the CLI functions.""" storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} storage = get_storage(**storage_config) with patch("swh.journal.cli.get_storage") as get_storage_mock: get_storage_mock.return_value = storage yield storage @pytest.fixture def monkeypatch_retry_sleep(monkeypatch): from swh.journal.replay import copy_object, obj_in_objstorage monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None) monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) def invoke(*args, env=None, journal_config=None): config = copy.deepcopy(CLI_CONFIG) if journal_config: - config["journal"] = journal_config + config["journal_client"] = journal_config.copy() + config["journal_client"]["cls"] = "kafka" runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: yaml.dump(config, config_fd) config_fd.seek(0) args = ["-C" + config_fd.name] + list(args) return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,) +def test_get_journal_client_config_bwcompat(kafka_server): + cfg = { + "journal": { + "brokers": [kafka_server], + "group_id": "toto", + "prefix": "xiferp", + "object_types": ["content"], + "batch_size": 50, + } + } + ctx = MagicMock(obj={"config": cfg}) + with pytest.deprecated_call(): + client = get_journal_client(ctx, stop_after_objects=10, prefix="prefix") + assert client.subscription == ["prefix.content"] + assert client.stop_after_objects == 10 + assert client.batch_size == 50 + + +def test_get_journal_client_config(kafka_server): + cfg = { + "journal_client": { + "cls": "kafka", + "brokers": [kafka_server], + "group_id": "toto", + "prefix": "xiferp", + "object_types": ["content"], + "batch_size": 50, + } + } + ctx = MagicMock(obj={"config": cfg}) + client = get_journal_client(ctx, stop_after_objects=10, prefix="prefix") + assert client.subscription == ["prefix.content"] + assert client.stop_after_objects == 10 + assert client.batch_size == 50 + + def test_replay( - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" producer = Producer( { - "bootstrap.servers": "localhost:{}".format(kafka_port), + "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) snapshot = { "id": b"foo", "branches": {b"HEAD": {"target_type": "revision", "target": b"\x01" * 20,}}, } # type: Dict[str, Any] producer.produce( topic=kafka_prefix + ".snapshot", key=key_to_kafka(snapshot["id"]), value=value_to_kafka(snapshot), ) producer.flush() logger.debug("Flushed producer") result = invoke( "replay", "--stop-after-objects", "1", journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output assert storage.snapshot_get(snapshot["id"]) == {**snapshot, "next_branch": None} def _patch_objstorages(names): objstorages = {name: InMemoryObjStorage() for name in names} def get_mock_objstorage(cls, **args): assert cls == "mocked", cls return objstorages[args["name"]] def decorator(f): @functools.wraps(f) @patch("swh.journal.cli.get_objstorage") def 
newf(get_objstorage_mock, *args, **kwargs): get_objstorage_mock.side_effect = get_mock_objstorage f(*args, objstorages=objstorages, **kwargs) return newf return decorator NUM_CONTENTS = 10 -def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages): +def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages): producer = Producer( { - "bootstrap.servers": "127.0.0.1:{}".format(kafka_port), + "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) contents = {} for i in range(NUM_CONTENTS): content = b"\x00" * 19 + bytes([i]) sha1 = objstorages["src"].add(content) contents[sha1] = content producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(sha1), value=key_to_kafka({"sha1": sha1, "status": "visible",}), ) producer.flush() return contents @_patch_objstorages(["src", "dst"]) def test_replay_content( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_structured_log( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, caplog, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) caplog.set_level(logging.DEBUG, "swh.journal.replay") expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = set() for record in caplog.records: logtext = record.getMessage() if "copied" in logtext: copied.add(record.args["obj_id"]) assert ( copied == expected_obj_ids ), "Mismatched logging; see captured log output for details." 
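The configuration layout exercised by these tests can be summarized as follows: a hedged sketch of a replayer configuration, built as the Python dict the tests dump to YAML. The broker, group id and file name are placeholders, and a real deployment would use a persistent storage rather than `memory`:

```python
import yaml

# New-style layout: journal client settings live under `journal_client` and
# carry an explicit `cls`; the legacy top-level `journal` key still works but
# now emits a DeprecationWarning.
config = {
    "storage": {"cls": "memory"},  # placeholder; use a real storage in production
    "journal_client": {
        "cls": "kafka",
        "brokers": ["kafka1.example.org:9092"],  # hypothetical broker
        "group_id": "my-replayer",
        "prefix": "swh.journal.objects",
    },
}

with open("replayer.yml", "w") as config_fd:
    yaml.dump(config, config_fd)

# Then run something like:
#   swh-journal --config-file replayer.yml replay --stop-after-objects 1000
```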
@_patch_objstorages(["src", "dst"]) def test_replay_content_static_group_id( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, caplog, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) # Setup log capture to fish the consumer settings out of the log messages caplog.set_level(logging.DEBUG, "swh.journal.client") result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"}, journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output consumer_settings = None for record in caplog.records: if "Consumer settings" in record.message: consumer_settings = record.args break assert consumer_settings is not None, ( "Failed to get consumer settings out of the consumer log. " "See log capture for details." ) assert consumer_settings["group.instance.id"] == "static-group-instance-id" assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000 assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000 for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_exclude( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) excluded_contents = list(contents)[0::2] # picking half of them with tempfile.NamedTemporaryFile(mode="w+b") as fd: fd.write(b"".join(sorted(excluded_contents))) fd.seek(0) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), "--exclude-sha1-file", fd.name, journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): if sha1 in excluded_contents: assert sha1 not in objstorages["dst"], sha1 else: assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content NUM_CONTENTS_DST = 5 @_patch_objstorages(["src", "dst"]) @pytest.mark.parametrize( "check_dst,expected_copied,expected_in_dst", [ (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), (False, NUM_CONTENTS, 0), ], ) def test_replay_content_check_dst( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, check_dst: bool, expected_copied: int, expected_in_dst: int, caplog, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) for i, (sha1, content) in 
enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages["dst"].add(content, obj_id=sha1) caplog.set_level(logging.DEBUG, "swh.journal.replay") result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), "--check-dst" if check_dst else "--no-check-dst", journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 in_dst = 0 for record in caplog.records: logtext = record.getMessage() if "copied" in logtext: copied += 1 elif "in dst" in logtext: in_dst += 1 assert ( copied == expected_copied and in_dst == expected_in_dst ), "Unexpected amount of objects copied, see the captured log for details" for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content class FlakyObjStorage(InMemoryObjStorage): def __init__(self, *args, **kwargs): state = kwargs.pop("state") self.failures_left = Counter(kwargs.pop("failures")) super().__init__(*args, **kwargs) if state: self.state = state def flaky_operation(self, op, obj_id): if self.failures_left[op, obj_id] > 0: self.failures_left[op, obj_id] -= 1 raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id))) def get(self, obj_id): self.flaky_operation("get", obj_id) return super().get(obj_id) def add(self, data, obj_id=None, check_presence=True): self.flaky_operation("add", obj_id) return super().add(data, obj_id=obj_id, check_presence=check_presence) def __contains__(self, obj_id): self.flaky_operation("in", obj_id) return super().__contains__(obj_id) @_patch_objstorages(["src", "dst"]) def test_replay_content_check_dst_retry( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, monkeypatch_retry_sleep, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) failures = {} for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages["dst"].add(content, obj_id=sha1) failures["in", sha1] = 1 orig_dst = objstorages["dst"] objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), "--check-dst", journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_failed_copy_retry( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, caplog, monkeypatch_retry_sleep, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) add_failures = {} get_failures = {} definitely_failed = set() # We 
want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. # We generate failures for 2 different operations, get and add. num_retry_contents = 2 * CONTENT_REPLAY_RETRIES assert ( num_retry_contents < NUM_CONTENTS ), "Need to generate more test contents to properly test retry behavior" for i, sha1 in enumerate(contents): if i >= num_retry_contents: break # This generates a number of failures, up to CONTENT_REPLAY_RETRIES num_failures = (i % CONTENT_REPLAY_RETRIES) + 1 # This generates failures of add for the first CONTENT_REPLAY_RETRIES # objects, then failures of get. if i < CONTENT_REPLAY_RETRIES: add_failures["add", sha1] = num_failures else: get_failures["get", sha1] = num_failures # Only contents that have CONTENT_REPLAY_RETRIES or more are # definitely failing if num_failures >= CONTENT_REPLAY_RETRIES: definitely_failed.add(hash_to_hex(sha1)) objstorages["dst"] = FlakyObjStorage( state=objstorages["dst"].state, failures=add_failures, ) objstorages["src"] = FlakyObjStorage( state=objstorages["src"].state, failures=get_failures, ) caplog.set_level(logging.DEBUG, "swh.journal.replay") result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 actually_failed = set() for record in caplog.records: logtext = record.getMessage() if "copied" in logtext: copied += 1 elif "Failed operation" in logtext: assert record.levelno == logging.ERROR assert record.args["retries"] == CONTENT_REPLAY_RETRIES actually_failed.add(record.args["obj_id"]) assert ( actually_failed == definitely_failed ), "Unexpected object copy failures; see captured log for details" for (sha1, content) in contents.items(): if hash_to_hex(sha1) in definitely_failed: assert sha1 not in objstorages["dst"] continue assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_objnotfound( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, caplog, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) num_contents_deleted = 5 contents_deleted = set() for i, sha1 in enumerate(contents): if i >= num_contents_deleted: break del objstorages["src"].state[sha1] contents_deleted.add(hash_to_hex(sha1)) caplog.set_level(logging.DEBUG, "swh.journal.replay") result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 not_in_src = set() for record in caplog.records: logtext = record.getMessage() if "copied" in logtext: copied += 1 elif "object not found" in logtext: # Check that the object id can be recovered from logs assert record.levelno == logging.ERROR not_in_src.add(record.args["obj_id"]) assert ( copied == NUM_CONTENTS - num_contents_deleted ), "Unexpected 
number of contents copied" assert ( not_in_src == contents_deleted ), "Mismatch between deleted contents and not_in_src logs" for (sha1, content) in contents.items(): if sha1 not in objstorages["src"]: continue assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index f0bd3cd..76d190c 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,144 +1,137 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from subprocess import Popen -from typing import Dict, List, Tuple +from typing import Dict, List from unittest.mock import MagicMock from confluent_kafka import Producer import pytest from swh.model.hypothesis_strategies import revisions from swh.model.model import Content from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka -def test_client( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int] -): - (_, port) = kafka_server +def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): kafka_prefix += ".swh.journal.objects" producer = Producer( { - "bootstrap.servers": "localhost:{}".format(port), + "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( - brokers="localhost:%d" % kafka_server[1], + brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [rev.to_dict()]}) -def test_client_eof( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int] -): - (_, port) = kafka_server +def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): kafka_prefix += ".swh.journal.objects" producer = Producer( { - "bootstrap.servers": "localhost:{}".format(port), + "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( - brokers="localhost:%d" % kafka_server[1], + brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=None, stop_on_eof=True, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [rev.to_dict()]}) @pytest.mark.parametrize("batch_size", [1, 5, 100]) def test_client_batch_size( - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - batch_size: int, + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int, ): - (_, port) = kafka_server kafka_prefix += ".swh.journal.objects" num_objects = 2 * batch_size + 1 assert num_objects < 256, "Too many objects, generation will fail" producer = Producer( { - "bootstrap.servers": "localhost:{}".format(port), + "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) contents = [Content.from_data(bytes([i])) for i 
in range(num_objects)] # Fill Kafka for content in contents: producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(content.sha1), value=value_to_kafka(content.to_dict()), ) producer.flush() client = JournalClient( - brokers=["localhost:%d" % kafka_server[1]], + brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=num_objects, batch_size=batch_size, ) collected_output: List[Dict] = [] def worker_fn(objects): received = objects["content"] assert len(received) <= batch_size collected_output.extend(received) client.process(worker_fn) - assert collected_output == [content.to_dict() for content in contents] + expected_output = [content.to_dict() for content in contents] + assert len(collected_output) == len(expected_output) + + for output in collected_output: + assert output in expected_output diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index e6c7025..fa5e7d3 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,314 +1,151 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import defaultdict - -from confluent_kafka import Consumer, Producer, KafkaException - import pytest -from subprocess import Popen -from typing import List, Tuple +from confluent_kafka import Consumer, Producer from swh.storage import get_storage -from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value -from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError - -from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit - -from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS - - -def consume_messages(consumer, kafka_prefix, expected_messages): - """Consume expected_messages from the consumer; - Sort them all into a consumed_objects dict""" - consumed_messages = defaultdict(list) - - fetched_messages = 0 - retries_left = 1000 - - while fetched_messages < expected_messages: - if retries_left == 0: - raise ValueError("Timed out fetching messages from kafka") - - msg = consumer.poll(timeout=0.01) - - if not msg: - retries_left -= 1 - continue - - error = msg.error() - if error is not None: - if error.fatal(): - raise KafkaException(error) - retries_left -= 1 - continue - - fetched_messages += 1 - topic = msg.topic() - assert topic.startswith(kafka_prefix + "."), "Unexpected topic" - object_type = topic[len(kafka_prefix + ".") :] - - consumed_messages[object_type].append( - (kafka_to_key(msg.key()), kafka_to_value(msg.value())) - ) +from swh.model.model import Directory, Origin, OriginVisit - return consumed_messages - - -def assert_all_objects_consumed(consumed_messages): - """Check whether all objects from TEST_OBJECT_DICTS have been consumed""" - for object_type, known_values in TEST_OBJECT_DICTS.items(): - known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]] - - (received_keys, received_values) = zip(*consumed_messages[object_type]) - - if object_type == "origin_visit": - for value in received_values: - del value["visit"] - elif object_type == "content": - for value in received_values: - del value["ctime"] - - for key in known_keys: - assert key in received_keys - - for value in known_values: - assert value in received_values +from swh.journal.tests.journal_data import 
TEST_OBJECTS +from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed +from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError -def test_kafka_writer( - kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer -): +def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer): kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriter( - brokers=[f"localhost:{kafka_server[1]}"], - client_id="kafka_writer", - prefix=kafka_prefix, + brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) -def test_storage_direct_writer( - kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer -): +def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): kafka_prefix += ".swh.journal.objects" writer_config = { "cls": "kafka", - "brokers": ["localhost:%d" % kafka_server[1]], + "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, } storage_config = { "cls": "pipeline", "steps": [{"cls": "memory", "journal_writer": writer_config},], } storage = get_storage(**storage_config) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(storage, object_type + "_add") if object_type in ( "content", "directory", "revision", "release", "snapshot", "origin", ): method(objects) expected_messages += len(objects) elif object_type in ("origin_visit",): for obj in objects: assert isinstance(obj, OriginVisit) storage.origin_add_one(Origin(url=obj.origin)) visit = method(obj.origin, date=obj.date, type=obj.type) expected_messages += 1 obj_d = obj.to_dict() for k in ("visit", "origin", "date", "type"): del obj_d[k] storage.origin_visit_update(obj.origin, visit.visit, **obj_d) expected_messages += 1 else: assert False, object_type consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) -@pytest.fixture(scope="session") -def large_directories() -> List[Directory]: - dir_sizes = [1 << n for n in range(21)] # 2**0 = 1 to 2**20 = 1024 * 1024 - - dir_entries = [ - DirectoryEntry( - name=("%09d" % i).encode(), - type="file", - perms=0o100644, - target=b"\x00" * 20, - ) - for i in range(max(dir_sizes)) - ] - - return [Directory(entries=dir_entries[:size]) for size in dir_sizes] - - -def test_write_large_objects( - kafka_prefix: str, - kafka_server: Tuple[Popen, int], - consumer: Consumer, - large_directories: List[Directory], +def test_write_delivery_failure( + kafka_prefix: str, kafka_server: str, consumer: Consumer ): - kafka_prefix += ".swh.journal.objects" - - # Needed as there is no directories in TEST_OBJECT_DICTS, the consumer - # isn't autosubscribed to directories. 
- consumer.subscribe([kafka_prefix + ".directory"]) - - writer = KafkaJournalWriter( - brokers=["localhost:%d" % kafka_server[1]], - client_id="kafka_writer", - prefix=kafka_prefix, - ) + class MockKafkaError: + """A mocked kafka error""" - writer.write_additions("directory", large_directories) - - consumed_messages = consume_messages(consumer, kafka_prefix, len(large_directories)) - - for dir, message in zip(large_directories, consumed_messages["directory"]): - (dir_id, consumed_dir) = message - assert dir_id == dir.id - assert consumed_dir == dir.to_dict() - - -def dir_message_size(directory: Directory) -> int: - """Estimate the size of a directory kafka message. - - We could just do it with `len(value_to_kafka(directory.to_dict()))`, - but the serialization is a substantial chunk of the test time here. - - """ - n_entries = len(directory.entries) - return ( - # fmt: off - 0 - + 1 # header of a 2-element fixmap - + 1 + 2 # fixstr("id") - + 2 + 20 # bin8(directory.id of length 20) - + 1 + 7 # fixstr("entries") - + 4 # array header - + n_entries - * ( - 0 - + 1 # header of a 4-element fixmap - + 1 + 6 # fixstr("target") - + 2 + 20 # bin8(target of length 20) - + 1 + 4 # fixstr("name") - + 2 + 9 # bin8(name of length 9) - + 1 + 5 # fixstr("perms") - + 5 # uint32(perms) - + 1 + 4 # fixstr("type") - + 1 + 3 # fixstr(type) - ) - # fmt: on - ) + def str(self): + return "Mocked Kafka Error" + def name(self): + return "SWH_MOCK_ERROR" -SMALL_MESSAGE_SIZE = 1024 * 1024 + class KafkaJournalWriterFailDelivery(KafkaJournalWriter): + """A journal writer which always fails delivering messages""" + def _on_delivery(self, error, message): + """Replace the inbound error with a fake delivery error""" + super()._on_delivery(MockKafkaError(), message) -@pytest.mark.parametrize( - "kafka_server_config_overrides", [{"message.max.bytes": str(SMALL_MESSAGE_SIZE)}] -) -def test_fail_write_large_objects( - kafka_prefix: str, - kafka_server: Tuple[Popen, int], - consumer: Consumer, - large_directories: List[Directory], -): kafka_prefix += ".swh.journal.objects" - - # Needed as there is no directories in TEST_OBJECT_DICTS, the consumer - # isn't autosubscribed to directories. 
- consumer.subscribe([kafka_prefix + ".directory"]) - - writer = KafkaJournalWriter( - brokers=["localhost:%d" % kafka_server[1]], - client_id="kafka_writer", - prefix=kafka_prefix, + writer = KafkaJournalWriterFailDelivery( + brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) - expected_dirs = [] - - for directory in large_directories: - if dir_message_size(directory) < SMALL_MESSAGE_SIZE: - # No error; write anyway, but continue - writer.write_addition("directory", directory) - expected_dirs.append(directory) - continue - - with pytest.raises(KafkaDeliveryError) as exc: - writer.write_addition("directory", directory) - - assert "Failed deliveries" in exc.value.message - assert len(exc.value.delivery_failures) == 1 - - object_type, key, msg, code = exc.value.delivery_failures[0] - - assert object_type == "directory" - assert key == directory.id - assert code == "MSG_SIZE_TOO_LARGE" - - consumed_messages = consume_messages(consumer, kafka_prefix, len(expected_dirs)) + empty_dir = Directory(entries=[]) + with pytest.raises(KafkaDeliveryError) as exc: + writer.write_addition("directory", empty_dir) - for dir, message in zip(expected_dirs, consumed_messages["directory"]): - (dir_id, consumed_dir) = message - assert dir_id == dir.id - assert consumed_dir == dir.to_dict() + assert "Failed deliveries" in exc.value.message + assert len(exc.value.delivery_failures) == 1 + delivery_failure = exc.value.delivery_failures[0] + assert delivery_failure.key == empty_dir.id + assert delivery_failure.code == "SWH_MOCK_ERROR" def test_write_delivery_timeout( - kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer + kafka_prefix: str, kafka_server: str, consumer: Consumer ): produced = [] class MockProducer(Producer): + """A kafka producer which pretends to produce messages, but never sends any + delivery acknowledgements""" + def produce(self, **kwargs): produced.append(kwargs) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriter( - brokers=["localhost:%d" % kafka_server[1]], + brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, flush_timeout=1, producer_class=MockProducer, ) empty_dir = Directory(entries=[]) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert len(produced) == 1 assert "timeout" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_FLUSH_TIMEOUT" diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index 5925049..796236d 100644 --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,414 +1,405 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools import logging import random -from subprocess import Popen -from typing import Dict, List, Tuple +from typing import Dict, List import dateutil import pytest from confluent_kafka import Producer from hypothesis import strategies, given, settings from swh.storage import get_storage from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.replay import process_replay_objects, is_hash_in_bytearray from swh.model.hashutil import hash_to_hex 
from swh.model.model import Content from .conftest import TEST_OBJECT_DICTS, DUPLICATE_CONTENTS from .utils import MockedJournalClient, MockedKafkaWriter storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} def make_topic(kafka_prefix: str, object_type: str) -> str: return kafka_prefix + "." + object_type def test_storage_play( - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - caplog, + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, ): """Optimal replayer scenario. This: - writes objects to the topic - replayer consumes objects from the topic and replay them """ - (_, port) = kafka_server kafka_prefix += ".swh.journal.objects" storage = get_storage(**storage_config) producer = Producer( { - "bootstrap.servers": "localhost:{}".format(port), + "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for object_type, objects in TEST_OBJECT_DICTS.items(): topic = make_topic(kafka_prefix, object_type) for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == "content": object_["ctime"] = now elif object_type == "origin_visit": nb_visits += 1 object_["visit"] = nb_visits producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the storage from Kafka replayer = JournalClient( - brokers="localhost:%d" % kafka_server[1], + brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage assert TEST_OBJECT_DICTS["revision"] == list( storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) ) assert TEST_OBJECT_DICTS["release"] == list( storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) ) origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] for origin in origins: origin_url = origin["url"] expected_visits = [ { **visit, "origin": origin_url, "date": dateutil.parser.parse(visit["date"]), } for visit in TEST_OBJECT_DICTS["origin_visit"] if visit["origin"] == origin["url"] ] actual_visits = list(storage.origin_visit_get(origin_url)) for visit in actual_visits: del visit["visit"] # opaque identifier assert expected_visits == actual_visits input_contents = TEST_OBJECT_DICTS["content"] contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) assert len(contents) == len(input_contents) assert contents == {cont["sha1"]: [cont] for cont in input_contents} collision = 0 for record in caplog.records: logtext = record.getMessage() if "Colliding contents:" in logtext: collision += 1 assert collision == 0, "No collision should be detected" def test_storage_play_with_collision( - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - caplog, + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, ): """Another replayer scenario with collisions. 
This: - writes objects to the topic, including colliding contents - replayer consumes objects from the topic and replay them - This drops the colliding contents from the replay when detected """ - (_, port) = kafka_server kafka_prefix += ".swh.journal.objects" storage = get_storage(**storage_config) producer = Producer( { - "bootstrap.servers": "localhost:{}".format(port), + "bootstrap.servers": kafka_server, "client.id": "test producer", "enable.idempotence": "true", } ) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for object_type, objects in TEST_OBJECT_DICTS.items(): topic = make_topic(kafka_prefix, object_type) for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == "content": object_["ctime"] = now elif object_type == "origin_visit": nb_visits += 1 object_["visit"] = nb_visits producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 # Create collision in input data # They are not written in the destination for content in DUPLICATE_CONTENTS: topic = make_topic(kafka_prefix, "content") producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(content), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the storage from Kafka replayer = JournalClient( - brokers="localhost:%d" % kafka_server[1], + brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage assert TEST_OBJECT_DICTS["revision"] == list( storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) ) assert TEST_OBJECT_DICTS["release"] == list( storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) ) origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] for origin in origins: origin_url = origin["url"] expected_visits = [ { **visit, "origin": origin_url, "date": dateutil.parser.parse(visit["date"]), } for visit in TEST_OBJECT_DICTS["origin_visit"] if visit["origin"] == origin["url"] ] actual_visits = list(storage.origin_visit_get(origin_url)) for visit in actual_visits: del visit["visit"] # opaque identifier assert expected_visits == actual_visits input_contents = TEST_OBJECT_DICTS["content"] contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) assert len(contents) == len(input_contents) assert contents == {cont["sha1"]: [cont] for cont in input_contents} nb_collisions = 0 actual_collision: Dict for record in caplog.records: logtext = record.getMessage() if "Collision detected:" in logtext: nb_collisions += 1 actual_collision = record.args["collision"] assert nb_collisions == 1, "1 collision should be detected" algo = "sha1" assert actual_collision["algo"] == algo expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo]) assert actual_collision["hash"] == expected_colliding_hash actual_colliding_hashes = actual_collision["objects"] assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) for content in DUPLICATE_CONTENTS: expected_content_hashes = { k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items() } assert 
expected_content_hashes in actual_colliding_hashes def _test_write_replay_origin_visit(visits: List[Dict]): """Helper function to write tests for origin_visit. Each visit (a dict) given in the 'visits' argument will be sent to a (mocked) kafka queue, which a in-memory-storage backed replayer is listening to. Check that corresponding origin visits entities are present in the storage and have correct values if they are not skipped. """ queue: List = [] replayer = MockedJournalClient(queue) writer = MockedKafkaWriter(queue) # Note that flipping the order of these two insertions will crash # the test, because the legacy origin_format does not allow to create # the origin when needed (type is missing) writer.send( "origin", "foo", { "url": "http://example.com/", "type": "git", # test the legacy origin format is accepted }, ) for visit in visits: writer.send("origin_visit", "foo", visit) queue_size = len(queue) assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size storage = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage) replayer.process(worker_fn) actual_visits = list(storage.origin_visit_get("http://example.com/")) assert len(actual_visits) == len(visits), actual_visits for vin, vout in zip(visits, actual_visits): vin = vin.copy() vout = vout.copy() assert vout.pop("origin") == "http://example.com/" vin.pop("origin") vin.setdefault("type", "git") vin.setdefault("metadata", None) assert vin == vout def test_write_replay_origin_visit(): """Test origin_visit when the 'origin' is just a string.""" now = datetime.datetime.now() visits = [ { "visit": 1, "origin": "http://example.com/", "date": now, "type": "git", "status": "partial", "snapshot": None, } ] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit1(): """Origin_visit with no types should make the replayer crash We expect the journal's origin_visit topic to no longer reference such visits. If it does, the replayer must crash so we can fix the journal's topic. 
""" now = datetime.datetime.now() visit = { "visit": 1, "origin": "http://example.com/", "date": now, "status": "partial", "snapshot": None, } now2 = datetime.datetime.now() visit2 = { "visit": 2, "origin": {"url": "http://example.com/"}, "date": now2, "status": "partial", "snapshot": None, } for origin_visit in [visit, visit2]: with pytest.raises(ValueError, match="Old origin visit format"): _test_write_replay_origin_visit([origin_visit]) def test_write_replay_legacy_origin_visit2(): """Test origin_visit when 'type' is missing from the visit, but not from the origin.""" now = datetime.datetime.now() visits = [ { "visit": 1, "origin": {"url": "http://example.com/", "type": "git",}, "date": now, "type": "git", "status": "partial", "snapshot": None, } ] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit3(): """Test origin_visit when the origin is a dict""" now = datetime.datetime.now() visits = [ { "visit": 1, "origin": {"url": "http://example.com/",}, "date": now, "type": "git", "status": "partial", "snapshot": None, } ] _test_write_replay_origin_visit(visits) hash_strategy = strategies.binary(min_size=20, max_size=20) @settings(max_examples=500) @given( strategies.sets(hash_strategy, min_size=0, max_size=500), strategies.sets(hash_strategy, min_size=10), ) def test_is_hash_in_bytearray(haystack, needles): array = b"".join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == ( needle in haystack ) diff --git a/version.txt b/version.txt index 997ae4e..d290898 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.30-0-gb53d7d7 \ No newline at end of file +v0.0.31-0-gdc96335 \ No newline at end of file