diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1c95e3d..f972cd9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,40 +1,40 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.1.0 + rev: v4.3.0 hooks: - id: trailing-whitespace - id: check-json - id: check-yaml - - repo: https://gitlab.com/pycqa/flake8 - rev: 4.0.1 + - repo: https://github.com/pycqa/flake8 + rev: 5.0.4 hooks: - id: flake8 - additional_dependencies: [flake8-bugbear==22.3.23] + additional_dependencies: [flake8-bugbear==22.9.23] - repo: https://github.com/codespell-project/codespell - rev: v2.1.0 + rev: v2.2.2 hooks: - id: codespell name: Check source code spelling stages: [commit] - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] - repo: https://github.com/PyCQA/isort rev: 5.10.1 hooks: - id: isort - repo: https://github.com/python/black - rev: 22.3.0 + rev: 22.10.0 hooks: - id: black diff --git a/PKG-INFO b/PKG-INFO index 4ca8a9a..e322c90 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,26 +1,26 @@ Metadata-Version: 2.1 Name: swh.objstorage.replayer -Version: 1.1.0 +Version: 1.2.0 Summary: Software Heritage content replayer Home-page: https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-objstorage-replayer Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing License-File: LICENSE License-File: AUTHORS swh-objstorage-replayer ======================= Simple command line tool to replicate content objects from a source Object storage to a destination one by listening the `content` topic of a `swh.journal` kafka stream. diff --git a/requirements.txt b/requirements.txt index a9a9521..8d5660f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. 
For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html +humanize redis +tenacity diff --git a/swh.objstorage.replayer.egg-info/PKG-INFO b/swh.objstorage.replayer.egg-info/PKG-INFO index 4ca8a9a..e322c90 100644 --- a/swh.objstorage.replayer.egg-info/PKG-INFO +++ b/swh.objstorage.replayer.egg-info/PKG-INFO @@ -1,26 +1,26 @@ Metadata-Version: 2.1 Name: swh.objstorage.replayer -Version: 1.1.0 +Version: 1.2.0 Summary: Software Heritage content replayer Home-page: https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-objstorage-replayer Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing License-File: LICENSE License-File: AUTHORS swh-objstorage-replayer ======================= Simple command line tool to replicate content objects from a source Object storage to a destination one by listening the `content` topic of a `swh.journal` kafka stream. diff --git a/swh.objstorage.replayer.egg-info/SOURCES.txt b/swh.objstorage.replayer.egg-info/SOURCES.txt index 5a1780c..1bb1393 100644 --- a/swh.objstorage.replayer.egg-info/SOURCES.txt +++ b/swh.objstorage.replayer.egg-info/SOURCES.txt @@ -1,42 +1,43 @@ .git-blame-ignore-revs .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile README.md mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/cli.rst docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.objstorage.replayer.egg-info/PKG-INFO swh.objstorage.replayer.egg-info/SOURCES.txt swh.objstorage.replayer.egg-info/dependency_links.txt swh.objstorage.replayer.egg-info/entry_points.txt swh.objstorage.replayer.egg-info/requires.txt swh.objstorage.replayer.egg-info/top_level.txt swh/objstorage/__init__.py swh/objstorage/replayer/__init__.py swh/objstorage/replayer/cli.py swh/objstorage/replayer/py.typed swh/objstorage/replayer/replay.py swh/objstorage/replayer/tests/__init__.py swh/objstorage/replayer/tests/test_cli.py swh/objstorage/replayer/tests/test_replay.py +swh/objstorage/replayer/tests/test_replay_errors.py swh/objstorage/replayer/tests/test_statsd.py \ No newline at end of file diff --git a/swh.objstorage.replayer.egg-info/requires.txt b/swh.objstorage.replayer.egg-info/requires.txt index 31aa127..7463aa3 100644 --- a/swh.objstorage.replayer.egg-info/requires.txt +++ b/swh.objstorage.replayer.egg-info/requires.txt @@ -1,10 +1,12 @@ +humanize redis +tenacity swh.core[http]>=0.3 swh.objstorage>=0.2.2 swh.journal>=0.4.2 [testing] pytest pytest-redis types-pyyaml types-redis diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py index 786b717..609cc30 100644 --- a/swh/objstorage/replayer/replay.py +++ b/swh/objstorage/replayer/replay.py @@ -1,348 +1,386 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage 
developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor from concurrent.futures import wait as futures_wait import logging from time import time +from traceback import format_tb from typing import Callable, Dict, List, Optional +from humanize import naturaldelta, naturalsize import msgpack from sentry_sdk import capture_exception, push_scope try: from systemd.daemon import notify except ImportError: notify = None from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential, ) from tenacity.retry import retry_base from swh.core.statsd import statsd from swh.model.hashutil import hash_to_hex from swh.model.model import SHA1_SIZE -from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError, ObjStorage +from swh.objstorage.constants import ID_HASH_ALGO +from swh.objstorage.exc import ObjNotFoundError +from swh.objstorage.objstorage import ObjStorage logger = logging.getLogger(__name__) REPORTER = None CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must by `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, eg. 
:class:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: raise ValueError("hash_ does not match the provided hash_size.") def get_hash(position): return array[position * hash_size : (position + 1) * hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right - 1: middle = int((right + left) / 2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ class ReplayError(Exception): """An error occurred during the replay of an object""" def __init__(self, *, obj_id, exc): self.obj_id = hash_to_hex(obj_id) self.exc = exc def __str__(self): - return "ReplayError(%s, %s)" % (self.obj_id, self.exc) + return "ReplayError(%s, %r, %s)" % ( + self.obj_id, + self.exc, + format_tb(self.exc.__traceback__), + ) def log_replay_retry(retry_state, sleep=None, last_result=None): """Log a retry of the content replayer""" exc = retry_state.outcome.exception() operation = retry_state.fn.__name__ logger.debug( "Retry operation %(operation)s on %(obj_id)s: %(exc)s", {"operation": operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, ) def log_replay_error(retry_state): """Log a replay error to sentry""" exc = retry_state.outcome.exception() with push_scope() as scope: scope.set_tag("operation", retry_state.fn.__name__) scope.set_extra("obj_id", exc.obj_id) capture_exception(exc.exc) error_context = { "obj_id": exc.obj_id, "operation": retry_state.fn.__name__, - "exc": str(exc.exc), + "exc": str(exc), "retries": retry_state.attempt_number, } logger.error( "Failed operation %(operation)s on %(obj_id)s after %(retries)s" - " retries: %(exc)s", + " retries; last exception: %(exc)s", error_context, ) # if we have a global error (redis) reporter if REPORTER is not None: oid = f"blob:{exc.obj_id}" msg = msgpack.dumps(error_context) REPORTER(oid, msg) - return None + raise exc CONTENT_REPLAY_RETRIES = 3 class retry_log_if_success(retry_base): """Log in statsd the number of attempts required to succeed""" def __call__(self, retry_state): if not retry_state.outcome.failed: statsd.increment( CONTENT_RETRY_METRIC, tags={ "operation": retry_state.fn.__name__, "attempt": str(retry_state.attempt_number), }, ) return False content_replay_retry = retry( retry=retry_if_exception_type(ReplayError) | retry_log_if_success(), stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), wait=wait_random_exponential(multiplier=1, max=60), before_sleep=log_replay_retry, retry_error_callback=log_replay_error, ) @content_replay_retry def get_object(objstorage, obj_id): try: with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}): obj = objstorage.get(obj_id) logger.debug("retrieved %(obj_id)s", {"obj_id": hash_to_hex(obj_id)}) return obj except ObjNotFoundError: logger.error( "Failed to retrieve %(obj_id)s: object not found", {"obj_id": hash_to_hex(obj_id)}, ) raise except Exception as exc: raise ReplayError(obj_id=obj_id, exc=exc) from None @content_replay_retry def put_object(objstorage, obj_id, obj): try: + logger.debug("putting %(obj_id)s", {"obj_id": hash_to_hex(obj_id)}) with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): + 
logger.debug("storing %(obj_id)s", {"obj_id": hash_to_hex(obj_id)}) objstorage.add(obj, obj_id, check_presence=False) logger.debug("stored %(obj_id)s", {"obj_id": hash_to_hex(obj_id)}) except Exception as exc: + logger.error( + "putting %(obj_id)s failed: %(exc)r", + {"obj_id": hash_to_hex(obj_id), "exc": exc}, + ) raise ReplayError(obj_id=obj_id, exc=exc) from None def copy_object(obj_id, src, dst): obj = get_object(src, obj_id) if obj is not None: put_object(dst, obj_id, obj) statsd.increment(CONTENT_BYTES_METRIC, len(obj)) return len(obj) return 0 @content_replay_retry def obj_in_objstorage(obj_id, dst): """Check if an object is already in an objstorage, tenaciously""" try: return obj_id in dst except Exception as exc: raise ReplayError(obj_id=obj_id, exc=exc) from None def process_replay_objects_content( all_objects: Dict[str, List[dict]], *, src: ObjStorage, dst: ObjStorage, exclude_fn: Optional[Callable[[dict], bool]] = None, check_dst: bool = True, concurrency: int = 16, ): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them from the `src` objstorage to the `dst` objstorage, if: * `obj['status']` is `'visible'` * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) * `obj['sha1'] not in dst` (if `check_dst` is True) Args: all_objects: Objects passed by the Kafka client. Most importantly, `all_objects['content'][*]['sha1']` is the sha1 hash of each content. src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) exclude_fn: Determines whether an object should be copied. check_dst: Determines whether we should check the destination objstorage before copying. Example: + >>> import hashlib >>> from swh.objstorage.factory import get_objstorage >>> src = get_objstorage('memory') >>> dst = get_objstorage('memory') - >>> id1 = src.add(b'foo bar') - >>> id2 = src.add(b'baz qux') + >>> cnt1 = b'foo bar' + >>> cnt2 = b'baz qux' + >>> id1 = hashlib.sha1(cnt1).digest() + >>> id2 = hashlib.sha1(cnt2).digest() + >>> src.add(b'foo bar', obj_id=id1) + >>> src.add(b'baz qux', obj_id=id2) >>> kafka_partitions = { ... 'content': [ ... { ... 'sha1': id1, ... 'status': 'visible', ... }, ... { ... 'sha1': id2, ... 'status': 'visible', ... }, ... ] ... } >>> process_replay_objects_content( ... kafka_partitions, src=src, dst=dst, ... 
exclude_fn=lambda obj: obj['sha1'] == id1) >>> id1 in dst False >>> id2 in dst True """ - vol = [] - nb_skipped = 0 - nb_failures = 0 - t0 = time() def _copy_object(obj): - nonlocal nb_skipped - nonlocal nb_failures - obj_id = obj[ID_HASH_ALGO] + logger.debug("Starting copy object %s", hash_to_hex(obj_id)) if obj["status"] != "visible": - nb_skipped += 1 logger.debug("skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"]) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "skipped", "status": obj["status"]}, ) + return "skipped", None elif exclude_fn and exclude_fn(obj): - nb_skipped += 1 logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}) + return "excluded", None elif check_dst and obj_in_objstorage(obj_id, dst): - nb_skipped += 1 logger.debug("skipped %s (in dst)", hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) + return "in_dst", None else: try: - copied = copy_object(obj_id, src, dst) + copied_bytes = copy_object(obj_id, src, dst) except ObjNotFoundError: - nb_skipped += 1 + logger.debug("not found %s", hash_to_hex(obj_id)) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"} ) + return "not_found", None + except Exception as exc: + logger.info("failed %s (%r)", hash_to_hex(obj_id), exc) + statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}) + return "failed", None else: - if copied is None: - nb_failures += 1 + if copied_bytes is None: + logger.debug("failed %s (None)", hash_to_hex(obj_id)) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"} ) + return "failed", None else: - vol.append(copied) + logger.debug("copied %s (%d)", hash_to_hex(obj_id), copied_bytes) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"} ) + return "copied", copied_bytes + logger.debug("failed %s (XXX)", hash_to_hex(obj_id)) + return "failed", None + + vol = 0 + stats = dict.fromkeys( + ["skipped", "excluded", "not_found", "failed", "copied", "in_dst"], 0 + ) + t0 = time() with ThreadPoolExecutor(max_workers=concurrency) as pool: futures = [] for (object_type, objects) in all_objects.items(): if object_type != "content": logger.warning( "Received a series of %s, this should not happen", object_type ) continue for obj in objects: futures.append(pool.submit(_copy_object, obj=obj)) - - futures_wait(futures, return_when=FIRST_EXCEPTION) + logger.debug("Waiting for futures (%d)", len(futures)) + futures_wait(futures) + logger.debug("Checking futures results") for f in futures: - if f.running(): - continue exc = f.exception() if exc: - pool.shutdown(wait=False) - f.result() + # XXX this should not happen, so it is probably wrong... 
raise exc + else: + state, nbytes = f.result() + if nbytes is not None: + vol += nbytes + stats[state] += 1 dt = time() - t0 logger.info( - "processed %s content objects in %.1fsec " - "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped", - len(vol), - dt, - len(vol) / dt, - sum(vol) / 1024 / 1024 / dt, - nb_failures, - nb_skipped, + "processed %s content objects (%s) in %s " + "(%.1f obj/sec, %s/sec) " + "- %d copied - %d in dst - %d skipped " + "- %d excluded - %d not found - %d failed", + len(futures), + naturalsize(vol), + naturaldelta(dt), + len(futures) / dt, + naturalsize(vol / dt), + stats["copied"], + stats["in_dst"], + stats["skipped"], + stats["excluded"], + stats["not_found"], + stats["failed"], ) if notify: notify("WATCHDOG=1") diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py index d5be3a8..60705da 100644 --- a/swh/objstorage/replayer/tests/test_cli.py +++ b/swh/objstorage/replayer/tests/test_cli.py @@ -1,697 +1,707 @@ # Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter import copy import functools import logging import re from subprocess import Popen import tempfile -from typing import Tuple +from typing import Any, Dict, Optional, Tuple from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Producer import msgpack import pytest import yaml from swh.journal.serializers import key_to_kafka from swh.model.hashutil import MultiHash, hash_to_hex from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.objstorage.replayer.cli import objstorage_cli_group from swh.objstorage.replayer.replay import CONTENT_REPLAY_RETRIES logger = logging.getLogger(__name__) CLI_CONFIG = { "objstorage": { "cls": "mocked", "name": "src", }, "objstorage_dst": { "cls": "mocked", "name": "dst", }, } @pytest.fixture def monkeypatch_retry_sleep(monkeypatch): from swh.objstorage.replayer.replay import get_object, obj_in_objstorage, put_object monkeypatch.setattr(get_object.retry, "sleep", lambda x: None) monkeypatch.setattr(put_object.retry, "sleep", lambda x: None) monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) def _patch_objstorages(names): objstorages = {name: InMemoryObjStorage() for name in names} def get_mock_objstorage(cls, **args): assert cls == "mocked", cls return objstorages[args["name"]] def decorator(f): @functools.wraps(f) @patch("swh.objstorage.factory.get_objstorage") def newf(get_objstorage_mock, *args, **kwargs): get_objstorage_mock.side_effect = get_mock_objstorage f(*args, objstorages=objstorages, **kwargs) return newf return decorator def invoke(*args, env=None, **kwargs): config = copy.deepcopy(CLI_CONFIG) config.update(kwargs) runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: yaml.dump(config, config_fd) config_fd.seek(0) args = ["-C" + config_fd.name] + list(args) return runner.invoke( objstorage_cli_group, args, obj={"log_level": logging.DEBUG}, env=env, ) def test_replay_help(): result = invoke( "replay", "--help", ) expected = ( r"^\s*Usage: objstorage replay \[OPTIONS\]\s+" r"Fill a destination Object Storage.*" ) assert result.exit_code == 0, result.output assert re.match(expected, result.output, re.MULTILINE), result.output NUM_CONTENTS = 10 def _fill_objstorage_and_kafka(kafka_server, 
kafka_prefix, objstorage): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) contents = {} for i in range(NUM_CONTENTS): content = b"\x00" * 19 + bytes([i]) sha1 = MultiHash(["sha1"]).from_data(content).digest()["sha1"] objstorage.add(content=content, obj_id=sha1) contents[sha1] = content producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(sha1), value=key_to_kafka( { "sha1": sha1, "status": "visible", } ), ) producer.flush() return contents @_patch_objstorages(["src", "dst"]) def test_replay_content( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], ): """Check the content replayer in normal conditions""" contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_structured_log( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog, ): """Check the logs produced by the content replayer in normal conditions""" contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = set() for record in caplog.records: logtext = record.getMessage() if "stored" in logtext: copied.add(record.args["obj_id"]) assert ( copied == expected_obj_ids ), "Mismatched logging; see captured log output for details." 
@_patch_objstorages(["src", "dst"]) def test_replay_content_static_group_id( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog, ): """Check the content replayer in normal conditions with KAFKA_GROUP_INSTANCE_ID set """ contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) # Setup log capture to fish the consumer settings out of the log messages caplog.set_level(logging.DEBUG, "swh.journal.client") result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"}, journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - consumer_settings = None + consumer_settings: Optional[Dict[str, Any]] = None for record in caplog.records: if "Consumer settings" in record.message: - consumer_settings = record.args - break + consumer_settings = {} + elif consumer_settings is not None and len(record.args) == 2: + key, value = record.args + consumer_settings[key] = value assert consumer_settings is not None, ( "Failed to get consumer settings out of the consumer log. " "See log capture for details." ) + assert consumer_settings["group.instance.id"] == "static-group-instance-id" assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000 assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000 for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_exclude( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], ): """Check the content replayer in normal conditions with a exclusion file (--exclude-sha1-file) """ contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) excluded_contents = list(contents)[0::2] # picking half of them with tempfile.NamedTemporaryFile(mode="w+b") as fd: fd.write(b"".join(sorted(excluded_contents))) fd.seek(0) result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), "--exclude-sha1-file", fd.name, journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): if sha1 in excluded_contents: assert sha1 not in objstorages["dst"], sha1 else: assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content NUM_CONTENTS_DST = 5 @_patch_objstorages(["src", "dst"]) @pytest.mark.parametrize( "check_dst,expected_copied,expected_in_dst", [ (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), (False, NUM_CONTENTS, 0), ], ) def test_replay_content_check_dst( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], check_dst: bool, expected_copied: int, expected_in_dst: int, caplog, ): """Check the content replayer in normal conditions with some objects already in the dst objstorage. When check_dst is True, expect those not to be neither retrieved from the src objstorage nor pushed in the dst objstorage. 
""" contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) # add some objects in the dst objstorage for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages["dst"].add(content, obj_id=sha1) caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), "--check-dst" if check_dst else "--no-check-dst", journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - retrieved = 0 - stored = 0 - in_dst = 0 + stats = dict.fromkeys( + ["tot", "copied", "in_dst", "skipped", "excluded", "not_found", "failed"], 0 + ) + reg = re.compile( + r"processed (?P\d+) content objects .*" + r" *- (?P\d+) copied" + r" *- (?P\d+) in dst" + r" *- (?P\d+) skipped" + r" *- (?P\d+) excluded" + r" *- (?P\d+) not found" + r" *- (?P\d+) failed" + ) for record in caplog.records: logtext = record.getMessage() - if "retrieved" in logtext: - retrieved += 1 - elif "stored" in logtext: - stored += 1 - elif "in dst" in logtext: - in_dst += 1 + m = reg.match(logtext) + if m: + for k, v in m.groupdict().items(): + stats[k] += int(v) + + assert stats["tot"] == sum(v for k, v in stats.items() if k != "tot") assert ( - retrieved == expected_copied - and stored == expected_copied - and in_dst == expected_in_dst + stats["copied"] == expected_copied and stats["in_dst"] == expected_in_dst ), "Unexpected amount of objects copied, see the captured log for details" for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content class FlakyObjStorage(InMemoryObjStorage): """Flaky objstorage Any 'get', 'add' or 'in' (i.e. '__contains__()') operation will fail according to configured 'failures'. 'failures' is expected to be a dict which keys are couples (operation, obj_id) and values are the number of time the operation 'operation' is expected to fail for object 'obj_id' before being performed successfully. An optional state ('state') can be also given as argument (see InMemoryObjStorage). """ def __init__(self, *args, **kwargs): state = kwargs.pop("state") self.failures_left = Counter(kwargs.pop("failures")) super().__init__(*args, **kwargs) if state: self.state = state def flaky_operation(self, op, obj_id): if self.failures_left[op, obj_id] > 0: self.failures_left[op, obj_id] -= 1 raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id))) def get(self, obj_id): self.flaky_operation("get", obj_id) return super().get(obj_id) def add(self, data, obj_id=None, check_presence=True): self.flaky_operation("add", obj_id) return super().add(data, obj_id=obj_id, check_presence=check_presence) def __contains__(self, obj_id): self.flaky_operation("in", obj_id) return super().__contains__(obj_id) @_patch_objstorages(["src", "dst"]) def test_replay_content_check_dst_retry( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], monkeypatch_retry_sleep, caplog, redis_proc, redisdb, ): """Check the content replayer with a flaky dst objstorage for 'in' operations. 
""" contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) # build a flaky dst objstorage in which the 'in' operation for the first # NUM_CONTENT_DST objects will fail once failures = {} for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages["dst"].add(content, obj_id=sha1) failures["in", sha1] = 1 orig_dst = objstorages["dst"] objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") result = invoke( "replay", "--check-dst", "--stop-after-objects", str(NUM_CONTENTS), journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, replayer={ "error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output # check that exactly NUM_CONTENTS_DST 'in' operations have failed once failed_in = 0 for record in caplog.records: logtext = record.getMessage() if "Retry operation obj_in_objstorage" in logtext: failed_in += 1 elif "Retry operation" in logtext: assert False, "No other failure expected than 'in' operations" assert failed_in == NUM_CONTENTS_DST # check nothing has been reported in redis assert not redisdb.keys() # in the end, the replay process should be OK for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_failed_copy_retry( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog, monkeypatch_retry_sleep, redis_proc, redisdb, ): """Check the content replayer with a flaky src and dst objstorages for 'get' and 'add' operations, and a few non-recoverable failures (some objects failed to be replayed). """ contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) add_failures = {} get_failures = {} definitely_failed = set() # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. # We generate failures for 2 different operations, get and add. num_retry_contents = 2 * CONTENT_REPLAY_RETRIES assert ( num_retry_contents < NUM_CONTENTS ), "Need to generate more test contents to properly test retry behavior" for i, sha1 in enumerate(contents): if i >= num_retry_contents: break # This generates a number of failures, up to CONTENT_REPLAY_RETRIES num_failures = (i % CONTENT_REPLAY_RETRIES) + 1 # This generates failures of add for the first CONTENT_REPLAY_RETRIES # objects, then failures of get. 
if i < CONTENT_REPLAY_RETRIES: add_failures["add", sha1] = num_failures else: get_failures["get", sha1] = num_failures # Only contents that have CONTENT_REPLAY_RETRIES or more are # definitely failing if num_failures >= CONTENT_REPLAY_RETRIES: definitely_failed.add(hash_to_hex(sha1)) assert add_failures assert get_failures assert definitely_failed objstorages["dst"] = FlakyObjStorage( state=objstorages["dst"].state, failures=add_failures, ) objstorages["src"] = FlakyObjStorage( state=objstorages["src"].state, failures=get_failures, ) caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, replayer={ "error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output # check the logs looks as expected copied = 0 failed_put = set() failed_get = set() for record in caplog.records: logtext = record.getMessage() if "stored" in logtext: copied += 1 elif "Failed operation" in logtext: assert record.levelno == logging.ERROR assert record.args["retries"] == CONTENT_REPLAY_RETRIES assert record.args["operation"] in ("get_object", "put_object") if record.args["operation"] == "get_object": failed_get.add(record.args["obj_id"]) else: failed_put.add(record.args["obj_id"]) assert ( failed_put | failed_get == definitely_failed ), "Unexpected object copy failures; see captured log for details" # check failed objects are referenced in redis assert set(redisdb.keys()) == { f"blob:{objid}".encode() for objid in definitely_failed } # and have a consistent error report in redis for key in redisdb.keys(): report = msgpack.loads(redisdb[key]) assert report["operation"] in ("get_object", "put_object") if report["operation"] == "get_object": assert report["obj_id"] in failed_get else: assert report["obj_id"] in failed_put # check valid object are in the dst objstorage, but # failed objects are not. 
for (sha1, content) in contents.items(): if hash_to_hex(sha1) in definitely_failed: assert sha1 not in objstorages["dst"] continue assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_objnotfound( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog, ): """Check the ContentNotFound is not considered a failure to retry""" contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) # delete a few objects from the src objstorage num_contents_deleted = 5 contents_deleted = set() for i, sha1 in enumerate(contents): if i >= num_contents_deleted: break del objstorages["src"].state[sha1] contents_deleted.add(hash_to_hex(sha1)) caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 not_in_src = set() for record in caplog.records: logtext = record.getMessage() if "stored" in logtext: copied += 1 elif "object not found" in logtext: # Check that the object id can be recovered from logs assert record.levelno == logging.ERROR not_in_src.add(record.args["obj_id"]) elif "Retry operation" in logtext: assert False, "Not found objects should not be retried" assert ( copied == NUM_CONTENTS - num_contents_deleted ), "Unexpected number of contents copied" assert ( not_in_src == contents_deleted ), "Mismatch between deleted contents and not_in_src logs" for (sha1, content) in contents.items(): if sha1 not in objstorages["src"]: continue assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content diff --git a/swh/objstorage/replayer/tests/test_replay_errors.py b/swh/objstorage/replayer/tests/test_replay_errors.py new file mode 100644 index 0000000..cd0f553 --- /dev/null +++ b/swh/objstorage/replayer/tests/test_replay_errors.py @@ -0,0 +1,301 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from collections import defaultdict +import functools +from queue import Queue + +from swh.journal.client import JournalClient +from swh.journal.writer import get_journal_writer +from swh.model.model import Content +from swh.objstorage.exc import ObjNotFoundError +from swh.objstorage.factory import get_objstorage +from swh.objstorage.multiplexer.filter.filter import ObjStorageFilter +from swh.objstorage.replayer import replay +from swh.objstorage.replayer.replay import copy_object # needed for MonkeyPatch + +CONTENTS = [Content.from_data(f"foo{i}".encode()) for i in range(10)] + [ + Content.from_data(f"forbidden foo{i}".encode(), status="hidden") for i in range(10) +] + + +class FailingObjstorage(ObjStorageFilter): + def __init__(self, storage): + super().__init__(storage) + self.calls = defaultdict(lambda: 0) + self.rate = 3 + + def get(self, obj_id, *args, **kwargs): + self.calls[obj_id] += 1 + if (self.calls[obj_id] % self.rate) == 0: + return self.storage.get(obj_id, *args, **kwargs) + raise Exception("Nope") + + def add(self, content, obj_id, check_presence=True, *args, **kwargs): + 
self.calls[obj_id] += 1 + if (self.calls[obj_id] % self.rate) == 0: + return self.storage.add(content, obj_id, check_presence, *args, **kwargs) + raise Exception("Nope") + + +class NotFoundObjstorage(ObjStorageFilter): + def get(self, obj_id, *args, **kwargs): + raise ObjNotFoundError(obj_id) + + +def prepare_test(kafka_server, kafka_prefix, kafka_consumer_group): + src_objstorage = get_objstorage(cls="memory") + + writer = get_journal_writer( + cls="kafka", + brokers=[kafka_server], + client_id="kafka_writer", + prefix=kafka_prefix, + anonymize=False, + ) + + for content in CONTENTS: + src_objstorage.add(content.data, obj_id=content.sha1) + writer.write_addition("content", content) + + replayer = JournalClient( + brokers=kafka_server, + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_on_eof=True, + ) + + return replayer, src_objstorage + + +def copy_object_q(q): + """Wrap the original copy_object function to capture (thread-local) tenacity + stats and puch them in a queue suitable for checking in a test session""" + + def wrap(obj_id, src, dst): + try: + ret = copy_object(obj_id, src, dst) + return ret + finally: + q.put(("get", obj_id, replay.get_object.retry.statistics.copy())) + q.put(("put", obj_id, replay.put_object.retry.statistics.copy())) + + return wrap + + +def test_replay_content_with_transient_errors( + kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch +): + replayer, src_objstorage = prepare_test( + kafka_server, kafka_prefix, kafka_consumer_group + ) + dst_objstorage = get_objstorage(cls="memory") + src_objstorage = FailingObjstorage(src_objstorage) + + q = Queue() + monkeypatch.setattr(replay, "copy_object", copy_object_q(q)) + + worker_fn = functools.partial( + replay.process_replay_objects_content, + src=src_objstorage, + dst=dst_objstorage, + ) + replayer.process(worker_fn) + + # only content with status visible will be copied in storage2 + expected_objstorage_state = { + c.sha1: c.data for c in CONTENTS if c.status == "visible" + } + assert expected_objstorage_state == dst_objstorage.state + + stats = [q.get_nowait() for i in range(q.qsize())] + for obj_id in expected_objstorage_state: + put = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put" + ) + assert put.get("attempt_number") == 1 + assert put.get("start_time") > 0 + assert put.get("idle_for") == 0 + + get = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get" + ) + assert get.get("attempt_number") == 3 + assert get.get("start_time") > 0 + assert get.get("idle_for") > 0 + assert get.get("delay_since_first_attempt") > 0 + + +def test_replay_content_with_errors( + kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch +): + replayer, src_objstorage = prepare_test( + kafka_server, kafka_prefix, kafka_consumer_group + ) + dst_objstorage = get_objstorage(cls="memory") + src_objstorage = FailingObjstorage(src_objstorage) + + q = Queue() + monkeypatch.setattr(replay, "copy_object", copy_object_q(q)) + monkeypatch.setattr(replay.get_object.retry.stop, "max_attempt_number", 2) + + worker_fn = functools.partial( + replay.process_replay_objects_content, + src=src_objstorage, + dst=dst_objstorage, + ) + replayer.process(worker_fn) + + # no object could be replicated + assert dst_objstorage.state == {} + + stats = [q.get_nowait() for i in range(q.qsize())] + for obj in CONTENTS: + if obj.status != "visible": + continue + + obj_id = obj.sha1 + put = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put" + ) + assert put 
== {} + + get = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get" + ) + assert get.get("attempt_number") == 2 + assert get.get("start_time") > 0 + assert get.get("idle_for") > 0 + assert get.get("delay_since_first_attempt") > 0 + + +def test_replay_content_not_found( + kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch +): + replayer, src_objstorage = prepare_test( + kafka_server, kafka_prefix, kafka_consumer_group + ) + dst_objstorage = get_objstorage(cls="memory") + src_objstorage = NotFoundObjstorage(src_objstorage) + + q = Queue() + monkeypatch.setattr(replay, "copy_object", copy_object_q(q)) + + worker_fn = functools.partial( + replay.process_replay_objects_content, + src=src_objstorage, + dst=dst_objstorage, + ) + replayer.process(worker_fn) + + # no object could be replicated + assert dst_objstorage.state == {} + + stats = [q.get_nowait() for i in range(q.qsize())] + for obj in CONTENTS: + if obj.status != "visible": + continue + + obj_id = obj.sha1 + put = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put" + ) + assert put == {} + + get = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get" + ) + # ObjectNotFound should not be retried several times... + assert get.get("attempt_number") == 1 + assert get.get("start_time") > 0 + assert get.get("idle_for") == 0 + + +def test_replay_content_with_transient_add_errors( + kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch +): + replayer, src_objstorage = prepare_test( + kafka_server, kafka_prefix, kafka_consumer_group + ) + dst_objstorage = get_objstorage(cls="memory") + dst_objstorage = FailingObjstorage(dst_objstorage) + + q = Queue() + monkeypatch.setattr(replay, "copy_object", copy_object_q(q)) + + worker_fn = functools.partial( + replay.process_replay_objects_content, + src=src_objstorage, + dst=dst_objstorage, + ) + replayer.process(worker_fn) + + # only content with status visible will be copied in storage2 + expected_objstorage_state = { + c.sha1: c.data for c in CONTENTS if c.status == "visible" + } + assert expected_objstorage_state == dst_objstorage.storage.state + + stats = [q.get_nowait() for i in range(q.qsize())] + for obj_id in expected_objstorage_state: + put = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put" + ) + assert put.get("attempt_number") == 3 + assert put.get("start_time") > 0 + assert put.get("idle_for") > 0 + assert put.get("delay_since_first_attempt") > 0 + + get = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get" + ) + assert get.get("attempt_number") == 1 + assert get.get("start_time") > 0 + assert get.get("idle_for") == 0 + + +def test_replay_content_with_add_errors( + kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch +): + replayer, src_objstorage = prepare_test( + kafka_server, kafka_prefix, kafka_consumer_group + ) + dst_objstorage = get_objstorage(cls="memory") + dst_objstorage = FailingObjstorage(dst_objstorage) + + q = Queue() + monkeypatch.setattr(replay, "copy_object", copy_object_q(q)) + monkeypatch.setattr(replay.get_object.retry.stop, "max_attempt_number", 2) + + worker_fn = functools.partial( + replay.process_replay_objects_content, + src=src_objstorage, + dst=dst_objstorage, + ) + replayer.process(worker_fn) + + # no object could be replicated + assert dst_objstorage.storage.state == {} + + stats = [q.get_nowait() for i in range(q.qsize())] + for obj in CONTENTS: + if obj.status != "visible": + continue + + obj_id = 
obj.sha1 + put = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put" + ) + assert put.get("attempt_number") == 2 + assert put.get("start_time") > 0 + assert put.get("idle_for") > 0 + assert put.get("delay_since_first_attempt") > 0 + + get = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get" + ) + assert get.get("attempt_number") == 1 + assert get.get("start_time") > 0 + assert get.get("idle_for") == 0 diff --git a/tox.ini b/tox.ini index 0ba992c..8d37b80 100644 --- a/tox.ini +++ b/tox.ini @@ -1,74 +1,75 @@ [tox] envlist=black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov commands = pytest --doctest-modules \ {envsitepackagesdir}/swh/objstorage/replayer \ --cov={envsitepackagesdir}/swh/objstorage/replayer \ --cov-branch {posargs} [testenv:black] skip_install = true deps = - black==22.3.0 + black==22.10.0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = - flake8==4.0.1 - flake8-bugbear==22.3.23 + flake8==5.0.4 + flake8-bugbear==22.9.23 + pycodestyle==2.9.1 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy==0.942 commands = mypy swh # build documentation outside swh-environment using the current # git HEAD of swh-docs, is executed on CI for each diff to prevent # breaking doc build [testenv:sphinx] whitelist_externals = make usedevelop = true extras = testing deps = # fetch and install swh-docs in develop mode -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs # build documentation only inside swh-environment using local state # of swh-docs package [testenv:sphinx-dev] whitelist_externals = make usedevelop = true extras = testing deps = # install swh-docs in develop mode -e ../swh-docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs
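The core behavioural change in replay.py above is that each _copy_object task now reports its outcome as a ("state", nbytes) tuple, and process_replay_objects_content tallies per-state counters plus the total copied volume from the completed futures before logging a humanized summary. The standalone sketch below illustrates that aggregation pattern only, under simplified assumptions: fake_copy and summarize are hypothetical stand-ins written for this illustration, not functions from this package, and humanize is the helper library this patch adds to requirements.txt.

    # Minimal sketch of the (state, nbytes) aggregation pattern used by the new
    # process_replay_objects_content(); not the project's actual code.
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import wait as futures_wait
    import random
    import time

    from humanize import naturaldelta, naturalsize

    def fake_copy(obj_id: int):
        """Pretend to copy one object; report (state, nbytes) like _copy_object."""
        time.sleep(0.001)
        state = random.choice(["copied", "in_dst", "skipped", "not_found"])
        return state, 1024 if state == "copied" else None

    def summarize(n_objects: int = 100, concurrency: int = 16) -> dict:
        # Per-state counters, mirroring the stats dict introduced by the patch.
        stats = dict.fromkeys(
            ["skipped", "excluded", "not_found", "failed", "copied", "in_dst"], 0
        )
        vol = 0
        t0 = time.time()
        with ThreadPoolExecutor(max_workers=concurrency) as pool:
            futures = [pool.submit(fake_copy, i) for i in range(n_objects)]
            futures_wait(futures)
            # Tally outcomes and copied bytes from the completed futures.
            for f in futures:
                state, nbytes = f.result()
                if nbytes is not None:
                    vol += nbytes
                stats[state] += 1
        dt = time.time() - t0
        print(
            "processed %d content objects (%s) in %s - %s"
            % (
                len(futures),
                naturalsize(vol),
                naturaldelta(dt),
                " - ".join("%d %s" % (v, k) for k, v in stats.items()),
            )
        )
        return stats

Calling summarize() prints a line shaped like the new summary log message, which is also the shape the updated regex in test_replay_content_check_dst extracts its per-state counts from.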