diff --git a/PKG-INFO b/PKG-INFO index 8664bd3..81096b0 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,26 +1,26 @@ Metadata-Version: 2.1 Name: swh.objstorage.replayer -Version: 0.2.0 +Version: 0.2.1 Summary: Software Heritage content replayer Home-page: https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-objstorage-replayer Description: swh-objstorage-replayer ======================= Simple command line tool to replicate content objects from a source Object storage to a destination one by listening the `content` topic of a `swh.journal` kafka stream. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/debian/changelog b/debian/changelog deleted file mode 100644 index c6cbb11..0000000 --- a/debian/changelog +++ /dev/null @@ -1,5 +0,0 @@ - (0.0.1-1) unstable; urgency=low - - * - - -- AUTHOR-NAME diff --git a/debian/compat b/debian/compat deleted file mode 100644 index ec63514..0000000 --- a/debian/compat +++ /dev/null @@ -1 +0,0 @@ -9 diff --git a/debian/control b/debian/control deleted file mode 100644 index 68a9ef5..0000000 --- a/debian/control +++ /dev/null @@ -1,19 +0,0 @@ -Source: # example: swh-loader-pypi -Maintainer: Software Heritage developers -Section: python -Priority: optional -Build-Depends: debhelper (>= 9), - dh-python (>= 2), - python3-all, - python3-nose, - python3-setuptools, - python3-swh.core, - python3-swh.storage, - python3-vcversioner -Standards-Version: 3.9.6 -Homepage: https://forge.softwareheritage.org/source// - -Package: python3- # example: python3-swh.loader.pypi -Architecture: all -Depends: ${misc:Depends}, ${python3:Depends} -Description: Software Heritage diff --git a/debian/copyright b/debian/copyright deleted file mode 100644 index f216ea5..0000000 --- a/debian/copyright +++ /dev/null @@ -1,22 +0,0 @@ -Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ - -Files: * -Copyright: 2019 The Software Heritage developers -License: GPL-3+ - -License: GPL-3+ - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - . - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - . - You should have received a copy of the GNU General Public License - along with this program. If not, see . - . - On Debian systems, the complete text of the GNU General Public - License version 3 can be found in `/usr/share/common-licenses/GPL-3'. diff --git a/debian/rules b/debian/rules deleted file mode 100755 index 32f59e6..0000000 --- a/debian/rules +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/make -f - -export PYBUILD_NAME= # example: swh.loader.pypi -export PYBUILD_TEST_ARGS=--with-doctest -sva !db,!fs - -%: - dh $@ --with python3 --buildsystem=pybuild - -override_dh_install: - dh_install - rm -v $(CURDIR)/debian/python3-*/usr/lib/python*/dist-packages/swh/__init__.py diff --git a/debian/source/format b/debian/source/format deleted file mode 100644 index 163aaf8..0000000 --- a/debian/source/format +++ /dev/null @@ -1 +0,0 @@ -3.0 (quilt) diff --git a/requirements-swh.txt b/requirements-swh.txt index b2ea0a2..47fec26 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ # Add here internal Software Heritage dependencies, one per line. swh.core[http] >= 0.3 -swh.objstorage >= 0.2.1 +swh.objstorage >= 0.2.2 swh.journal >= 0.4.2 diff --git a/swh.objstorage.replayer.egg-info/PKG-INFO b/swh.objstorage.replayer.egg-info/PKG-INFO index 8664bd3..81096b0 100644 --- a/swh.objstorage.replayer.egg-info/PKG-INFO +++ b/swh.objstorage.replayer.egg-info/PKG-INFO @@ -1,26 +1,26 @@ Metadata-Version: 2.1 Name: swh.objstorage.replayer -Version: 0.2.0 +Version: 0.2.1 Summary: Software Heritage content replayer Home-page: https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-objstorage-replayer Description: swh-objstorage-replayer ======================= Simple command line tool to replicate content objects from a source Object storage to a destination one by listening the `content` topic of a `swh.journal` kafka stream. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.objstorage.replayer.egg-info/SOURCES.txt b/swh.objstorage.replayer.egg-info/SOURCES.txt index 0042fe5..cd46664 100644 --- a/swh.objstorage.replayer.egg-info/SOURCES.txt +++ b/swh.objstorage.replayer.egg-info/SOURCES.txt @@ -1,45 +1,39 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile README.md mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini -debian/changelog -debian/compat -debian/control -debian/copyright -debian/rules -debian/source/format docs/.gitignore docs/Makefile docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.objstorage.replayer.egg-info/PKG-INFO swh.objstorage.replayer.egg-info/SOURCES.txt swh.objstorage.replayer.egg-info/dependency_links.txt swh.objstorage.replayer.egg-info/entry_points.txt swh.objstorage.replayer.egg-info/requires.txt swh.objstorage.replayer.egg-info/top_level.txt swh/objstorage/__init__.py swh/objstorage/replayer/__init__.py swh/objstorage/replayer/cli.py swh/objstorage/replayer/py.typed swh/objstorage/replayer/replay.py swh/objstorage/replayer/tests/__init__.py swh/objstorage/replayer/tests/test_cli.py swh/objstorage/replayer/tests/test_replay.py \ No newline at end of file diff --git a/swh.objstorage.replayer.egg-info/requires.txt b/swh.objstorage.replayer.egg-info/requires.txt index 6d71c31..b4bdd90 100644 --- a/swh.objstorage.replayer.egg-info/requires.txt +++ b/swh.objstorage.replayer.egg-info/requires.txt @@ -1,6 +1,6 @@ swh.core[http]>=0.3 -swh.objstorage>=0.2.1 +swh.objstorage>=0.2.2 swh.journal>=0.4.2 [testing] pytest diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py index 9173629..f22d550 100644 --- a/swh/objstorage/replayer/replay.py +++ b/swh/objstorage/replayer/replay.py @@ -1,298 +1,298 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from time import time from typing import Callable, Dict, List, Optional from sentry_sdk import capture_exception, push_scope try: from systemd.daemon import notify except ImportError: notify = None from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential, ) from swh.core.statsd import statsd from swh.model.hashutil import hash_to_hex from swh.model.model import SHA1_SIZE from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError, ObjStorage logger = logging.getLogger(__name__) CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must by `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, eg. :class:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: raise ValueError("hash_ does not match the provided hash_size.") def get_hash(position): return array[position * hash_size : (position + 1) * hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right - 1: middle = int((right + left) / 2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ class ReplayError(Exception): """An error occurred during the replay of an object""" def __init__(self, operation, *, obj_id, exc): self.operation = operation self.obj_id = hash_to_hex(obj_id) self.exc = exc def __str__(self): return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc) def log_replay_retry(retry_obj, sleep, last_result): """Log a retry of the content replayer""" exc = last_result.exception() logger.debug( "Retry operation %(operation)s on %(obj_id)s: %(exc)s", {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, ) statsd.increment( CONTENT_RETRY_METRIC, tags={ "operation": exc.operation, "attempt": str(retry_obj.statistics["attempt_number"]), }, ) def log_replay_error(last_attempt): """Log a replay error to sentry""" exc = last_attempt.exception() with push_scope() as scope: scope.set_tag("operation", exc.operation) scope.set_extra("obj_id", exc.obj_id) capture_exception(exc.exc) logger.error( "Failed operation %(operation)s on %(obj_id)s after %(retries)s" " retries: %(exc)s", { "obj_id": exc.obj_id, "operation": exc.operation, "exc": str(exc.exc), "retries": last_attempt.attempt_number, }, ) return None CONTENT_REPLAY_RETRIES = 3 content_replay_retry = retry( retry=retry_if_exception_type(ReplayError), stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), wait=wait_random_exponential(multiplier=1, max=60), before_sleep=log_replay_retry, retry_error_callback=log_replay_error, ) @content_replay_retry def copy_object(obj_id, src, dst): hex_obj_id = hash_to_hex(obj_id) obj = "" try: with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}): obj = src.get(obj_id) logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id}) with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): dst.add(obj, obj_id=obj_id, check_presence=False) logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id}) statsd.increment(CONTENT_BYTES_METRIC, len(obj)) except ObjNotFoundError: logger.error( "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id} ) raise except Exception as exc: raise ReplayError("copy", obj_id=obj_id, exc=exc) from None return len(obj) @content_replay_retry def obj_in_objstorage(obj_id, dst): """Check if an object is already in an objstorage, tenaciously""" try: return obj_id in dst except Exception as exc: raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None def process_replay_objects_content( all_objects: Dict[str, List[dict]], *, src: ObjStorage, dst: ObjStorage, exclude_fn: Optional[Callable[[dict], bool]] = None, check_dst: bool = True, ): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them from the `src` objstorage to the `dst` objstorage, if: * `obj['status']` is `'visible'` * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) * `obj['sha1'] not in dst` (if `check_dst` is True) Args: all_objects: Objects passed by the Kafka client. Most importantly, `all_objects['content'][*]['sha1']` is the sha1 hash of each content. src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) exclude_fn: Determines whether an object should be copied. check_dst: Determines whether we should check the destination objstorage before copying. Example: >>> from swh.objstorage.factory import get_objstorage - >>> src = get_objstorage('memory', {}) - >>> dst = get_objstorage('memory', {}) + >>> src = get_objstorage('memory') + >>> dst = get_objstorage('memory') >>> id1 = src.add(b'foo bar') >>> id2 = src.add(b'baz qux') >>> kafka_partitions = { ... 'content': [ ... { ... 'sha1': id1, ... 'status': 'visible', ... }, ... { ... 'sha1': id2, ... 'status': 'visible', ... }, ... ] ... } >>> process_replay_objects_content( ... kafka_partitions, src=src, dst=dst, ... exclude_fn=lambda obj: obj['sha1'] == id1) >>> id1 in dst False >>> id2 in dst True """ vol = [] nb_skipped = 0 nb_failures = 0 t0 = time() for (object_type, objects) in all_objects.items(): if object_type != "content": logger.warning( "Received a series of %s, this should not happen", object_type ) continue for obj in objects: obj_id = obj[ID_HASH_ALGO] if obj["status"] != "visible": nb_skipped += 1 logger.debug( "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"] ) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "skipped", "status": obj["status"]}, ) elif exclude_fn and exclude_fn(obj): nb_skipped += 1 logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id)) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"} ) elif check_dst and obj_in_objstorage(obj_id, dst): nb_skipped += 1 logger.debug("skipped %s (in dst)", hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) else: try: copied = copy_object(obj_id, src, dst) except ObjNotFoundError: nb_skipped += 1 statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"} ) else: if copied is None: nb_failures += 1 statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"} ) else: vol.append(copied) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"} ) dt = time() - t0 logger.info( "processed %s content objects in %.1fsec " "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped", len(vol), dt, len(vol) / dt, sum(vol) / 1024 / 1024 / dt, nb_failures, nb_skipped, ) if notify: notify("WATCHDOG=1") diff --git a/swh/objstorage/replayer/tests/test_replay.py b/swh/objstorage/replayer/tests/test_replay.py index ffa3635..e292295 100644 --- a/swh/objstorage/replayer/tests/test_replay.py +++ b/swh/objstorage/replayer/tests/test_replay.py @@ -1,71 +1,71 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from hypothesis import given, settings from hypothesis.strategies import sets from swh.journal.client import JournalClient from swh.journal.writer.kafka import KafkaJournalWriter from swh.model.hypothesis_strategies import sha1 from swh.model.model import Content from swh.objstorage.factory import get_objstorage from swh.objstorage.replayer.replay import ( is_hash_in_bytearray, process_replay_objects_content, ) CONTENTS = [Content.from_data(f"foo{i}".encode()) for i in range(10)] + [ Content.from_data(f"forbidden foo{i}".encode(), status="hidden") for i in range(10) ] @settings(max_examples=500) @given( sets(sha1(), min_size=0, max_size=500), sets(sha1(), min_size=10), ) def test_is_hash_in_bytearray(haystack, needles): array = b"".join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == ( needle in haystack ) def test_replay_content(kafka_server, kafka_prefix, kafka_consumer_group): - objstorage1 = get_objstorage(cls="memory", args={}) - objstorage2 = get_objstorage(cls="memory", args={}) + objstorage1 = get_objstorage(cls="memory") + objstorage2 = get_objstorage(cls="memory") writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, anonymize=False, ) for content in CONTENTS: objstorage1.add(content.data) writer.write_addition("content", content) replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, # stop_after_objects=len(objects), ) worker_fn = functools.partial( process_replay_objects_content, src=objstorage1, dst=objstorage2 ) replayer.process(worker_fn) # only content with status visible will be copied in storage2 expected_objstorage_state = { c.sha1: c.data for c in CONTENTS if c.status == "visible" } assert expected_objstorage_state == objstorage2.state diff --git a/tox.ini b/tox.ini index ab7627b..49379b7 100644 --- a/tox.ini +++ b/tox.ini @@ -1,35 +1,35 @@ [tox] envlist=black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov commands = pytest --doctest-modules \ {envsitepackagesdir}/swh/objstorage/replayer \ --cov={envsitepackagesdir}/swh/objstorage/replayer \ --cov-branch {posargs} [testenv:black] skip_install = true deps = - black + black==19.10b0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy commands = mypy swh