diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,4 @@
 dist/
 version.txt
 .mypy_cache/
+.hypothesis/
diff --git a/docs/index.rst b/docs/index.rst
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,19 +1,20 @@
-.. _swh-py-template:
+.. _swh-objstorage-replayer:

-Software Heritage - Python module template
-==========================================
+Software Heritage - Object storage replayer
+===========================================

-Python module template, used as skeleton to create new modules.
+This Python module provides a command line tool to replicate content objects from a
+source object storage to a destination one by listening to the `content` topic of a
+`swh.journal` Kafka stream.
+It is meant to be used as a building block of a mirror setup dedicated to replicating
+content objects.

-.. toctree::
-   :maxdepth: 2
-   :caption: Contents:
+Reference Documentation
+-----------------------

-Indices and tables
-==================
+.. toctree::
+   :maxdepth: 2

-* :ref:`genindex`
-* :ref:`modindex`
-* :ref:`search`
+
+   /apidoc/swh.objstorage.replayer
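To make the mirror setup concrete: the replayer is driven by a YAML configuration file whose top-level keys match what `cli.py` (added below) reads. A minimal sketch, assuming in-memory placeholder backends; the broker address, group id, and topic prefix are illustrative, not part of this diff:

```python
# Sketch only: write the YAML configuration the replayer CLI expects.
# The top-level keys (objstorage, objstorage_dst, journal_client) are the
# ones cli.py reads; the "memory" backends and broker address are placeholders.
import yaml

config = {
    "objstorage": {"cls": "memory", "args": {}},  # source (placeholder)
    "objstorage_dst": {"cls": "memory", "args": {}},  # destination (placeholder)
    "journal_client": {
        "brokers": "kafka.example.org:9092",  # hypothetical broker
        "group_id": "my-mirror-content-replayer",  # hypothetical group id
        "prefix": "swh.journal.objects",  # hypothetical topic prefix
    },
}

with open("replayer.yml", "w") as f:
    yaml.dump(config, f)

# Then, assuming the aggregated swh CLI exposes the command as below:
#   swh objstorage -C replayer.yml replay --stop-after-objects 1000
```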
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -5,11 +5,17 @@
 # 3rd party libraries without stubs (yet)

+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
 [mypy-pkg_resources.*]
 ignore_missing_imports = True

 [mypy-pytest.*]
 ignore_missing_imports = True

-# [mypy-add_your_lib_here.*]
-# ignore_missing_imports = True
+[mypy-systemd.daemon.*]
+ignore_missing_imports = True
+
+[mypy-tenacity.*]
+ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,4 @@
 # Add here internal Software Heritage dependencies, one per line.
-swh.core
+swh.core[http]
+swh.objstorage >= 0.0.43
+swh.journal >= 0.0.31
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,3 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-vcversioner
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python3
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -39,24 +39,24 @@
 # Full sample:
 # https://forge.softwareheritage.org/diffusion/DCORE/browse/master/setup.py
 setup(
-    name="swh.",  # example: swh.loader.pypi
-    description="Software Heritage ",
+    name="swh.objstorage.replayer",  # example: swh.loader.pypi
+    description="Software Heritage content replayer",
     long_description=long_description,
     long_description_content_type="text/markdown",
     python_requires=">=3.7",
     author="Software Heritage developers",
     author_email="swh-devel@inria.fr",
-    url="https://forge.softwareheritage.org/diffusion/",
+    url="https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer",
     packages=find_packages(),  # packages's modules
     install_requires=parse_requirements() + parse_requirements("swh"),
     tests_require=parse_requirements("test"),
-    setup_requires=["vcversioner"],
+    setup_requires=["setuptools_scm"],
     extras_require={"testing": parse_requirements("test")},
-    vcversioner={},
+    use_scm_version=True,
     include_package_data=True,
     entry_points="""
         [swh.cli.subcommands]
-        =swh..cli:cli
+        content-replayer=swh.objstorage.replayer.cli:cli
     """,
     classifiers=[
         "Programming Language :: Python :: 3",
diff --git a/swh/foo/bar.py b/swh/foo/bar.py
deleted file mode 100644
--- a/swh/foo/bar.py
+++ /dev/null
@@ -1,4 +0,0 @@
-# Copyright (C) 2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
diff --git a/swh/foo/cli.py b/swh/foo/cli.py
deleted file mode 100644
--- a/swh/foo/cli.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import click
-
-from swh.core.cli import CONTEXT_SETTINGS
-
-
-@click.group(name="foo", context_settings=CONTEXT_SETTINGS)
-@click.pass_context
-def cli(ctx):
-    """Foo main command.
-    """
-
-
-@cli.command()
-@click.option("--bar", help="Something")
-@click.pass_context
-def bar(ctx, bar):
-    """Do something."""
-    click.echo("bar")
diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/__init__.py
@@ -0,0 +1,7 @@
+from pkgutil import extend_path
+from typing import Iterable
+
+__path__: Iterable[str] = extend_path(__path__, __name__)
+
+# for BW compat
+from swh.objstorage.factory import *  # noqa
diff --git a/swh/foo/__init__.py b/swh/objstorage/replayer/__init__.py
rename from swh/foo/__init__.py
rename to swh/objstorage/replayer/__init__.py
- """ - - -@cli.command() -@click.option("--bar", help="Something") -@click.pass_context -def bar(ctx, bar): - """Do something.""" - click.echo("bar") diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/__init__.py @@ -0,0 +1,7 @@ +from pkgutil import extend_path +from typing import Iterable + +__path__: Iterable[str] = extend_path(__path__, __name__) + +# for BW compat +from swh.objstorage.factory import * # noqa diff --git a/swh/foo/__init__.py b/swh/objstorage/replayer/__init__.py rename from swh/foo/__init__.py rename to swh/objstorage/replayer/__init__.py diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/replayer/cli.py @@ -0,0 +1,133 @@ +# Copyright (C) 2016-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import functools +import logging +import mmap + +import click + +try: + from systemd.daemon import notify +except ImportError: + notify = None + +from swh.model.model import SHA1_SIZE +from swh.journal.client import get_journal_client +from swh.objstorage import get_objstorage +from swh.objstorage.cli import cli + +from swh.objstorage.replayer.replay import is_hash_in_bytearray +from swh.objstorage.replayer.replay import process_replay_objects_content + + +@cli.command("replay") +@click.option( + "--stop-after-objects", + "-n", + default=None, + type=int, + help="Stop after processing this many objects. Default is to run forever.", +) +@click.option( + "--exclude-sha1-file", + default=None, + type=click.File("rb"), + help="File containing a sorted array of hashes to be excluded.", +) +@click.option( + "--check-dst/--no-check-dst", + default=True, + help="Check whether the destination contains the object before copying.", +) +@click.pass_context +def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): + """Fill a destination Object Storage using a journal stream. + + This is typically used for a mirror configuration, by reading a Journal + and retrieving objects from an existing source ObjStorage. + + There can be several 'replayers' filling a given ObjStorage as long as they + use the same ``group-id``. You can use the ``KAFKA_GROUP_INSTANCE_ID`` + environment variable to use KIP-345 static group membership. + + This service retrieves object ids to copy from the 'content' topic. It will + only copy object's content if the object's description in the kafka + nmessage has the status:visible set. + + ``--exclude-sha1-file`` may be used to exclude some hashes to speed-up the + replay in case many of the contents are already in the destination + objstorage. It must contain a concatenation of all (sha1) hashes, + and it must be sorted. + This file will not be fully loaded into memory at any given time, + so it can be arbitrarily large. + + ``--check-dst`` sets whether the replayer should check in the destination + ObjStorage before copying an object. You can turn that off if you know + you're copying to an empty ObjStorage. 
+ """ + conf = ctx.obj["config"] + try: + objstorage_src = get_objstorage(**conf.pop("objstorage")) + except KeyError: + ctx.fail("You must have a source objstorage configured in " "your config file.") + try: + objstorage_dst = get_objstorage(**conf.pop("objstorage_dst")) + except KeyError: + ctx.fail( + "You must have a destination objstorage configured " "in your config file." + ) + + if exclude_sha1_file: + map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) + if map_.size() % SHA1_SIZE != 0: + ctx.fail( + "--exclude-sha1 must link to a file whose size is an " + "exact multiple of %d bytes." % SHA1_SIZE + ) + nb_excluded_hashes = int(map_.size() / SHA1_SIZE) + + def exclude_fn(obj): + return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes) + + else: + exclude_fn = None + + client = get_journal_client( + "kafka", + **conf["journal_client"], + stop_after_objects=stop_after_objects, + object_types=("content",), + ) + worker_fn = functools.partial( + process_replay_objects_content, + src=objstorage_src, + dst=objstorage_dst, + exclude_fn=exclude_fn, + check_dst=check_dst, + ) + + if notify: + notify("READY=1") + + try: + client.process(worker_fn) + except KeyboardInterrupt: + ctx.exit(0) + else: + print("Done.") + finally: + if notify: + notify("STOPPING=1") + client.close() + + +def main(): + logging.basicConfig() + return cli(auto_envvar_prefix="SWH_OBJSTORAGE") + + +if __name__ == "__main__": + main() diff --git a/swh/foo/py.typed b/swh/objstorage/replayer/py.typed rename from swh/foo/py.typed rename to swh/objstorage/replayer/py.typed diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/replayer/replay.py @@ -0,0 +1,303 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +from time import time +from typing import Callable, Dict, List, Optional + +from sentry_sdk import capture_exception, push_scope + +try: + from systemd.daemon import notify +except ImportError: + notify = None + +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_random_exponential, +) + +from swh.core.statsd import statsd +from swh.model.hashutil import hash_to_hex +from swh.model.model import SHA1_SIZE + +from swh.objstorage.objstorage import ( + ID_HASH_ALGO, + ObjNotFoundError, + ObjStorage, +) + +logger = logging.getLogger(__name__) + +CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" +CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" +CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" +CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" + + +def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): + """ + Checks if the given hash is in the provided `array`. The array must be + a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes + (so its size must by `nb_hashes*hash_size` bytes). + + Args: + hash_ (bytes): the hash to look for + array (bytes): a sorted concatenated array of hashes (may be of + any type supporting slice indexing, eg. 
+
+
+class ReplayError(Exception):
+    """An error occurred during the replay of an object"""
+
+    def __init__(self, operation, *, obj_id, exc):
+        self.operation = operation
+        self.obj_id = hash_to_hex(obj_id)
+        self.exc = exc
+
+    def __str__(self):
+        return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc)
+
+
+def log_replay_retry(retry_obj, sleep, last_result):
+    """Log a retry of the content replayer"""
+    exc = last_result.exception()
+    logger.debug(
+        "Retry operation %(operation)s on %(obj_id)s: %(exc)s",
+        {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
+    )
+
+    statsd.increment(
+        CONTENT_RETRY_METRIC,
+        tags={
+            "operation": exc.operation,
+            "attempt": str(retry_obj.statistics["attempt_number"]),
+        },
+    )
+
+
+def log_replay_error(last_attempt):
+    """Log a replay error to sentry"""
+    exc = last_attempt.exception()
+    with push_scope() as scope:
+        scope.set_tag("operation", exc.operation)
+        scope.set_extra("obj_id", exc.obj_id)
+        capture_exception(exc.exc)
+
+    logger.error(
+        "Failed operation %(operation)s on %(obj_id)s after %(retries)s"
+        " retries: %(exc)s",
+        {
+            "obj_id": exc.obj_id,
+            "operation": exc.operation,
+            "exc": str(exc.exc),
+            "retries": last_attempt.attempt_number,
+        },
+    )
+
+    return None
+
+
+CONTENT_REPLAY_RETRIES = 3
+
+content_replay_retry = retry(
+    retry=retry_if_exception_type(ReplayError),
+    stop=stop_after_attempt(CONTENT_REPLAY_RETRIES),
+    wait=wait_random_exponential(multiplier=1, max=60),
+    before_sleep=log_replay_retry,
+    retry_error_callback=log_replay_error,
+)
+
+
+@content_replay_retry
+def copy_object(obj_id, src, dst):
+    hex_obj_id = hash_to_hex(obj_id)
+    obj = ""
+    try:
+        with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}):
+            obj = src.get(obj_id)
+            logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id})
+
+        with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
+            dst.add(obj, obj_id=obj_id, check_presence=False)
+            logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id})
+            statsd.increment(CONTENT_BYTES_METRIC, len(obj))
+    except ObjNotFoundError:
+        logger.error(
+            "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id}
+        )
+        raise
+    except Exception as exc:
+        raise ReplayError("copy", obj_id=obj_id, exc=exc) from None
+    return len(obj)
+
+
+@content_replay_retry
+def obj_in_objstorage(obj_id, dst):
+    """Check if an object is already in an objstorage, tenaciously"""
+    try:
+        return obj_id in dst
+    except Exception as exc:
+        raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None
+
+
+def process_replay_objects_content(
+    all_objects: Dict[str, List[dict]],
+    *,
+    src: ObjStorage,
+    dst: ObjStorage,
+    exclude_fn: Optional[Callable[[dict], bool]] = None,
+    check_dst: bool = True,
+):
+    """
+    Takes a list of records from Kafka (see
+    :py:func:`swh.journal.client.JournalClient.process`) and copies them
+    from the `src` objstorage to the `dst` objstorage, if:
+
+    * `obj['status']` is `'visible'`
+    * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided)
+    * `obj['sha1'] not in dst` (if `check_dst` is True)
+
+    Args:
+        all_objects: Objects passed by the Kafka client. Most importantly,
+            `all_objects['content'][*]['sha1']` is the sha1 hash of each
+            content.
+        src: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
+        dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
+        exclude_fn: Determines whether an object should be copied.
+        check_dst: Determines whether we should check the destination
+            objstorage before copying.
+
+    Example:
+
+    >>> from swh.objstorage import get_objstorage
+    >>> src = get_objstorage('memory', {})
+    >>> dst = get_objstorage('memory', {})
+    >>> id1 = src.add(b'foo bar')
+    >>> id2 = src.add(b'baz qux')
+    >>> kafka_partitions = {
+    ...     'content': [
+    ...         {
+    ...             'sha1': id1,
+    ...             'status': 'visible',
+    ...         },
+    ...         {
+    ...             'sha1': id2,
+    ...             'status': 'visible',
+    ...         },
+    ...     ]
+    ... }
+    >>> process_replay_objects_content(
+    ...     kafka_partitions, src=src, dst=dst,
+    ...     exclude_fn=lambda obj: obj['sha1'] == id1)
+    >>> id1 in dst
+    False
+    >>> id2 in dst
+    True
+    """
+    vol = []
+    nb_skipped = 0
+    nb_failures = 0
+    t0 = time()
+
+    for (object_type, objects) in all_objects.items():
+        if object_type != "content":
+            logger.warning(
+                "Received a series of %s, this should not happen", object_type
+            )
+            continue
+        for obj in objects:
+            obj_id = obj[ID_HASH_ALGO]
+            if obj["status"] != "visible":
+                nb_skipped += 1
+                logger.debug(
+                    "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"]
+                )
+                statsd.increment(
+                    CONTENT_OPERATIONS_METRIC,
+                    tags={"decision": "skipped", "status": obj["status"]},
+                )
+            elif exclude_fn and exclude_fn(obj):
+                nb_skipped += 1
+                logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
+                statsd.increment(
+                    CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}
+                )
+            elif check_dst and obj_in_objstorage(obj_id, dst):
+                nb_skipped += 1
+                logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
+                statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
+            else:
+                try:
+                    copied = copy_object(obj_id, src, dst)
+                except ObjNotFoundError:
+                    nb_skipped += 1
+                    statsd.increment(
+                        CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
+                    )
+                else:
+                    if copied is None:
+                        nb_failures += 1
+                        statsd.increment(
+                            CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
+                        )
+                    else:
+                        vol.append(copied)
+                        statsd.increment(
+                            CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
+                        )
+
+    dt = time() - t0
+    logger.info(
+        "processed %s content objects in %.1fsec "
+        "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped",
+        len(vol),
+        dt,
+        len(vol) / dt,
+        sum(vol) / 1024 / 1024 / dt,
+        nb_failures,
+        nb_skipped,
+    )
+
+    if notify:
+        notify("WATCHDOG=1")
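The `--exclude-sha1-file` option and `is_hash_in_bytearray` operate on a plain concatenation of sorted 20-byte sha1 hashes, memory-mapped rather than loaded. A small sketch of building and querying such a file; the five hashes and the file name are made up, and the `mmap` call mirrors the one in `cli.py` (`PROT_READ`, POSIX only):

```python
# Sketch: build the sorted hash array --exclude-sha1-file expects, then
# query it the way the replayer does.
import mmap

from swh.objstorage.replayer.replay import is_hash_in_bytearray

sha1s = sorted(bytes([i]) * 20 for i in range(5))  # five fake 20-byte sha1s

with open("exclude-sha1", "wb") as f:
    f.write(b"".join(sha1s))  # hashes must be sorted before concatenation

with open("exclude-sha1", "rb") as f:
    array = mmap.mmap(f.fileno(), 0, prot=mmap.PROT_READ)
    nb_hashes = array.size() // 20
    assert is_hash_in_bytearray(sha1s[0], array, nb_hashes)
    assert not is_hash_in_bytearray(b"\xff" * 20, array, nb_hashes)
```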
diff --git a/swh/foo/tests/__init__.py b/swh/objstorage/replayer/tests/__init__.py
rename from swh/foo/tests/__init__.py
rename to swh/objstorage/replayer/tests/__init__.py
diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/tests/test_cli.py
@@ -0,0 +1,556 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import Counter
+import copy
+import functools
+import logging
+import re
+import tempfile
+from subprocess import Popen
+from typing import Tuple
+from unittest.mock import patch
+
+from click.testing import CliRunner
+from confluent_kafka import Producer
+import pytest
+import yaml
+
+from swh.journal.serializers import key_to_kafka
+from swh.model.hashutil import hash_to_hex
+from swh.objstorage.backends.in_memory import InMemoryObjStorage
+from swh.objstorage.replayer.cli import cli
+from swh.objstorage.replayer.replay import CONTENT_REPLAY_RETRIES
+
+
+logger = logging.getLogger(__name__)
+
+
+CLI_CONFIG = {
+    "objstorage": {"cls": "mocked", "name": "src",},
+    "objstorage_dst": {"cls": "mocked", "name": "dst",},
+}
+
+
+@pytest.fixture
+def monkeypatch_retry_sleep(monkeypatch):
+    from swh.objstorage.replayer.replay import copy_object, obj_in_objstorage
+
+    monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None)
+    monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)
+
+
+def _patch_objstorages(names):
+    objstorages = {name: InMemoryObjStorage() for name in names}
+
+    def get_mock_objstorage(cls, **args):
+        assert cls == "mocked", cls
+        return objstorages[args["name"]]
+
+    def decorator(f):
+        @functools.wraps(f)
+        @patch("swh.objstorage.replayer.cli.get_objstorage")
+        def newf(get_objstorage_mock, *args, **kwargs):
+            get_objstorage_mock.side_effect = get_mock_objstorage
+            f(*args, objstorages=objstorages, **kwargs)
+
+        return newf
+
+    return decorator
+
+
+def invoke(*args, env=None, journal_config=None):
+    config = copy.deepcopy(CLI_CONFIG)
+    if journal_config:
+        config["journal_client"] = journal_config
+
+    runner = CliRunner()
+    with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
+        yaml.dump(config, config_fd)
+        config_fd.seek(0)
+        args = ["-C" + config_fd.name] + list(args)
+        return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,)
+
+
+def test_replay_help():
+    result = invoke("replay", "--help",)
+    expected = (
+        r"^\s*Usage: objstorage replay \[OPTIONS\]\s+"
+        r"Fill a destination Object Storage.*"
+    )
+    assert result.exit_code == 0, result.output
+    assert re.match(expected, result.output, re.MULTILINE), result.output
+
+
+NUM_CONTENTS = 10
+
+
+def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages):
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test-producer",
+            "acks": "all",
+        }
+    )
+
+    contents = {}
+    for i in range(NUM_CONTENTS):
+        content = b"\x00" * 19 + bytes([i])
+        sha1 = objstorages["src"].add(content)
+        contents[sha1] = content
+        producer.produce(
+            topic=kafka_prefix + ".content",
+            key=key_to_kafka(sha1),
+            value=key_to_kafka({"sha1": sha1, "status": "visible",}),
+        )
+
+    producer.flush()
+
+    return contents
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content(
+    objstorages,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: Tuple[Popen, int],
+):
+
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+    result = invoke(
+        "replay",
+        "--stop-after-objects",
+        str(NUM_CONTENTS),
+        journal_config={
+            "brokers": kafka_server,
+            "group_id": kafka_consumer_group,
+            "prefix": kafka_prefix,
+        },
+    )
+
+    expected = r"Done.\n"
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    for (sha1, content) in contents.items():
+        assert sha1 in objstorages["dst"], sha1
+        assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_structured_log(
+    objstorages,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: Tuple[Popen, int],
+    caplog,
+):
+
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+    caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+    expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents)
+
+    result = invoke(
+        "replay",
+        "--stop-after-objects",
+        str(NUM_CONTENTS),
+        journal_config={
+            "brokers": kafka_server,
+            "group_id": kafka_consumer_group,
+            "prefix": kafka_prefix,
+        },
+    )
+    expected = r"Done.\n"
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    copied = set()
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if "copied" in logtext:
+            copied.add(record.args["obj_id"])
+
+    assert (
+        copied == expected_obj_ids
+    ), "Mismatched logging; see captured log output for details."
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_static_group_id(
+    objstorages,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: Tuple[Popen, int],
+    caplog,
+):
+
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+    # Setup log capture to fish the consumer settings out of the log messages
+    caplog.set_level(logging.DEBUG, "swh.journal.client")
+
+    result = invoke(
+        "replay",
+        "--stop-after-objects",
+        str(NUM_CONTENTS),
+        env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"},
+        journal_config={
+            "brokers": kafka_server,
+            "group_id": kafka_consumer_group,
+            "prefix": kafka_prefix,
+        },
+    )
+    expected = r"Done.\n"
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    consumer_settings = None
+    for record in caplog.records:
+        if "Consumer settings" in record.message:
+            consumer_settings = record.args
+            break
+
+    assert consumer_settings is not None, (
+        "Failed to get consumer settings out of the consumer log. "
+        "See log capture for details."
+    )
+    assert consumer_settings["group.instance.id"] == "static-group-instance-id"
+    assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000
+    assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000
+
+    for (sha1, content) in contents.items():
+        assert sha1 in objstorages["dst"], sha1
+        assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_exclude(
+    objstorages,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: Tuple[Popen, int],
+):
+
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+    excluded_contents = list(contents)[0::2]  # picking half of them
+    with tempfile.NamedTemporaryFile(mode="w+b") as fd:
+        fd.write(b"".join(sorted(excluded_contents)))
+
+        fd.seek(0)
+
+        result = invoke(
+            "replay",
+            "--stop-after-objects",
+            str(NUM_CONTENTS),
+            "--exclude-sha1-file",
+            fd.name,
+            journal_config={
+                "brokers": kafka_server,
+                "group_id": kafka_consumer_group,
+                "prefix": kafka_prefix,
+            },
+        )
+    expected = r"Done.\n"
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    for (sha1, content) in contents.items():
+        if sha1 in excluded_contents:
+            assert sha1 not in objstorages["dst"], sha1
+        else:
+            assert sha1 in objstorages["dst"], sha1
+            assert objstorages["dst"].get(sha1) == content
+
+
+NUM_CONTENTS_DST = 5
+
+
+@_patch_objstorages(["src", "dst"])
+@pytest.mark.parametrize(
+    "check_dst,expected_copied,expected_in_dst",
+    [
+        (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST),
+        (False, NUM_CONTENTS, 0),
+    ],
+)
+def test_replay_content_check_dst(
+    objstorages,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: Tuple[Popen, int],
+    check_dst: bool,
+    expected_copied: int,
+    expected_in_dst: int,
+    caplog,
+):
+
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+    for i, (sha1, content) in enumerate(contents.items()):
+        if i >= NUM_CONTENTS_DST:
+            break
+
+        objstorages["dst"].add(content, obj_id=sha1)
+
+    caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+    result = invoke(
+        "replay",
+        "--stop-after-objects",
+        str(NUM_CONTENTS),
+        "--check-dst" if check_dst else "--no-check-dst",
+        journal_config={
+            "brokers": kafka_server,
+            "group_id": kafka_consumer_group,
+            "prefix": kafka_prefix,
+        },
+    )
+    expected = r"Done.\n"
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    copied = 0
+    in_dst = 0
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if "copied" in logtext:
+            copied += 1
+        elif "in dst" in logtext:
+            in_dst += 1
+
+    assert (
+        copied == expected_copied and in_dst == expected_in_dst
+    ), "Unexpected amount of objects copied, see the captured log for details"
+
+    for (sha1, content) in contents.items():
+        assert sha1 in objstorages["dst"], sha1
+        assert objstorages["dst"].get(sha1) == content
+
+
+class FlakyObjStorage(InMemoryObjStorage):
+    def __init__(self, *args, **kwargs):
+        state = kwargs.pop("state")
+        self.failures_left = Counter(kwargs.pop("failures"))
+        super().__init__(*args, **kwargs)
+        if state:
+            self.state = state
+
+    def flaky_operation(self, op, obj_id):
+        if self.failures_left[op, obj_id] > 0:
+            self.failures_left[op, obj_id] -= 1
+            raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id)))
+
+    def get(self, obj_id):
+        self.flaky_operation("get", obj_id)
+        return super().get(obj_id)
+
+    def add(self, data, obj_id=None, check_presence=True):
+        self.flaky_operation("add", obj_id)
+        return super().add(data, obj_id=obj_id, check_presence=check_presence)
+
+    def __contains__(self, obj_id):
+        self.flaky_operation("in", obj_id)
+        return super().__contains__(obj_id)
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_check_dst_retry(
+    objstorages,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: Tuple[Popen, int],
+    monkeypatch_retry_sleep,
+):
+
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+    failures = {}
+    for i, (sha1, content) in enumerate(contents.items()):
+        if i >= NUM_CONTENTS_DST:
+            break
+
+        objstorages["dst"].add(content, obj_id=sha1)
+        failures["in", sha1] = 1
+
+    orig_dst = objstorages["dst"]
+    objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures)
+
+    result = invoke(
+        "replay",
+        "--stop-after-objects",
+        str(NUM_CONTENTS),
+        "--check-dst",
+        journal_config={
+            "brokers": kafka_server,
+            "group_id": kafka_consumer_group,
+            "prefix": kafka_prefix,
+        },
+    )
+    expected = r"Done.\n"
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    for (sha1, content) in contents.items():
+        assert sha1 in objstorages["dst"], sha1
+        assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_failed_copy_retry(
+    objstorages,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: Tuple[Popen, int],
+    caplog,
+    monkeypatch_retry_sleep,
+):
+
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+    add_failures = {}
+    get_failures = {}
+    definitely_failed = set()
+
+    # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times.
+    # We generate failures for 2 different operations, get and add.
+    num_retry_contents = 2 * CONTENT_REPLAY_RETRIES
+
+    assert (
+        num_retry_contents < NUM_CONTENTS
+    ), "Need to generate more test contents to properly test retry behavior"
+
+    for i, sha1 in enumerate(contents):
+        if i >= num_retry_contents:
+            break
+
+        # This generates a number of failures, up to CONTENT_REPLAY_RETRIES
+        num_failures = (i % CONTENT_REPLAY_RETRIES) + 1
+
+        # This generates failures of add for the first CONTENT_REPLAY_RETRIES
+        # objects, then failures of get.
+        if i < CONTENT_REPLAY_RETRIES:
+            add_failures["add", sha1] = num_failures
+        else:
+            get_failures["get", sha1] = num_failures
+
+        # Only contents that have CONTENT_REPLAY_RETRIES or more are
+        # definitely failing
+        if num_failures >= CONTENT_REPLAY_RETRIES:
+            definitely_failed.add(hash_to_hex(sha1))
+
+    objstorages["dst"] = FlakyObjStorage(
+        state=objstorages["dst"].state, failures=add_failures,
+    )
+    objstorages["src"] = FlakyObjStorage(
+        state=objstorages["src"].state, failures=get_failures,
+    )
+
+    caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+    result = invoke(
+        "replay",
+        "--stop-after-objects",
+        str(NUM_CONTENTS),
+        journal_config={
+            "brokers": kafka_server,
+            "group_id": kafka_consumer_group,
+            "prefix": kafka_prefix,
+        },
+    )
+    expected = r"Done.\n"
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    copied = 0
+    actually_failed = set()
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if "copied" in logtext:
+            copied += 1
+        elif "Failed operation" in logtext:
+            assert record.levelno == logging.ERROR
+            assert record.args["retries"] == CONTENT_REPLAY_RETRIES
+            actually_failed.add(record.args["obj_id"])
+
+    assert (
+        actually_failed == definitely_failed
+    ), "Unexpected object copy failures; see captured log for details"
+
+    for (sha1, content) in contents.items():
+        if hash_to_hex(sha1) in definitely_failed:
+            assert sha1 not in objstorages["dst"]
+            continue
+
+        assert sha1 in objstorages["dst"], sha1
+        assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_objnotfound(
+    objstorages,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: Tuple[Popen, int],
+    caplog,
+):
+
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+    num_contents_deleted = 5
+    contents_deleted = set()
+
+    for i, sha1 in enumerate(contents):
+        if i >= num_contents_deleted:
+            break
+
+        del objstorages["src"].state[sha1]
+        contents_deleted.add(hash_to_hex(sha1))
+
+    caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+    result = invoke(
+        "replay",
+        "--stop-after-objects",
+        str(NUM_CONTENTS),
+        journal_config={
+            "brokers": kafka_server,
+            "group_id": kafka_consumer_group,
+            "prefix": kafka_prefix,
+        },
+    )
+    expected = r"Done.\n"
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    copied = 0
+    not_in_src = set()
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if "copied" in logtext:
+            copied += 1
+        elif "object not found" in logtext:
+            # Check that the object id can be recovered from logs
+            assert record.levelno == logging.ERROR
+            not_in_src.add(record.args["obj_id"])
+
+    assert (
+        copied == NUM_CONTENTS - num_contents_deleted
+    ), "Unexpected number of contents copied"
+
+    assert (
+        not_in_src == contents_deleted
+    ), "Mismatch between deleted contents and not_in_src logs"
+
+    for (sha1, content) in contents.items():
+        if sha1 not in objstorages["src"]:
+            continue
+        assert sha1 in objstorages["dst"], sha1
+        assert objstorages["dst"].get(sha1) == content
diff --git a/swh/objstorage/replayer/tests/test_replay.py b/swh/objstorage/replayer/tests/test_replay.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/tests/test_replay.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+
+from hypothesis import given, settings, HealthCheck
+from hypothesis.strategies import lists, sets, binary
+
+from swh.model.hypothesis_strategies import present_contents
+from swh.objstorage import get_objstorage
+from swh.objstorage.replayer.replay import (
+    is_hash_in_bytearray,
+    process_replay_objects_content,
+)
+from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
+
+
+hash_strategy = binary(min_size=20, max_size=20)
+
+
+@settings(max_examples=500)
+@given(
+    sets(hash_strategy, min_size=0, max_size=500), sets(hash_strategy, min_size=10),
+)
+def test_is_hash_in_bytearray(haystack, needles):
+    array = b"".join(sorted(haystack))
+    needles |= haystack  # Exhaustively test for all objects in the array
+    for needle in needles:
+        assert is_hash_in_bytearray(needle, array, len(haystack)) == (
+            needle in haystack
+        )
+
+
+@given(lists(present_contents(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_replay_content(objects):
+
+    queue = []
+    replayer = MockedJournalClient(queue)
+    writer = MockedKafkaWriter(queue)
+
+    objstorage1 = get_objstorage(cls="memory", args={})
+    objstorage2 = get_objstorage(cls="memory", args={})
+
+    contents = []
+    for obj in objects:
+        objstorage1.add(obj.data)
+        contents.append(obj)
+        writer.write_addition("content", obj)
+
+    # Bail out early if we didn't insert any relevant objects...
+    queue_size = len(queue)
+    assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
+
+    assert replayer.stop_after_objects is None
+    replayer.stop_after_objects = queue_size
+
+    worker_fn = functools.partial(
+        process_replay_objects_content, src=objstorage1, dst=objstorage2
+    )
+
+    replayer.process(worker_fn)
+
+    # Only contents with status "visible" are copied to objstorage2
+    expected_objstorage_state = {
+        c.sha1: c.data for c in contents if c.status == "visible"
+    }
+
+    assert expected_objstorage_state == objstorage2.state
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -8,8 +8,8 @@
   pytest-cov
 commands =
   pytest --doctest-modules \
-         {envsitepackagesdir}/swh/foo \
-         --cov={envsitepackagesdir}/swh/foo \
+         {envsitepackagesdir}/swh/objstorage/replayer \
+         --cov={envsitepackagesdir}/swh/objstorage/replayer \
          --cov-branch {posargs}

[testenv:black]
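To complement the doctest in `replay.py` (which demonstrates `exclude_fn`), here is a minimal sketch of the status filter end to end, using the in-memory backend from the doctest. The content bytes and the non-visible status value are made up:

```python
# Sketch: only objects whose journal record has status "visible" are copied.
from swh.objstorage import get_objstorage
from swh.objstorage.replayer.replay import process_replay_objects_content

src = get_objstorage("memory", {})
dst = get_objstorage("memory", {})

visible = src.add(b"replicate me")
hidden = src.add(b"do not replicate me")

process_replay_objects_content(
    {
        "content": [
            {"sha1": visible, "status": "visible"},
            {"sha1": hidden, "status": "hidden"},  # skipped: status != visible
        ]
    },
    src=src,
    dst=dst,
)
assert visible in dst
assert hidden not in dst
```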