diff --git a/PKG-INFO b/PKG-INFO
index 81096b0..fc24ee4 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,26 +1,26 @@
 Metadata-Version: 2.1
 Name: swh.objstorage.replayer
-Version: 0.2.1
+Version: 0.2.2
 Summary: Software Heritage content replayer
 Home-page: https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-objstorage-replayer
 Description: swh-objstorage-replayer
         =======================
         
         Simple command line tool to replicate content objects from a source
         Object storage to a destination one by listening to the `content`
         topic of a `swh.journal` kafka stream.
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 3 - Alpha
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/docs/cli.rst b/docs/cli.rst
new file mode 100644
index 0000000..26fc96b
--- /dev/null
+++ b/docs/cli.rst
@@ -0,0 +1,8 @@
+.. _swh-objstorage-replayer-cli:
+
+Command-line interface
+======================
+
+.. click:: swh.objstorage.replayer.cli:content_replay
+   :prog: swh objstorage replay
+   :nested: full
diff --git a/docs/index.rst b/docs/index.rst
index ec7dbd1..cd5c2eb 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,20 +1,21 @@
 .. _swh-objstorage-replayer:
 
 Software Heritage - Object storage replayer
 ===========================================
 
 This Python module provides a command line tool to replicate content
 objects from a source Object storage to a destination one by listening
 to the `content` topic of a `swh.journal` kafka stream.
 
 It is meant to be used as a building block of a mirror setup dedicated
 to replicating content objects.
 
 Reference Documentation
 -----------------------
 
 .. toctree::
    :maxdepth: 2
 
+   cli
    /apidoc/swh.objstorage.replayer
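Note: the `.. click::` directive used by the new docs/cli.rst is provided by the
sphinx-click extension. The shared swh-docs Sphinx configuration presumably
enables it already; for reference, a standalone build would need it loaded in
conf.py, along these lines (a minimal sketch, not the actual swh-docs
configuration):

    # conf.py -- minimal sketch; assumes the sphinx-click package is installed
    extensions = [
        "sphinx_click.ext",  # provides the ".. click::" directive used by docs/cli.rst
    ]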
diff --git a/swh.objstorage.replayer.egg-info/PKG-INFO b/swh.objstorage.replayer.egg-info/PKG-INFO
index 81096b0..fc24ee4 100644
--- a/swh.objstorage.replayer.egg-info/PKG-INFO
+++ b/swh.objstorage.replayer.egg-info/PKG-INFO
@@ -1,26 +1,26 @@
 Metadata-Version: 2.1
 Name: swh.objstorage.replayer
-Version: 0.2.1
+Version: 0.2.2
 Summary: Software Heritage content replayer
 Home-page: https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-objstorage-replayer
 Description: swh-objstorage-replayer
         =======================
         
         Simple command line tool to replicate content objects from a source
         Object storage to a destination one by listening to the `content`
         topic of a `swh.journal` kafka stream.
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 3 - Alpha
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/swh.objstorage.replayer.egg-info/SOURCES.txt b/swh.objstorage.replayer.egg-info/SOURCES.txt
index cd46664..6035f7c 100644
--- a/swh.objstorage.replayer.egg-info/SOURCES.txt
+++ b/swh.objstorage.replayer.egg-info/SOURCES.txt
@@ -1,39 +1,40 @@
 .gitignore
 .pre-commit-config.yaml
 AUTHORS
 CODE_OF_CONDUCT.md
 CONTRIBUTORS
 LICENSE
 MANIFEST.in
 Makefile
 README.md
 mypy.ini
 pyproject.toml
 pytest.ini
 requirements-swh.txt
 requirements-test.txt
 requirements.txt
 setup.cfg
 setup.py
 tox.ini
 docs/.gitignore
 docs/Makefile
+docs/cli.rst
 docs/conf.py
 docs/index.rst
 docs/_static/.placeholder
 docs/_templates/.placeholder
 swh/__init__.py
 swh.objstorage.replayer.egg-info/PKG-INFO
 swh.objstorage.replayer.egg-info/SOURCES.txt
 swh.objstorage.replayer.egg-info/dependency_links.txt
 swh.objstorage.replayer.egg-info/entry_points.txt
 swh.objstorage.replayer.egg-info/requires.txt
 swh.objstorage.replayer.egg-info/top_level.txt
 swh/objstorage/__init__.py
 swh/objstorage/replayer/__init__.py
 swh/objstorage/replayer/cli.py
 swh/objstorage/replayer/py.typed
 swh/objstorage/replayer/replay.py
 swh/objstorage/replayer/tests/__init__.py
 swh/objstorage/replayer/tests/test_cli.py
 swh/objstorage/replayer/tests/test_replay.py
\ No newline at end of file
diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py
index e281b0b..673a344 100644
--- a/swh/objstorage/replayer/cli.py
+++ b/swh/objstorage/replayer/cli.py
@@ -1,138 +1,137 @@
 # Copyright (C) 2016-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import logging
 
 import click
 
 try:
     from systemd.daemon import notify
 except ImportError:
     notify = None
 
 from swh.objstorage.cli import objstorage_cli_group
 
 
 @objstorage_cli_group.command("replay")
 @click.option(
     "--stop-after-objects",
     "-n",
     default=None,
     type=int,
     help="Stop after processing this many objects. Default is to run forever.",
 )
 @click.option(
     "--exclude-sha1-file",
     default=None,
     type=click.File("rb"),
     help="File containing a sorted array of hashes to be excluded.",
 )
 @click.option(
     "--check-dst/--no-check-dst",
     default=True,
     help="Check whether the destination contains the object before copying.",
 )
 @click.pass_context
 def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst):
     """Fill a destination Object Storage using a journal stream.
 
     This is typically used for a mirror configuration, by reading a Journal
     and retrieving objects from an existing source ObjStorage.
 
     There can be several 'replayers' filling a given ObjStorage as long as they
     use the same ``group-id``. You can use the ``KAFKA_GROUP_INSTANCE_ID``
     environment variable to use KIP-345 static group membership.
 
     This service retrieves object ids to copy from the 'content' topic. It
     will only copy an object's content if the object's description in the
     kafka message has the status:visible set.
 
     ``--exclude-sha1-file`` may be used to exclude some hashes to speed up
     the replay in case many of the contents are already in the destination
     objstorage. It must contain a concatenation of all (sha1) hashes, and it
     must be sorted. This file will not be fully loaded into memory at any
     given time, so it can be arbitrarily large.
 
     ``--check-dst`` sets whether the replayer should check in the destination
     ObjStorage before copying an object. You can turn that off if you know
     you're copying to an empty ObjStorage.
     """
     import functools
     import mmap
 
     from swh.journal.client import get_journal_client
     from swh.model.model import SHA1_SIZE
     from swh.objstorage.factory import get_objstorage
     from swh.objstorage.replayer.replay import (
         is_hash_in_bytearray,
         process_replay_objects_content,
     )
 
     conf = ctx.obj["config"]
     try:
         objstorage_src = get_objstorage(**conf.pop("objstorage"))
     except KeyError:
         ctx.fail("You must have a source objstorage configured in " "your config file.")
     try:
         objstorage_dst = get_objstorage(**conf.pop("objstorage_dst"))
     except KeyError:
         ctx.fail(
             "You must have a destination objstorage configured "
             "in your config file."
         )
 
     if exclude_sha1_file:
         map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ)
         if map_.size() % SHA1_SIZE != 0:
             ctx.fail(
                 "--exclude-sha1 must link to a file whose size is an "
                 "exact multiple of %d bytes." % SHA1_SIZE
             )
         nb_excluded_hashes = int(map_.size() / SHA1_SIZE)
 
         def exclude_fn(obj):
             return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes)
 
     else:
         exclude_fn = None
 
+    journal_cfg = conf["journal_client"]
+    journal_cfg.setdefault("cls", "kafka")
     client = get_journal_client(
-        "kafka",
-        **conf["journal_client"],
-        stop_after_objects=stop_after_objects,
-        object_types=("content",),
+        **journal_cfg, stop_after_objects=stop_after_objects, object_types=("content",),
     )
     worker_fn = functools.partial(
         process_replay_objects_content,
         src=objstorage_src,
         dst=objstorage_dst,
         exclude_fn=exclude_fn,
         check_dst=check_dst,
     )
 
     if notify:
         notify("READY=1")
 
     try:
         client.process(worker_fn)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print("Done.")
     finally:
         if notify:
             notify("STOPPING=1")
         client.close()
 
 
 def main():
     logging.basicConfig()
     return objstorage_cli_group(auto_envvar_prefix="SWH_OBJSTORAGE")
 
 
 if __name__ == "__main__":
     main()
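Note: with the change above, the journal client's ``cls`` is read from the
``journal_client`` section of the configuration file and only defaults to
``kafka`` when absent, instead of being hardcoded. A hypothetical configuration
file for this command could look as follows (backends, hosts and group id are
illustrative; the accepted keys are whatever ``get_journal_client`` and
``get_objstorage`` take in your deployed versions):

    # replayer.yml -- illustrative example, adjust to your deployment
    objstorage:
      cls: pathslicing
      args:
        root: /srv/softwareheritage/objects
        slicing: 0:2/2:4/4:6
    objstorage_dst:
      cls: memory
    journal_client:
      cls: kafka              # now optional; defaults to "kafka" if omitted
      brokers:
        - kafka1.example.org:9092
      group_id: content-replayer-01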
"swh_content_replayer_duration_seconds" def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must by `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, eg. :class:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: raise ValueError("hash_ does not match the provided hash_size.") def get_hash(position): return array[position * hash_size : (position + 1) * hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right - 1: middle = int((right + left) / 2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ class ReplayError(Exception): """An error occurred during the replay of an object""" def __init__(self, operation, *, obj_id, exc): self.operation = operation self.obj_id = hash_to_hex(obj_id) self.exc = exc def __str__(self): return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc) -def log_replay_retry(retry_obj, sleep, last_result): +def log_replay_retry(retry_state, sleep=None, last_result=None): """Log a retry of the content replayer""" - exc = last_result.exception() + try: + exc = retry_state.outcome.exception() + attempt_number = retry_state.attempt_number + except AttributeError: + # tenacity < 5.0 + exc = last_result.exception() + attempt_number = retry_state.statistics["attempt_number"] + logger.debug( "Retry operation %(operation)s on %(obj_id)s: %(exc)s", {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, ) statsd.increment( CONTENT_RETRY_METRIC, - tags={ - "operation": exc.operation, - "attempt": str(retry_obj.statistics["attempt_number"]), - }, + tags={"operation": exc.operation, "attempt": str(attempt_number),}, ) -def log_replay_error(last_attempt): +def log_replay_error(retry_state): """Log a replay error to sentry""" - exc = last_attempt.exception() + try: + exc = retry_state.outcome.exception() + except AttributeError: + # tenacity < 5.0 + exc = retry_state.exception() + with push_scope() as scope: scope.set_tag("operation", exc.operation) scope.set_extra("obj_id", exc.obj_id) capture_exception(exc.exc) logger.error( "Failed operation %(operation)s on %(obj_id)s after %(retries)s" " retries: %(exc)s", { "obj_id": exc.obj_id, "operation": exc.operation, "exc": str(exc.exc), - "retries": last_attempt.attempt_number, + "retries": retry_state.attempt_number, }, ) return None CONTENT_REPLAY_RETRIES = 3 content_replay_retry = retry( retry=retry_if_exception_type(ReplayError), stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), wait=wait_random_exponential(multiplier=1, max=60), before_sleep=log_replay_retry, retry_error_callback=log_replay_error, ) @content_replay_retry def copy_object(obj_id, src, dst): hex_obj_id = hash_to_hex(obj_id) obj = "" try: with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}): obj = src.get(obj_id) logger.debug("retrieved 
%(obj_id)s", {"obj_id": hex_obj_id}) with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): dst.add(obj, obj_id=obj_id, check_presence=False) logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id}) statsd.increment(CONTENT_BYTES_METRIC, len(obj)) except ObjNotFoundError: logger.error( "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id} ) raise except Exception as exc: raise ReplayError("copy", obj_id=obj_id, exc=exc) from None return len(obj) @content_replay_retry def obj_in_objstorage(obj_id, dst): """Check if an object is already in an objstorage, tenaciously""" try: return obj_id in dst except Exception as exc: raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None def process_replay_objects_content( all_objects: Dict[str, List[dict]], *, src: ObjStorage, dst: ObjStorage, exclude_fn: Optional[Callable[[dict], bool]] = None, check_dst: bool = True, ): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them from the `src` objstorage to the `dst` objstorage, if: * `obj['status']` is `'visible'` * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) * `obj['sha1'] not in dst` (if `check_dst` is True) Args: all_objects: Objects passed by the Kafka client. Most importantly, `all_objects['content'][*]['sha1']` is the sha1 hash of each content. src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) exclude_fn: Determines whether an object should be copied. check_dst: Determines whether we should check the destination objstorage before copying. Example: >>> from swh.objstorage.factory import get_objstorage >>> src = get_objstorage('memory') >>> dst = get_objstorage('memory') >>> id1 = src.add(b'foo bar') >>> id2 = src.add(b'baz qux') >>> kafka_partitions = { ... 'content': [ ... { ... 'sha1': id1, ... 'status': 'visible', ... }, ... { ... 'sha1': id2, ... 'status': 'visible', ... }, ... ] ... } >>> process_replay_objects_content( ... kafka_partitions, src=src, dst=dst, ... 
exclude_fn=lambda obj: obj['sha1'] == id1) >>> id1 in dst False >>> id2 in dst True """ vol = [] nb_skipped = 0 nb_failures = 0 t0 = time() for (object_type, objects) in all_objects.items(): if object_type != "content": logger.warning( "Received a series of %s, this should not happen", object_type ) continue for obj in objects: obj_id = obj[ID_HASH_ALGO] if obj["status"] != "visible": nb_skipped += 1 logger.debug( "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"] ) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "skipped", "status": obj["status"]}, ) elif exclude_fn and exclude_fn(obj): nb_skipped += 1 logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id)) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"} ) elif check_dst and obj_in_objstorage(obj_id, dst): nb_skipped += 1 logger.debug("skipped %s (in dst)", hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) else: try: copied = copy_object(obj_id, src, dst) except ObjNotFoundError: nb_skipped += 1 statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"} ) else: if copied is None: nb_failures += 1 statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"} ) else: vol.append(copied) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"} ) dt = time() - t0 logger.info( "processed %s content objects in %.1fsec " "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped", len(vol), dt, len(vol) / dt, sum(vol) / 1024 / 1024 / dt, nb_failures, nb_skipped, ) if notify: notify("WATCHDOG=1") diff --git a/swh/objstorage/replayer/tests/test_replay.py b/swh/objstorage/replayer/tests/test_replay.py index e292295..9ebbc8f 100644 --- a/swh/objstorage/replayer/tests/test_replay.py +++ b/swh/objstorage/replayer/tests/test_replay.py @@ -1,71 +1,72 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from hypothesis import given, settings from hypothesis.strategies import sets from swh.journal.client import JournalClient -from swh.journal.writer.kafka import KafkaJournalWriter +from swh.journal.writer import get_journal_writer from swh.model.hypothesis_strategies import sha1 from swh.model.model import Content from swh.objstorage.factory import get_objstorage from swh.objstorage.replayer.replay import ( is_hash_in_bytearray, process_replay_objects_content, ) CONTENTS = [Content.from_data(f"foo{i}".encode()) for i in range(10)] + [ Content.from_data(f"forbidden foo{i}".encode(), status="hidden") for i in range(10) ] @settings(max_examples=500) @given( sets(sha1(), min_size=0, max_size=500), sets(sha1(), min_size=10), ) def test_is_hash_in_bytearray(haystack, needles): array = b"".join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == ( needle in haystack ) def test_replay_content(kafka_server, kafka_prefix, kafka_consumer_group): objstorage1 = get_objstorage(cls="memory") objstorage2 = get_objstorage(cls="memory") - writer = KafkaJournalWriter( + writer = get_journal_writer( + cls="kafka", brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, anonymize=False, ) for content in CONTENTS: objstorage1.add(content.data) writer.write_addition("content", content) 
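Note: the file consumed by ``--exclude-sha1-file`` is exactly what
``is_hash_in_bytearray`` expects: raw 20-byte sha1 digests concatenated in
sorted order, which is what makes the dichotomy above valid over an mmap'd
file. A hypothetical helper to build such a file from hex-encoded hashes
(script name, paths, and input source are illustrative, not part of this
package):

    # make_exclude_file.py -- illustrative sketch
    import sys

    def write_exclude_file(hex_hashes, path):
        # Sort the raw digests (not the hex strings) and concatenate them,
        # matching the layout assumed by is_hash_in_bytearray.
        digests = sorted(bytes.fromhex(h) for h in hex_hashes)
        with open(path, "wb") as f:
            for digest in digests:
                f.write(digest)

    if __name__ == "__main__":
        # one hex sha1 per line on stdin, e.g. from a database export
        hashes = [line.strip() for line in sys.stdin if line.strip()]
        write_exclude_file(hashes, "excluded-sha1s.bin")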
diff --git a/swh/objstorage/replayer/tests/test_replay.py b/swh/objstorage/replayer/tests/test_replay.py
index e292295..9ebbc8f 100644
--- a/swh/objstorage/replayer/tests/test_replay.py
+++ b/swh/objstorage/replayer/tests/test_replay.py
@@ -1,71 +1,72 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import functools
 
 from hypothesis import given, settings
 from hypothesis.strategies import sets
 
 from swh.journal.client import JournalClient
-from swh.journal.writer.kafka import KafkaJournalWriter
+from swh.journal.writer import get_journal_writer
 from swh.model.hypothesis_strategies import sha1
 from swh.model.model import Content
 from swh.objstorage.factory import get_objstorage
 from swh.objstorage.replayer.replay import (
     is_hash_in_bytearray,
     process_replay_objects_content,
 )
 
 CONTENTS = [Content.from_data(f"foo{i}".encode()) for i in range(10)] + [
     Content.from_data(f"forbidden foo{i}".encode(), status="hidden") for i in range(10)
 ]
 
 
 @settings(max_examples=500)
 @given(
     sets(sha1(), min_size=0, max_size=500),
     sets(sha1(), min_size=10),
 )
 def test_is_hash_in_bytearray(haystack, needles):
     array = b"".join(sorted(haystack))
     needles |= haystack  # Exhaustively test for all objects in the array
     for needle in needles:
         assert is_hash_in_bytearray(needle, array, len(haystack)) == (
             needle in haystack
         )
 
 
 def test_replay_content(kafka_server, kafka_prefix, kafka_consumer_group):
     objstorage1 = get_objstorage(cls="memory")
     objstorage2 = get_objstorage(cls="memory")
 
-    writer = KafkaJournalWriter(
+    writer = get_journal_writer(
+        cls="kafka",
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         anonymize=False,
     )
 
     for content in CONTENTS:
         objstorage1.add(content.data)
         writer.write_addition("content", content)
 
     replayer = JournalClient(
         brokers=kafka_server,
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=True,
         # stop_after_objects=len(objects),
     )
 
     worker_fn = functools.partial(
         process_replay_objects_content, src=objstorage1, dst=objstorage2
     )
     replayer.process(worker_fn)
 
     # only content with status visible will be copied to objstorage2
     expected_objstorage_state = {
         c.sha1: c.data for c in CONTENTS if c.status == "visible"
     }
     assert expected_objstorage_state == objstorage2.state
diff --git a/tox.ini b/tox.ini
index 49379b7..461372b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,35 +1,73 @@
 [tox]
 envlist=black,flake8,mypy,py3
 
 [testenv]
 extras =
   testing
 deps =
   pytest-cov
 commands =
   pytest --doctest-modules \
          {envsitepackagesdir}/swh/objstorage/replayer \
          --cov={envsitepackagesdir}/swh/objstorage/replayer \
          --cov-branch {posargs}
 
 [testenv:black]
 skip_install = true
 deps =
   black==19.10b0
 commands =
   {envpython} -m black --check swh
 
 [testenv:flake8]
 skip_install = true
 deps =
   flake8
 commands =
   {envpython} -m flake8
 
 [testenv:mypy]
 extras =
   testing
 deps =
   mypy
 commands =
   mypy swh
+
+# build documentation outside swh-environment using the current
+# git HEAD of swh-docs; executed on CI for each diff to prevent
+# breaking the doc build
+[testenv:sphinx]
+whitelist_externals = make
+usedevelop = true
+extras =
+  testing
+deps =
+  # fetch and install swh-docs in develop mode
+  -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs
+
+setenv =
+  SWH_PACKAGE_DOC_TOX_BUILD = 1
+  # turn warnings into errors
+  SPHINXOPTS = -W
+commands =
+  make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs
+
+
+# build documentation only inside swh-environment using local state
+# of swh-docs package
+[testenv:sphinx-dev]
+whitelist_externals = make
+usedevelop = true
+extras =
+  testing
+deps =
+  # install swh-docs in develop mode
+  -e ../swh-docs
+
+setenv =
+  SWH_PACKAGE_DOC_TOX_BUILD = 1
+  # turn warnings into errors
+  SPHINXOPTS = -W
+commands =
+  make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs
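Note: with the two environments above, the documentation build can be exercised
locally through tox, depending on the checkout layout (standalone, or inside
swh-environment, respectively):

    tox -e sphinx      # standalone: fetches swh-docs from the forge
    tox -e sphinx-dev  # inside swh-environment: uses the local ../swh-docs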