diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 0000000..651d43d --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1,2 @@ +# python: Reformat code with black 22.3.0 +301bf25526b593f5e8dca588dfa24284dfa89006 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ace1fe2..1c95e3d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,48 +1,40 @@ repos: -- repo: https://github.com/pre-commit/pre-commit-hooks - rev: v2.4.0 - hooks: - - id: trailing-whitespace - - id: check-json - - id: check-yaml + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.1.0 + hooks: + - id: trailing-whitespace + - id: check-json + - id: check-yaml -- repo: https://gitlab.com/pycqa/flake8 - rev: 3.8.3 - hooks: - - id: flake8 + - repo: https://gitlab.com/pycqa/flake8 + rev: 4.0.1 + hooks: + - id: flake8 + additional_dependencies: [flake8-bugbear==22.3.23] -- repo: https://github.com/codespell-project/codespell - rev: v1.16.0 - hooks: - - id: codespell + - repo: https://github.com/codespell-project/codespell + rev: v2.1.0 + hooks: + - id: codespell + name: Check source code spelling + stages: [commit] -- repo: local - hooks: - - id: mypy - name: mypy - entry: mypy - args: [swh] - pass_filenames: false - language: system - types: [python] + - repo: local + hooks: + - id: mypy + name: mypy + entry: mypy + args: [swh] + pass_filenames: false + language: system + types: [python] -# unfortunately, we are far from being able to enable this... -# - repo: https://github.com/PyCQA/pydocstyle.git -# rev: 4.0.0 -# hooks: -# - id: pydocstyle -# name: pydocstyle -# description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. -# entry: pydocstyle --convention=google -# language: python -# types: [python] + - repo: https://github.com/PyCQA/isort + rev: 5.10.1 + hooks: + - id: isort -- repo: https://github.com/PyCQA/isort - rev: 5.5.2 - hooks: - - id: isort - -- repo: https://github.com/python/black - rev: 19.10b0 - hooks: - - id: black + - repo: https://github.com/python/black + rev: 22.3.0 + hooks: + - id: black diff --git a/PKG-INFO b/PKG-INFO index 7fd09e5..4ca8a9a 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,30 +1,26 @@ Metadata-Version: 2.1 Name: swh.objstorage.replayer -Version: 1.0.0 +Version: 1.1.0 Summary: Software Heritage content replayer Home-page: https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer Author: Software Heritage developers Author-email: swh-devel@inria.fr -License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-objstorage-replayer -Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing License-File: LICENSE License-File: AUTHORS swh-objstorage-replayer ======================= Simple command line tool to replicate content objects from a source Object storage to a destination one by listening the `content` topic of a `swh.journal` kafka stream. 
- - diff --git a/pytest.ini b/pytest.ini index b712d00..7c07895 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,4 @@ [pytest] -norecursedirs = docs .* +norecursedirs = build docs .* + +asyncio_mode = strict diff --git a/setup.cfg b/setup.cfg index 1d722c2..f65ba0a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,8 +1,9 @@ [flake8] -ignore = E203,E231,W503 +select = C,E,F,W,B950 +ignore = E203,E231,E501,W503 max-line-length = 88 [egg_info] tag_build = tag_date = 0 diff --git a/swh.objstorage.replayer.egg-info/PKG-INFO b/swh.objstorage.replayer.egg-info/PKG-INFO index 7fd09e5..4ca8a9a 100644 --- a/swh.objstorage.replayer.egg-info/PKG-INFO +++ b/swh.objstorage.replayer.egg-info/PKG-INFO @@ -1,30 +1,26 @@ Metadata-Version: 2.1 Name: swh.objstorage.replayer -Version: 1.0.0 +Version: 1.1.0 Summary: Software Heritage content replayer Home-page: https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer Author: Software Heritage developers Author-email: swh-devel@inria.fr -License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-objstorage-replayer -Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing License-File: LICENSE License-File: AUTHORS swh-objstorage-replayer ======================= Simple command line tool to replicate content objects from a source Object storage to a destination one by listening the `content` topic of a `swh.journal` kafka stream. 
- - diff --git a/swh.objstorage.replayer.egg-info/SOURCES.txt b/swh.objstorage.replayer.egg-info/SOURCES.txt index 53adbe9..5a1780c 100644 --- a/swh.objstorage.replayer.egg-info/SOURCES.txt +++ b/swh.objstorage.replayer.egg-info/SOURCES.txt @@ -1,41 +1,42 @@ +.git-blame-ignore-revs .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile README.md mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/cli.rst docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.objstorage.replayer.egg-info/PKG-INFO swh.objstorage.replayer.egg-info/SOURCES.txt swh.objstorage.replayer.egg-info/dependency_links.txt swh.objstorage.replayer.egg-info/entry_points.txt swh.objstorage.replayer.egg-info/requires.txt swh.objstorage.replayer.egg-info/top_level.txt swh/objstorage/__init__.py swh/objstorage/replayer/__init__.py swh/objstorage/replayer/cli.py swh/objstorage/replayer/py.typed swh/objstorage/replayer/replay.py swh/objstorage/replayer/tests/__init__.py swh/objstorage/replayer/tests/test_cli.py swh/objstorage/replayer/tests/test_replay.py swh/objstorage/replayer/tests/test_statsd.py \ No newline at end of file diff --git a/swh.objstorage.replayer.egg-info/entry_points.txt b/swh.objstorage.replayer.egg-info/entry_points.txt index 5db5b59..1ca4ef7 100644 --- a/swh.objstorage.replayer.egg-info/entry_points.txt +++ b/swh.objstorage.replayer.egg-info/entry_points.txt @@ -1,4 +1,2 @@ - - [swh.cli.subcommands] - content-replayer=swh.objstorage.replayer.cli - \ No newline at end of file +[swh.cli.subcommands] +content-replayer = swh.objstorage.replayer.cli diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py index b2873e8..2bd6c1c 100644 --- a/swh/objstorage/replayer/cli.py +++ b/swh/objstorage/replayer/cli.py @@ -1,194 +1,196 @@ # Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import click try: from systemd.daemon import notify except ImportError: notify = None from swh.objstorage.cli import objstorage_cli_group @objstorage_cli_group.command("replay") @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to run forever.", ) @click.option( "--exclude-sha1-file", default=None, type=click.File("rb"), help="File containing a sorted array of hashes to be excluded.", ) @click.option( "--check-dst/--no-check-dst", default=True, help="Check whether the destination contains the object before copying.", ) @click.option( "--concurrency", default=4, help=( "Number of concurrent threads doing the actual copy of blobs between " "the source and destination objstorages." ), ) @click.pass_context def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst, concurrency): """Fill a destination Object Storage using a journal stream. This is typically used for a mirror configuration, by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they use the same ``group-id``. 
You can use the ``KAFKA_GROUP_INSTANCE_ID`` environment variable to use KIP-345 static group membership. This service retrieves object ids to copy from the 'content' topic. It will only copy an object's content if the object's description in the kafka message has its status set to visible. ``--exclude-sha1-file`` may be used to exclude some hashes to speed up the replay in case many of the contents are already in the destination objstorage. It must contain a concatenation of all (sha1) hashes, and it must be sorted. This file will not be fully loaded into memory at any given time, so it can be arbitrarily large. ``--check-dst`` sets whether the replayer should check in the destination ObjStorage before copying an object. You can turn that off if you know you're copying to an empty ObjStorage. ``--concurrency N`` sets the number of threads in charge of copying blob objects from the source objstorage to the destination one. Using a large concurrency value makes sense if both the source and destination objstorages support highly parallel workloads. Make sure not to set the ``batch_size`` configuration option too low for the concurrency to be actually useful (each batch of kafka messages is dispatched among the threads). The expected configuration file should have 3 sections: - objstorage: the source object storage from which to retrieve objects to copy; this objstorage can (and should) be a read-only objstorage, https://docs.softwareheritage.org/devel/apidoc/swh.objstorage.html - objstorage_dst: the destination objstorage into which objects will be written, - journal_client: the configuration of the kafka journal from which the `content` topic will be consumed to get the list of content objects to copy from the source objstorage to the destination one. https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html In addition to these 3 mandatory sections, an optional 'replayer' section can be provided with an 'error_reporter' config entry allowing one to specify a Redis connection parameter set that will be used to report objects that could not be copied, e.g.:: objstorage: [...] objstorage_dst: [...] journal_client: [...] replayer: error_reporter: host: redis.local port: 6379 db: 1 """ import functools import mmap from swh.journal.client import get_journal_client from swh.model.model import SHA1_SIZE from swh.objstorage.factory import get_objstorage from swh.objstorage.replayer.replay import ( is_hash_in_bytearray, process_replay_objects_content, ) conf = ctx.obj["config"] try: objstorage_src = get_objstorage(**conf.pop("objstorage")) except KeyError: ctx.fail("You must have a source objstorage configured in " "your config file.") try: objstorage_dst = get_objstorage(**conf.pop("objstorage_dst")) except KeyError: ctx.fail( "You must have a destination objstorage configured " "in your config file." ) if exclude_sha1_file: map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) if map_.size() % SHA1_SIZE != 0: ctx.fail( "--exclude-sha1 must link to a file whose size is an " "exact multiple of %d bytes."
% SHA1_SIZE ) nb_excluded_hashes = int(map_.size() / SHA1_SIZE) def exclude_fn(obj): return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes) else: exclude_fn = None journal_cfg = conf.pop("journal_client") replayer_cfg = conf.pop("replayer", {}) if "error_reporter" in replayer_cfg: from redis import Redis from swh.objstorage.replayer import replay replay.REPORTER = Redis(**replayer_cfg.get("error_reporter")).set client = get_journal_client( - **journal_cfg, stop_after_objects=stop_after_objects, object_types=("content",), + **journal_cfg, + stop_after_objects=stop_after_objects, + object_types=("content",), ) worker_fn = functools.partial( process_replay_objects_content, src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn, check_dst=check_dst, concurrency=concurrency, ) if notify: notify("READY=1") try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() def main(): logging.basicConfig() return objstorage_cli_group(auto_envvar_prefix="SWH_OBJSTORAGE") if __name__ == "__main__": main() diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py index 5e42b2b..786b717 100644 --- a/swh/objstorage/replayer/replay.py +++ b/swh/objstorage/replayer/replay.py @@ -1,348 +1,348 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor from concurrent.futures import wait as futures_wait import logging from time import time from typing import Callable, Dict, List, Optional import msgpack from sentry_sdk import capture_exception, push_scope try: from systemd.daemon import notify except ImportError: notify = None from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential, ) from tenacity.retry import retry_base from swh.core.statsd import statsd from swh.model.hashutil import hash_to_hex from swh.model.model import SHA1_SIZE from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError, ObjStorage logger = logging.getLogger(__name__) REPORTER = None CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must be `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, eg.
:class:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: raise ValueError("hash_ does not match the provided hash_size.") def get_hash(position): return array[position * hash_size : (position + 1) * hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right - 1: middle = int((right + left) / 2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ class ReplayError(Exception): """An error occurred during the replay of an object""" def __init__(self, *, obj_id, exc): self.obj_id = hash_to_hex(obj_id) self.exc = exc def __str__(self): return "ReplayError(%s, %s)" % (self.obj_id, self.exc) def log_replay_retry(retry_state, sleep=None, last_result=None): """Log a retry of the content replayer""" exc = retry_state.outcome.exception() operation = retry_state.fn.__name__ logger.debug( "Retry operation %(operation)s on %(obj_id)s: %(exc)s", {"operation": operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, ) def log_replay_error(retry_state): """Log a replay error to sentry""" exc = retry_state.outcome.exception() with push_scope() as scope: scope.set_tag("operation", retry_state.fn.__name__) scope.set_extra("obj_id", exc.obj_id) capture_exception(exc.exc) error_context = { "obj_id": exc.obj_id, "operation": retry_state.fn.__name__, "exc": str(exc.exc), "retries": retry_state.attempt_number, } logger.error( "Failed operation %(operation)s on %(obj_id)s after %(retries)s" " retries: %(exc)s", error_context, ) # if we have a global error (redis) reporter if REPORTER is not None: oid = f"blob:{exc.obj_id}" msg = msgpack.dumps(error_context) REPORTER(oid, msg) return None CONTENT_REPLAY_RETRIES = 3 class retry_log_if_success(retry_base): """Log in statsd the number of attempts required to succeed""" def __call__(self, retry_state): if not retry_state.outcome.failed: statsd.increment( CONTENT_RETRY_METRIC, tags={ "operation": retry_state.fn.__name__, "attempt": str(retry_state.attempt_number), }, ) return False content_replay_retry = retry( retry=retry_if_exception_type(ReplayError) | retry_log_if_success(), stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), wait=wait_random_exponential(multiplier=1, max=60), before_sleep=log_replay_retry, retry_error_callback=log_replay_error, ) @content_replay_retry def get_object(objstorage, obj_id): try: with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}): obj = objstorage.get(obj_id) logger.debug("retrieved %(obj_id)s", {"obj_id": hash_to_hex(obj_id)}) return obj except ObjNotFoundError: logger.error( "Failed to retrieve %(obj_id)s: object not found", {"obj_id": hash_to_hex(obj_id)}, ) raise except Exception as exc: raise ReplayError(obj_id=obj_id, exc=exc) from None @content_replay_retry def put_object(objstorage, obj_id, obj): try: with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): - obj = objstorage.add(obj, obj_id, check_presence=False) + objstorage.add(obj, obj_id, check_presence=False) logger.debug("stored %(obj_id)s", {"obj_id": hash_to_hex(obj_id)}) except Exception as exc: raise ReplayError(obj_id=obj_id, exc=exc) from None def 
copy_object(obj_id, src, dst): obj = get_object(src, obj_id) if obj is not None: put_object(dst, obj_id, obj) statsd.increment(CONTENT_BYTES_METRIC, len(obj)) return len(obj) return 0 @content_replay_retry def obj_in_objstorage(obj_id, dst): """Check if an object is already in an objstorage, tenaciously""" try: return obj_id in dst except Exception as exc: raise ReplayError(obj_id=obj_id, exc=exc) from None def process_replay_objects_content( all_objects: Dict[str, List[dict]], *, src: ObjStorage, dst: ObjStorage, exclude_fn: Optional[Callable[[dict], bool]] = None, check_dst: bool = True, concurrency: int = 16, ): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them from the `src` objstorage to the `dst` objstorage, if: * `obj['status']` is `'visible'` * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) * `obj['sha1'] not in dst` (if `check_dst` is True) Args: all_objects: Objects passed by the Kafka client. Most importantly, `all_objects['content'][*]['sha1']` is the sha1 hash of each content. src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) exclude_fn: Determines whether an object should be copied. check_dst: Determines whether we should check the destination objstorage before copying. Example: >>> from swh.objstorage.factory import get_objstorage >>> src = get_objstorage('memory') >>> dst = get_objstorage('memory') >>> id1 = src.add(b'foo bar') >>> id2 = src.add(b'baz qux') >>> kafka_partitions = { ... 'content': [ ... { ... 'sha1': id1, ... 'status': 'visible', ... }, ... { ... 'sha1': id2, ... 'status': 'visible', ... }, ... ] ... } >>> process_replay_objects_content( ... kafka_partitions, src=src, dst=dst, ... 
exclude_fn=lambda obj: obj['sha1'] == id1) >>> id1 in dst False >>> id2 in dst True """ vol = [] nb_skipped = 0 nb_failures = 0 t0 = time() def _copy_object(obj): nonlocal nb_skipped nonlocal nb_failures obj_id = obj[ID_HASH_ALGO] if obj["status"] != "visible": nb_skipped += 1 logger.debug("skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"]) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "skipped", "status": obj["status"]}, ) elif exclude_fn and exclude_fn(obj): nb_skipped += 1 logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}) elif check_dst and obj_in_objstorage(obj_id, dst): nb_skipped += 1 logger.debug("skipped %s (in dst)", hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) else: try: copied = copy_object(obj_id, src, dst) except ObjNotFoundError: nb_skipped += 1 statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"} ) else: if copied is None: nb_failures += 1 statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"} ) else: vol.append(copied) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"} ) with ThreadPoolExecutor(max_workers=concurrency) as pool: futures = [] for (object_type, objects) in all_objects.items(): if object_type != "content": logger.warning( "Received a series of %s, this should not happen", object_type ) continue for obj in objects: futures.append(pool.submit(_copy_object, obj=obj)) futures_wait(futures, return_when=FIRST_EXCEPTION) for f in futures: if f.running(): continue exc = f.exception() if exc: pool.shutdown(wait=False) f.result() raise exc dt = time() - t0 logger.info( "processed %s content objects in %.1fsec " "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped", len(vol), dt, len(vol) / dt, sum(vol) / 1024 / 1024 / dt, nb_failures, nb_skipped, ) if notify: notify("WATCHDOG=1") diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py index 3ba15c0..d5be3a8 100644 --- a/swh/objstorage/replayer/tests/test_cli.py +++ b/swh/objstorage/replayer/tests/test_cli.py @@ -1,677 +1,697 @@ # Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter import copy import functools import logging import re from subprocess import Popen import tempfile from typing import Tuple from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Producer import msgpack import pytest import yaml from swh.journal.serializers import key_to_kafka -from swh.model.hashutil import hash_to_hex +from swh.model.hashutil import MultiHash, hash_to_hex from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.objstorage.replayer.cli import objstorage_cli_group from swh.objstorage.replayer.replay import CONTENT_REPLAY_RETRIES logger = logging.getLogger(__name__) CLI_CONFIG = { - "objstorage": {"cls": "mocked", "name": "src",}, - "objstorage_dst": {"cls": "mocked", "name": "dst",}, + "objstorage": { + "cls": "mocked", + "name": "src", + }, + "objstorage_dst": { + "cls": "mocked", + "name": "dst", + }, } @pytest.fixture def monkeypatch_retry_sleep(monkeypatch): from swh.objstorage.replayer.replay import get_object, obj_in_objstorage, put_object 
monkeypatch.setattr(get_object.retry, "sleep", lambda x: None) monkeypatch.setattr(put_object.retry, "sleep", lambda x: None) monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) def _patch_objstorages(names): objstorages = {name: InMemoryObjStorage() for name in names} def get_mock_objstorage(cls, **args): assert cls == "mocked", cls return objstorages[args["name"]] def decorator(f): @functools.wraps(f) @patch("swh.objstorage.factory.get_objstorage") def newf(get_objstorage_mock, *args, **kwargs): get_objstorage_mock.side_effect = get_mock_objstorage f(*args, objstorages=objstorages, **kwargs) return newf return decorator def invoke(*args, env=None, **kwargs): config = copy.deepcopy(CLI_CONFIG) config.update(kwargs) runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: yaml.dump(config, config_fd) config_fd.seek(0) args = ["-C" + config_fd.name] + list(args) return runner.invoke( - objstorage_cli_group, args, obj={"log_level": logging.DEBUG}, env=env, + objstorage_cli_group, + args, + obj={"log_level": logging.DEBUG}, + env=env, ) def test_replay_help(): - result = invoke("replay", "--help",) + result = invoke( + "replay", + "--help", + ) expected = ( r"^\s*Usage: objstorage replay \[OPTIONS\]\s+" r"Fill a destination Object Storage.*" ) assert result.exit_code == 0, result.output assert re.match(expected, result.output, re.MULTILINE), result.output NUM_CONTENTS = 10 def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorage): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) contents = {} for i in range(NUM_CONTENTS): content = b"\x00" * 19 + bytes([i]) - sha1 = objstorage.add(content) + sha1 = MultiHash(["sha1"]).from_data(content).digest()["sha1"] + objstorage.add(content=content, obj_id=sha1) contents[sha1] = content producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(sha1), - value=key_to_kafka({"sha1": sha1, "status": "visible",}), + value=key_to_kafka( + { + "sha1": sha1, + "status": "visible", + } + ), ) producer.flush() return contents @_patch_objstorages(["src", "dst"]) def test_replay_content( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], ): """Check the content replayer in normal conditions""" contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_structured_log( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog, ): """Check the logs produced by the content replayer in normal conditions""" contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = 
r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = set() for record in caplog.records: logtext = record.getMessage() if "stored" in logtext: copied.add(record.args["obj_id"]) assert ( copied == expected_obj_ids ), "Mismatched logging; see captured log output for details." @_patch_objstorages(["src", "dst"]) def test_replay_content_static_group_id( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog, ): """Check the content replayer in normal conditions with KAFKA_GROUP_INSTANCE_ID set """ contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) # Setup log capture to fish the consumer settings out of the log messages caplog.set_level(logging.DEBUG, "swh.journal.client") result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"}, journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output consumer_settings = None for record in caplog.records: if "Consumer settings" in record.message: consumer_settings = record.args break assert consumer_settings is not None, ( "Failed to get consumer settings out of the consumer log. " "See log capture for details." ) assert consumer_settings["group.instance.id"] == "static-group-instance-id" assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000 assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000 for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_exclude( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], ): """Check the content replayer in normal conditions with an exclusion file (--exclude-sha1-file) """ contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) excluded_contents = list(contents)[0::2] # picking half of them with tempfile.NamedTemporaryFile(mode="w+b") as fd: fd.write(b"".join(sorted(excluded_contents))) fd.seek(0) result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), "--exclude-sha1-file", fd.name, journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): if sha1 in excluded_contents: assert sha1 not in objstorages["dst"], sha1 else: assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content NUM_CONTENTS_DST = 5 @_patch_objstorages(["src", "dst"]) @pytest.mark.parametrize( "check_dst,expected_copied,expected_in_dst", [ (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), (False, NUM_CONTENTS, 0), ], ) def test_replay_content_check_dst( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], check_dst: bool, expected_copied: int, expected_in_dst: int, caplog, ): """Check the content replayer in normal conditions with some objects already in the dst objstorage.
When check_dst is True, expect those to be neither retrieved from the src objstorage nor pushed to the dst objstorage. """ contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) # add some objects in the dst objstorage for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages["dst"].add(content, obj_id=sha1) caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), "--check-dst" if check_dst else "--no-check-dst", journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output retrieved = 0 stored = 0 in_dst = 0 for record in caplog.records: logtext = record.getMessage() if "retrieved" in logtext: retrieved += 1 elif "stored" in logtext: stored += 1 elif "in dst" in logtext: in_dst += 1 assert ( retrieved == expected_copied and stored == expected_copied and in_dst == expected_in_dst ), "Unexpected amount of objects copied, see the captured log for details" for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content class FlakyObjStorage(InMemoryObjStorage): """Flaky objstorage Any 'get', 'add' or 'in' (i.e. '__contains__()') operation will fail according to configured 'failures'. 'failures' is expected to be a dict whose keys are (operation, obj_id) pairs and whose values are the number of times the operation 'operation' is expected to fail for object 'obj_id' before being performed successfully. An optional state ('state') can also be given as an argument (see InMemoryObjStorage). """ def __init__(self, *args, **kwargs): state = kwargs.pop("state") self.failures_left = Counter(kwargs.pop("failures")) super().__init__(*args, **kwargs) if state: self.state = state def flaky_operation(self, op, obj_id): if self.failures_left[op, obj_id] > 0: self.failures_left[op, obj_id] -= 1 raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id))) def get(self, obj_id): self.flaky_operation("get", obj_id) return super().get(obj_id) def add(self, data, obj_id=None, check_presence=True): self.flaky_operation("add", obj_id) return super().add(data, obj_id=obj_id, check_presence=check_presence) def __contains__(self, obj_id): self.flaky_operation("in", obj_id) return super().__contains__(obj_id) @_patch_objstorages(["src", "dst"]) def test_replay_content_check_dst_retry( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], monkeypatch_retry_sleep, caplog, redis_proc, redisdb, ): """Check the content replayer with a flaky dst objstorage for 'in' operations.
""" contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) # build a flaky dst objstorage in which the 'in' operation for the first # NUM_CONTENT_DST objects will fail once failures = {} for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages["dst"].add(content, obj_id=sha1) failures["in", sha1] = 1 orig_dst = objstorages["dst"] objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") result = invoke( "replay", "--check-dst", "--stop-after-objects", str(NUM_CONTENTS), journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, replayer={ "error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output # check that exactly NUM_CONTENTS_DST 'in' operations have failed once failed_in = 0 for record in caplog.records: logtext = record.getMessage() if "Retry operation obj_in_objstorage" in logtext: failed_in += 1 elif "Retry operation" in logtext: assert False, "No other failure expected than 'in' operations" assert failed_in == NUM_CONTENTS_DST # check nothing has been reported in redis assert not redisdb.keys() # in the end, the replay process should be OK for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_failed_copy_retry( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog, monkeypatch_retry_sleep, redis_proc, redisdb, ): """Check the content replayer with a flaky src and dst objstorages for 'get' and 'add' operations, and a few non-recoverable failures (some objects failed to be replayed). """ contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) add_failures = {} get_failures = {} definitely_failed = set() # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. # We generate failures for 2 different operations, get and add. num_retry_contents = 2 * CONTENT_REPLAY_RETRIES assert ( num_retry_contents < NUM_CONTENTS ), "Need to generate more test contents to properly test retry behavior" for i, sha1 in enumerate(contents): if i >= num_retry_contents: break # This generates a number of failures, up to CONTENT_REPLAY_RETRIES num_failures = (i % CONTENT_REPLAY_RETRIES) + 1 # This generates failures of add for the first CONTENT_REPLAY_RETRIES # objects, then failures of get. 
if i < CONTENT_REPLAY_RETRIES: add_failures["add", sha1] = num_failures else: get_failures["get", sha1] = num_failures # Only contents that have CONTENT_REPLAY_RETRIES failures or more are # definitely failing if num_failures >= CONTENT_REPLAY_RETRIES: definitely_failed.add(hash_to_hex(sha1)) assert add_failures assert get_failures assert definitely_failed objstorages["dst"] = FlakyObjStorage( - state=objstorages["dst"].state, failures=add_failures, + state=objstorages["dst"].state, + failures=add_failures, ) objstorages["src"] = FlakyObjStorage( - state=objstorages["src"].state, failures=get_failures, + state=objstorages["src"].state, + failures=get_failures, ) caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, replayer={ "error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output # check the logs look as expected copied = 0 failed_put = set() failed_get = set() for record in caplog.records: logtext = record.getMessage() if "stored" in logtext: copied += 1 elif "Failed operation" in logtext: assert record.levelno == logging.ERROR assert record.args["retries"] == CONTENT_REPLAY_RETRIES assert record.args["operation"] in ("get_object", "put_object") if record.args["operation"] == "get_object": failed_get.add(record.args["obj_id"]) else: failed_put.add(record.args["obj_id"]) assert ( failed_put | failed_get == definitely_failed ), "Unexpected object copy failures; see captured log for details" # check failed objects are referenced in redis assert set(redisdb.keys()) == { f"blob:{objid}".encode() for objid in definitely_failed } # and have a consistent error report in redis for key in redisdb.keys(): report = msgpack.loads(redisdb[key]) assert report["operation"] in ("get_object", "put_object") if report["operation"] == "get_object": assert report["obj_id"] in failed_get else: assert report["obj_id"] in failed_put # check valid objects are in the dst objstorage, but # failed objects are not.
for (sha1, content) in contents.items(): if hash_to_hex(sha1) in definitely_failed: assert sha1 not in objstorages["dst"] continue assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_objnotfound( objstorages, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog, ): """Check that ObjNotFoundError is not considered a failure to retry""" contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) # delete a few objects from the src objstorage num_contents_deleted = 5 contents_deleted = set() for i, sha1 in enumerate(contents): if i >= num_contents_deleted: break del objstorages["src"].state[sha1] contents_deleted.add(hash_to_hex(sha1)) caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") result = invoke( "replay", "--stop-after-objects", str(NUM_CONTENTS), journal_client={ "cls": "kafka", "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 not_in_src = set() for record in caplog.records: logtext = record.getMessage() if "stored" in logtext: copied += 1 elif "object not found" in logtext: # Check that the object id can be recovered from logs assert record.levelno == logging.ERROR not_in_src.add(record.args["obj_id"]) elif "Retry operation" in logtext: assert False, "Not found objects should not be retried" assert ( copied == NUM_CONTENTS - num_contents_deleted ), "Unexpected number of contents copied" assert ( not_in_src == contents_deleted ), "Mismatch between deleted contents and not_in_src logs" for (sha1, content) in contents.items(): if sha1 not in objstorages["src"]: continue assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content diff --git a/swh/objstorage/replayer/tests/test_replay.py b/swh/objstorage/replayer/tests/test_replay.py index 9ebbc8f..c2af538 100644 --- a/swh/objstorage/replayer/tests/test_replay.py +++ b/swh/objstorage/replayer/tests/test_replay.py @@ -1,72 +1,73 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from hypothesis import given, settings from hypothesis.strategies import sets from swh.journal.client import JournalClient from swh.journal.writer import get_journal_writer from swh.model.hypothesis_strategies import sha1 from swh.model.model import Content from swh.objstorage.factory import get_objstorage from swh.objstorage.replayer.replay import ( is_hash_in_bytearray, process_replay_objects_content, ) CONTENTS = [Content.from_data(f"foo{i}".encode()) for i in range(10)] + [ Content.from_data(f"forbidden foo{i}".encode(), status="hidden") for i in range(10) ] @settings(max_examples=500) @given( - sets(sha1(), min_size=0, max_size=500), sets(sha1(), min_size=10), + sets(sha1(), min_size=0, max_size=500), + sets(sha1(), min_size=10), ) def test_is_hash_in_bytearray(haystack, needles): array = b"".join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == ( needle in haystack ) def test_replay_content(kafka_server, kafka_prefix, kafka_consumer_group):
objstorage1 = get_objstorage(cls="memory") objstorage2 = get_objstorage(cls="memory") writer = get_journal_writer( cls="kafka", brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, anonymize=False, ) for content in CONTENTS: - objstorage1.add(content.data) + objstorage1.add(content.data, obj_id=content.sha1) writer.write_addition("content", content) replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, # stop_after_objects=len(objects), ) worker_fn = functools.partial( process_replay_objects_content, src=objstorage1, dst=objstorage2 ) replayer.process(worker_fn) # only content with status visible will be copied in storage2 expected_objstorage_state = { c.sha1: c.data for c in CONTENTS if c.status == "visible" } assert expected_objstorage_state == objstorage2.state diff --git a/swh/objstorage/replayer/tests/test_statsd.py b/swh/objstorage/replayer/tests/test_statsd.py index 59499d5..bdb1d9e 100644 --- a/swh/objstorage/replayer/tests/test_statsd.py +++ b/swh/objstorage/replayer/tests/test_statsd.py @@ -1,119 +1,119 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import re import pytest from swh.journal.client import JournalClient from swh.journal.writer import get_journal_writer from swh.model.model import Content from swh.objstorage.factory import get_objstorage from swh.objstorage.replayer import replay from swh.objstorage.replayer.replay import process_replay_objects_content @pytest.fixture def statsd(monkeypatch, statsd): monkeypatch.setattr(replay, "statsd", statsd) yield statsd def test_replay_statsd(kafka_server, kafka_prefix, kafka_consumer_group, statsd): objstorage1 = get_objstorage(cls="memory") objstorage2 = get_objstorage(cls="memory") writer = get_journal_writer( cls="kafka", brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, anonymize=False, ) # Fill the source objstorage with a bunch of content objects. In the end, # there should be 2 content objects for each possible replaying decision # (aka. skipped, excluded, in_dst, not_in_src, failed and copied): # contents[0:2] are properly copied # contents[2:4] are excluded # contents[4:6] are in dst # contents[6:8] are hidden contents = [ Content.from_data( f"foo{i}".encode(), status="hidden" if 6 <= i < 8 else "visible" ) for i in range(8) ] for content in contents: - objstorage1.add(content.data) + objstorage1.add(content.data, obj_id=content.sha1) writer.write_addition("content", content) excluded = [c.sha1 for c in contents[2:4]] def exclude_fn(cnt_d): return cnt_d["sha1"] in excluded for content in contents[4:6]: - objstorage2.add(content.data) + objstorage2.add(content.data, obj_id=content.sha1) replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, # stop_after_objects=len(objects), ) worker_fn = functools.partial( process_replay_objects_content, src=objstorage1, dst=objstorage2, exclude_fn=exclude_fn, ) replayer.process(worker_fn) # We cannot expect any order from replayed objects, so statsd reports won't # be sorted according to contents; we just count the expected occurrences # of each statsd message.
prefix = "swh_content_replayer" expected_reports = { # 4 because 2 for the copied objects + 2 for the in_dst ones f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:obj_in_objstorage$": 4, f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:get_object$": 2, f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:put_object$": 2, f"^{prefix}_duration_seconds:[0-9]+[.][0-9]+[|]ms[|]#request:get$": 2, f"^{prefix}_duration_seconds:[0-9]+[.][0-9]+[|]ms[|]#request:put$": 2, f"^{prefix}_bytes:4[|]c$": 2, } decisions = ("copied", "skipped", "excluded", "in_dst", "not_in_src", "failed") decision_re = ( "^swh_content_replayer_operations_total:1[|]c[|]#decision:(?P<decision>" + "|".join(decisions) + ")(?P<tags>,.+)?$" ) operations = dict.fromkeys(decisions, 0) reports = dict.fromkeys(expected_reports, 0) for report in (r.decode() for r in statsd.socket.payloads): m = re.match(decision_re, report) if m: operations[m.group("decision")] += 1 else: for expected in expected_reports: m = re.match(expected, report) if m: reports[expected] += 1 assert reports == expected_reports assert operations["skipped"] == 2 assert operations["excluded"] == 2 assert operations["in_dst"] == 2 assert operations["copied"] == 2 # TODO: assert operations["not_in_src"] == 0 assert operations["failed"] == 0 diff --git a/tox.ini b/tox.ini index 8394d86..0ba992c 100644 --- a/tox.ini +++ b/tox.ini @@ -1,73 +1,74 @@ [tox] envlist=black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov commands = pytest --doctest-modules \ {envsitepackagesdir}/swh/objstorage/replayer \ --cov={envsitepackagesdir}/swh/objstorage/replayer \ --cov-branch {posargs} [testenv:black] skip_install = true deps = - black==19.10b0 + black==22.3.0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = - flake8 + flake8==4.0.1 + flake8-bugbear==22.3.23 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = - mypy==0.920 + mypy==0.942 commands = mypy swh # build documentation outside swh-environment using the current # git HEAD of swh-docs; this is executed on CI for each diff to prevent # breaking doc build [testenv:sphinx] whitelist_externals = make usedevelop = true extras = testing deps = # fetch and install swh-docs in develop mode -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs # build documentation only inside swh-environment using local state # of swh-docs package [testenv:sphinx-dev] whitelist_externals = make usedevelop = true extras = testing deps = # install swh-docs in develop mode -e ../swh-docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs
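
As a minimal, hypothetical sketch (not part of this diff), here is one way an input for ``--exclude-sha1-file`` could be produced: the option expects the sorted concatenation of raw 20-byte sha1 digests, which is exactly what the exclusion test builds with ``b"".join(sorted(excluded_contents))``. The helper name and output path below are illustrative only::

    import hashlib
    from typing import Iterable

    def write_exclude_sha1_file(contents: Iterable[bytes], path: str) -> None:
        # Compute the 20-byte sha1 digest of each content blob, sort the raw
        # digests, and write their concatenation; this matches the format the
        # replay CLI mmaps and searches with is_hash_in_bytearray().
        digests = sorted(hashlib.sha1(data).digest() for data in contents)
        with open(path, "wb") as fd:
            fd.write(b"".join(digests))

    # Example use (file name is arbitrary):
    # write_exclude_sha1_file([b"foo", b"bar"], "excluded-sha1s.bin")
    # swh objstorage replay --exclude-sha1-file excluded-sha1s.bin ...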