diff --git a/setup.py b/setup.py
index fef414f..6423ae3 100755
--- a/setup.py
+++ b/setup.py
@@ -1,73 +1,73 @@
 #!/usr/bin/env python3
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from setuptools import setup, find_packages

 from os import path
 from io import open

 here = path.abspath(path.dirname(__file__))

 # Get the long description from the README file
 with open(path.join(here, "README.md"), encoding="utf-8") as f:
     long_description = f.read()


 def parse_requirements(name=None):
     if name:
         reqf = "requirements-%s.txt" % name
     else:
         reqf = "requirements.txt"

     requirements = []
     if not path.exists(reqf):
         return requirements

     with open(reqf) as f:
         for line in f.readlines():
             line = line.strip()
             if not line or line.startswith("#"):
                 continue
             requirements.append(line)
     return requirements


 # Edit this part to match your module.
 # Full sample:
 # https://forge.softwareheritage.org/diffusion/DCORE/browse/master/setup.py
 setup(
     name="swh.objstorage.replayer",  # example: swh.loader.pypi
     description="Software Heritage content replayer",
     long_description=long_description,
     long_description_content_type="text/markdown",
     python_requires=">=3.7",
     author="Software Heritage developers",
     author_email="swh-devel@inria.fr",
     url="https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer",
     packages=find_packages(),  # package's modules
     install_requires=parse_requirements() + parse_requirements("swh"),
     tests_require=parse_requirements("test"),
     setup_requires=["setuptools_scm"],
     extras_require={"testing": parse_requirements("test")},
     use_scm_version=True,
     include_package_data=True,
     entry_points="""
         [swh.cli.subcommands]
-        content-replayer=swh.objstorage.replayer.cli:cli
+        content-replayer=swh.objstorage.replayer.cli:objstorage_cli_group
     """,
     classifiers=[
         "Programming Language :: Python :: 3",
         "Intended Audience :: Developers",
         "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
         "Operating System :: OS Independent",
         "Development Status :: 3 - Alpha",
     ],
     project_urls={
         "Bug Reports": "https://forge.softwareheritage.org/maniphest",
         "Funding": "https://www.softwareheritage.org/donate",
         "Source": "https://forge.softwareheritage.org/source/swh-objstorage-replayer",
     },
 )
diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py
index 691a0cc..917cf18 100644
--- a/swh/objstorage/replayer/cli.py
+++ b/swh/objstorage/replayer/cli.py
@@ -1,133 +1,135 @@
 # Copyright (C) 2016-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-import functools
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
 import logging
-import mmap

 import click

 try:
     from systemd.daemon import notify
 except ImportError:
     notify = None

-from swh.model.model import SHA1_SIZE
-from swh.journal.client import get_journal_client
-from swh.objstorage.cli import cli
-from swh.objstorage.factory import get_objstorage
+from swh.objstorage.cli import cli as objstorage_cli_group

-from swh.objstorage.replayer.replay import is_hash_in_bytearray
-from swh.objstorage.replayer.replay import process_replay_objects_content
-
-@cli.command("replay")
+@objstorage_cli_group.command("replay")
 @click.option(
"--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to run forever.", ) @click.option( "--exclude-sha1-file", default=None, type=click.File("rb"), help="File containing a sorted array of hashes to be excluded.", ) @click.option( "--check-dst/--no-check-dst", default=True, help="Check whether the destination contains the object before copying.", ) @click.pass_context def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): """Fill a destination Object Storage using a journal stream. This is typically used for a mirror configuration, by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they use the same ``group-id``. You can use the ``KAFKA_GROUP_INSTANCE_ID`` environment variable to use KIP-345 static group membership. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka nmessage has the status:visible set. ``--exclude-sha1-file`` may be used to exclude some hashes to speed-up the replay in case many of the contents are already in the destination objstorage. It must contain a concatenation of all (sha1) hashes, and it must be sorted. This file will not be fully loaded into memory at any given time, so it can be arbitrarily large. ``--check-dst`` sets whether the replayer should check in the destination ObjStorage before copying an object. You can turn that off if you know you're copying to an empty ObjStorage. """ + import functools + import mmap + from swh.model.model import SHA1_SIZE + from swh.journal.client import get_journal_client + from swh.objstorage.factory import get_objstorage + from swh.objstorage.replayer.replay import is_hash_in_bytearray + from swh.objstorage.replayer.replay import process_replay_objects_content + conf = ctx.obj["config"] try: objstorage_src = get_objstorage(**conf.pop("objstorage")) except KeyError: ctx.fail("You must have a source objstorage configured in " "your config file.") try: objstorage_dst = get_objstorage(**conf.pop("objstorage_dst")) except KeyError: ctx.fail( "You must have a destination objstorage configured " "in your config file." ) if exclude_sha1_file: map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) if map_.size() % SHA1_SIZE != 0: ctx.fail( "--exclude-sha1 must link to a file whose size is an " "exact multiple of %d bytes." 
                 "exact multiple of %d bytes." % SHA1_SIZE
             )
         nb_excluded_hashes = int(map_.size() / SHA1_SIZE)

         def exclude_fn(obj):
             return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes)

     else:
         exclude_fn = None

     client = get_journal_client(
         "kafka",
         **conf["journal_client"],
         stop_after_objects=stop_after_objects,
         object_types=("content",),
     )
     worker_fn = functools.partial(
         process_replay_objects_content,
         src=objstorage_src,
         dst=objstorage_dst,
         exclude_fn=exclude_fn,
         check_dst=check_dst,
     )

     if notify:
         notify("READY=1")

     try:
         client.process(worker_fn)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print("Done.")
     finally:
         if notify:
             notify("STOPPING=1")
         client.close()


 def main():
     logging.basicConfig()
-    return cli(auto_envvar_prefix="SWH_OBJSTORAGE")
+    return objstorage_cli_group(auto_envvar_prefix="SWH_OBJSTORAGE")


 if __name__ == "__main__":
     main()
diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py
index 8cf1de0..18816a1 100644
--- a/swh/objstorage/replayer/tests/test_cli.py
+++ b/swh/objstorage/replayer/tests/test_cli.py
@@ -1,556 +1,558 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import Counter
 import copy
 import functools
 import logging
 import re
 import tempfile
 from subprocess import Popen
 from typing import Tuple
 from unittest.mock import patch

 from click.testing import CliRunner
 from confluent_kafka import Producer
 import pytest
 import yaml

 from swh.journal.serializers import key_to_kafka
 from swh.model.hashutil import hash_to_hex
 from swh.objstorage.backends.in_memory import InMemoryObjStorage
-from swh.objstorage.replayer.cli import cli
+from swh.objstorage.replayer.cli import objstorage_cli_group
 from swh.objstorage.replayer.replay import CONTENT_REPLAY_RETRIES

 logger = logging.getLogger(__name__)


 CLI_CONFIG = {
     "objstorage": {"cls": "mocked", "name": "src",},
     "objstorage_dst": {"cls": "mocked", "name": "dst",},
 }


 @pytest.fixture
 def monkeypatch_retry_sleep(monkeypatch):
     from swh.objstorage.replayer.replay import copy_object, obj_in_objstorage

     monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None)
     monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)


 def _patch_objstorages(names):
     objstorages = {name: InMemoryObjStorage() for name in names}

     def get_mock_objstorage(cls, **args):
         assert cls == "mocked", cls
         return objstorages[args["name"]]

     def decorator(f):
         @functools.wraps(f)
-        @patch("swh.objstorage.replayer.cli.get_objstorage")
+        @patch("swh.objstorage.factory.get_objstorage")
         def newf(get_objstorage_mock, *args, **kwargs):
             get_objstorage_mock.side_effect = get_mock_objstorage
             f(*args, objstorages=objstorages, **kwargs)

         return newf

     return decorator


 def invoke(*args, env=None, journal_config=None):
     config = copy.deepcopy(CLI_CONFIG)
     if journal_config:
         config["journal_client"] = journal_config

     runner = CliRunner()
     with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
         yaml.dump(config, config_fd)
         config_fd.seek(0)
         args = ["-C" + config_fd.name] + list(args)
-        return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,)
+        return runner.invoke(
+            objstorage_cli_group, args, obj={"log_level": logging.DEBUG}, env=env,
+        )


 def test_replay_help():
     result = invoke("replay", "--help",)
     expected = (
         r"^\s*Usage: objstorage replay \[OPTIONS\]\s+"
         r"Fill a destination Object Storage.*"
     )
     assert result.exit_code == 0, result.output
     assert re.match(expected, result.output, re.MULTILINE), result.output


 NUM_CONTENTS = 10


 def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test-producer",
             "acks": "all",
         }
     )

     contents = {}
     for i in range(NUM_CONTENTS):
         content = b"\x00" * 19 + bytes([i])
         sha1 = objstorages["src"].add(content)
         contents[sha1] = content
         producer.produce(
             topic=kafka_prefix + ".content",
             key=key_to_kafka(sha1),
             value=key_to_kafka({"sha1": sha1, "status": "visible",}),
         )

     producer.flush()

     return contents


 @_patch_objstorages(["src", "dst"])
 def test_replay_content(
     objstorages,
     kafka_prefix: str,
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
 ):
     contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)

     result = invoke(
         "replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
         journal_config={
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
     )

     expected = r"Done.\n"
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     for (sha1, content) in contents.items():
         assert sha1 in objstorages["dst"], sha1
         assert objstorages["dst"].get(sha1) == content


 @_patch_objstorages(["src", "dst"])
 def test_replay_content_structured_log(
     objstorages,
     kafka_prefix: str,
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
     caplog,
 ):
     contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)

     caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")

     expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents)

     result = invoke(
         "replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
         journal_config={
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
     )

     expected = r"Done.\n"
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     copied = set()
     for record in caplog.records:
         logtext = record.getMessage()
         if "copied" in logtext:
             copied.add(record.args["obj_id"])

     assert (
         copied == expected_obj_ids
     ), "Mismatched logging; see captured log output for details."


 @_patch_objstorages(["src", "dst"])
 def test_replay_content_static_group_id(
     objstorages,
     kafka_prefix: str,
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
     caplog,
 ):
     contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)

     # Setup log capture to fish the consumer settings out of the log messages
     caplog.set_level(logging.DEBUG, "swh.journal.client")

     result = invoke(
         "replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
         env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"},
         journal_config={
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
     )

     expected = r"Done.\n"
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     consumer_settings = None
     for record in caplog.records:
         if "Consumer settings" in record.message:
             consumer_settings = record.args
             break

     assert consumer_settings is not None, (
         "Failed to get consumer settings out of the consumer log. "
         "See log capture for details."
     )
     assert consumer_settings["group.instance.id"] == "static-group-instance-id"
     assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000
     assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000

     for (sha1, content) in contents.items():
         assert sha1 in objstorages["dst"], sha1
         assert objstorages["dst"].get(sha1) == content


 @_patch_objstorages(["src", "dst"])
 def test_replay_content_exclude(
     objstorages,
     kafka_prefix: str,
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
 ):
     contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)

     excluded_contents = list(contents)[0::2]  # picking half of them
     with tempfile.NamedTemporaryFile(mode="w+b") as fd:
         fd.write(b"".join(sorted(excluded_contents)))

         fd.seek(0)

         result = invoke(
             "replay",
             "--stop-after-objects",
             str(NUM_CONTENTS),
             "--exclude-sha1-file",
             fd.name,
             journal_config={
                 "brokers": kafka_server,
                 "group_id": kafka_consumer_group,
                 "prefix": kafka_prefix,
             },
         )

     expected = r"Done.\n"
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     for (sha1, content) in contents.items():
         if sha1 in excluded_contents:
             assert sha1 not in objstorages["dst"], sha1
         else:
             assert sha1 in objstorages["dst"], sha1
             assert objstorages["dst"].get(sha1) == content


 NUM_CONTENTS_DST = 5


 @_patch_objstorages(["src", "dst"])
 @pytest.mark.parametrize(
     "check_dst,expected_copied,expected_in_dst",
     [
         (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST),
         (False, NUM_CONTENTS, 0),
     ],
 )
 def test_replay_content_check_dst(
     objstorages,
     kafka_prefix: str,
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
     check_dst: bool,
     expected_copied: int,
     expected_in_dst: int,
     caplog,
 ):
     contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)

     for i, (sha1, content) in enumerate(contents.items()):
         if i >= NUM_CONTENTS_DST:
             break

         objstorages["dst"].add(content, obj_id=sha1)

     caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")

     result = invoke(
         "replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
         "--check-dst" if check_dst else "--no-check-dst",
         journal_config={
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
     )

     expected = r"Done.\n"
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     copied = 0
     in_dst = 0
     for record in caplog.records:
         logtext = record.getMessage()
         if "copied" in logtext:
             copied += 1
         elif "in dst" in logtext:
             in_dst += 1

     assert (
         copied == expected_copied and in_dst == expected_in_dst
     ), "Unexpected amount of objects copied, see the captured log for details"

     for (sha1, content) in contents.items():
         assert sha1 in objstorages["dst"], sha1
         assert objstorages["dst"].get(sha1) == content


 class FlakyObjStorage(InMemoryObjStorage):
     def __init__(self, *args, **kwargs):
         state = kwargs.pop("state")
         self.failures_left = Counter(kwargs.pop("failures"))
         super().__init__(*args, **kwargs)
         if state:
             self.state = state

     def flaky_operation(self, op, obj_id):
         if self.failures_left[op, obj_id] > 0:
             self.failures_left[op, obj_id] -= 1
             raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id)))

     def get(self, obj_id):
         self.flaky_operation("get", obj_id)
         return super().get(obj_id)

     def add(self, data, obj_id=None, check_presence=True):
         self.flaky_operation("add", obj_id)
         return super().add(data, obj_id=obj_id, check_presence=check_presence)

     def __contains__(self, obj_id):
         self.flaky_operation("in", obj_id)
         return super().__contains__(obj_id)


 @_patch_objstorages(["src", "dst"])
 def test_replay_content_check_dst_retry(
     objstorages,
     kafka_prefix: str,
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
     monkeypatch_retry_sleep,
 ):
     contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)

     failures = {}
     for i, (sha1, content) in enumerate(contents.items()):
         if i >= NUM_CONTENTS_DST:
             break

         objstorages["dst"].add(content, obj_id=sha1)
         failures["in", sha1] = 1

     orig_dst = objstorages["dst"]
     objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures)

     result = invoke(
         "replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
         "--check-dst",
         journal_config={
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
     )

     expected = r"Done.\n"
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     for (sha1, content) in contents.items():
         assert sha1 in objstorages["dst"], sha1
         assert objstorages["dst"].get(sha1) == content


 @_patch_objstorages(["src", "dst"])
 def test_replay_content_failed_copy_retry(
     objstorages,
     kafka_prefix: str,
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
     caplog,
     monkeypatch_retry_sleep,
 ):
     contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)

     add_failures = {}
     get_failures = {}
     definitely_failed = set()

     # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times.
     # We generate failures for 2 different operations, get and add.
     num_retry_contents = 2 * CONTENT_REPLAY_RETRIES

     assert (
         num_retry_contents < NUM_CONTENTS
     ), "Need to generate more test contents to properly test retry behavior"

     for i, sha1 in enumerate(contents):
         if i >= num_retry_contents:
             break

         # This generates a number of failures, up to CONTENT_REPLAY_RETRIES
         num_failures = (i % CONTENT_REPLAY_RETRIES) + 1

         # This generates failures of add for the first CONTENT_REPLAY_RETRIES
         # objects, then failures of get.
         if i < CONTENT_REPLAY_RETRIES:
             add_failures["add", sha1] = num_failures
         else:
             get_failures["get", sha1] = num_failures

         # Only contents that have CONTENT_REPLAY_RETRIES or more are
         # definitely failing
         if num_failures >= CONTENT_REPLAY_RETRIES:
             definitely_failed.add(hash_to_hex(sha1))

     objstorages["dst"] = FlakyObjStorage(
         state=objstorages["dst"].state, failures=add_failures,
     )
     objstorages["src"] = FlakyObjStorage(
         state=objstorages["src"].state, failures=get_failures,
     )

     caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")

     result = invoke(
         "replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
         journal_config={
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
     )

     expected = r"Done.\n"
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     copied = 0
     actually_failed = set()
     for record in caplog.records:
         logtext = record.getMessage()
         if "copied" in logtext:
             copied += 1
         elif "Failed operation" in logtext:
             assert record.levelno == logging.ERROR
             assert record.args["retries"] == CONTENT_REPLAY_RETRIES
             actually_failed.add(record.args["obj_id"])

     assert (
         actually_failed == definitely_failed
     ), "Unexpected object copy failures; see captured log for details"

     for (sha1, content) in contents.items():
         if hash_to_hex(sha1) in definitely_failed:
             assert sha1 not in objstorages["dst"]
             continue

         assert sha1 in objstorages["dst"], sha1
         assert objstorages["dst"].get(sha1) == content


 @_patch_objstorages(["src", "dst"])
 def test_replay_content_objnotfound(
     objstorages,
     kafka_prefix: str,
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
     caplog,
 ):
     contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)

     num_contents_deleted = 5
     contents_deleted = set()

     for i, sha1 in enumerate(contents):
         if i >= num_contents_deleted:
             break

         del objstorages["src"].state[sha1]
         contents_deleted.add(hash_to_hex(sha1))

     caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")

     result = invoke(
         "replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
         journal_config={
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
     )

     expected = r"Done.\n"
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     copied = 0
     not_in_src = set()
     for record in caplog.records:
         logtext = record.getMessage()
         if "copied" in logtext:
             copied += 1
         elif "object not found" in logtext:
             # Check that the object id can be recovered from logs
             assert record.levelno == logging.ERROR
             not_in_src.add(record.args["obj_id"])

     assert (
         copied == NUM_CONTENTS - num_contents_deleted
     ), "Unexpected number of contents copied"
     assert (
         not_in_src == contents_deleted
     ), "Mismatch between deleted contents and not_in_src logs"

     for (sha1, content) in contents.items():
         if sha1 not in objstorages["src"]:
             continue
         assert sha1 in objstorages["dst"], sha1
         assert objstorages["dst"].get(sha1) == content
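Note, not part of the patch above: a minimal sketch of the configuration file the `objstorage replay` command reads via `-C`, inferred from `content_replay()` and from the `CLI_CONFIG`/`invoke()` helpers in the tests. The top-level keys (`objstorage`, `objstorage_dst`, `journal_client`) and the journal keys (`brokers`, `group_id`, `prefix`) come from the code above; the backend options, broker address and group id are illustrative placeholders, not values mandated by this change.

    objstorage:          # source, passed to get_objstorage(**conf.pop("objstorage"))
      cls: ...           # backend class name, plus whatever options that backend needs
    objstorage_dst:      # destination, same shape as the source section
      cls: ...
    journal_client:
      brokers:
        - kafka1.example.org:9092        # placeholder broker
      group_id: content-replayer-group   # placeholder consumer group
      prefix: swh.journal.objects        # placeholder topic prefix

With such a file, an invocation along the lines of `swh objstorage -C replayer.yml replay --stop-after-objects 1000` (file name and object count are placeholders) would exercise the same code path the tests drive through `invoke()`.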