D3071.id10929.diff

diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,4 @@
dist/
version.txt
.mypy_cache/
+.hypothesis/
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -5,11 +5,17 @@
# 3rd party libraries without stubs (yet)
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-pkg_resources.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True
-# [mypy-add_your_lib_here.*]
-# ignore_missing_imports = True
+[mypy-systemd.daemon.*]
+ignore_missing_imports = True
+
+[mypy-tenacity.*]
+ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,4 @@
# Add here internal Software Heritage dependencies, one per line.
-swh.core
+swh.core[http]
+swh.objstorage >= 0.0.43
+swh.journal >= 0.0.31
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,3 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-vcversioner
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -39,24 +39,24 @@
# Full sample:
# https://forge.softwareheritage.org/diffusion/DCORE/browse/master/setup.py
setup(
- name="swh.<module-name>", # example: swh.loader.pypi
- description="Software Heritage <Module's intent>",
+ name="swh.objstorage.replayer", # example: swh.loader.pypi
+ description="Software Heritage content replayer",
long_description=long_description,
long_description_content_type="text/markdown",
python_requires=">=3.7",
author="Software Heritage developers",
author_email="swh-devel@inria.fr",
- url="https://forge.softwareheritage.org/diffusion/<module-git-code>",
+ url="https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer",
packages=find_packages(), # packages's modules
install_requires=parse_requirements() + parse_requirements("swh"),
tests_require=parse_requirements("test"),
- setup_requires=["vcversioner"],
+ setup_requires=["setuptools_scm"],
extras_require={"testing": parse_requirements("test")},
- vcversioner={},
+ use_scm_version=True,
include_package_data=True,
entry_points="""
[swh.cli.subcommands]
- <cli-name>=swh.<module>.cli:cli
+ content-replayer=swh.objstorage.replayer.cli:cli
""",
classifiers=[
"Programming Language :: Python :: 3",
diff --git a/swh/foo/bar.py b/swh/foo/bar.py
deleted file mode 100644
--- a/swh/foo/bar.py
+++ /dev/null
@@ -1,4 +0,0 @@
-# Copyright (C) 2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
diff --git a/swh/foo/cli.py b/swh/foo/cli.py
deleted file mode 100644
--- a/swh/foo/cli.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import click
-
-from swh.core.cli import CONTEXT_SETTINGS
-
-
-@click.group(name="foo", context_settings=CONTEXT_SETTINGS)
-@click.pass_context
-def cli(ctx):
- """Foo main command.
- """
-
-
-@cli.command()
-@click.option("--bar", help="Something")
-@click.pass_context
-def bar(ctx, bar):
- """Do something."""
- click.echo("bar")
diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/__init__.py
@@ -0,0 +1,7 @@
+from pkgutil import extend_path
+from typing import Iterable
+
+__path__: Iterable[str] = extend_path(__path__, __name__)
+
+# for BW compat
+from swh.objstorage.factory import * # noqa
diff --git a/swh/foo/__init__.py b/swh/objstorage/replayer/__init__.py
rename from swh/foo/__init__.py
rename to swh/objstorage/replayer/__init__.py
diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/cli.py
@@ -0,0 +1,133 @@
+# Copyright (C) 2016-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+import logging
+import mmap
+
+import click
+
+try:
+ from systemd.daemon import notify
+except ImportError:
+ notify = None
+
+from swh.model.model import SHA1_SIZE
+from swh.journal.client import get_journal_client
+from swh.objstorage import get_objstorage
+from swh.objstorage.cli import cli
+
+from swh.objstorage.replayer.replay import is_hash_in_bytearray
+from swh.objstorage.replayer.replay import process_replay_objects_content
+
+
+@cli.command("replay")
+@click.option(
+ "--stop-after-objects",
+ "-n",
+ default=None,
+ type=int,
+ help="Stop after processing this many objects. Default is to run forever.",
+)
+@click.option(
+ "--exclude-sha1-file",
+ default=None,
+ type=click.File("rb"),
+ help="File containing a sorted array of hashes to be excluded.",
+)
+@click.option(
+ "--check-dst/--no-check-dst",
+ default=True,
+ help="Check whether the destination contains the object before copying.",
+)
+@click.pass_context
+def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst):
+ """Fill a destination Object Storage using a journal stream.
+
+ This is typically used for a mirror configuration, by reading a Journal
+ and retrieving objects from an existing source ObjStorage.
+
+ There can be several 'replayers' filling a given ObjStorage as long as they
+ use the same ``group-id``. You can set the ``KAFKA_GROUP_INSTANCE_ID``
+ environment variable to enable KIP-345 static group membership.
+
+ This service retrieves object ids to copy from the 'content' topic. It will
+ only copy an object's content if the object's description in the kafka
+ message has its status set to visible.
+
+ ``--exclude-sha1-file`` may be used to exclude some hashes to speed up the
+ replay in case many of the contents are already in the destination
+ objstorage. It must contain a concatenation of all (sha1) hashes,
+ and it must be sorted.
+ This file will not be fully loaded into memory at any given time,
+ so it can be arbitrarily large.
+
+ ``--check-dst`` sets whether the replayer should check in the destination
+ ObjStorage before copying an object. You can turn that off if you know
+ you're copying to an empty ObjStorage.
+ """
+ conf = ctx.obj["config"]
+ try:
+ objstorage_src = get_objstorage(**conf.pop("objstorage"))
+ except KeyError:
+ ctx.fail("You must have a source objstorage configured in " "your config file.")
+ try:
+ objstorage_dst = get_objstorage(**conf.pop("objstorage_dst"))
+ except KeyError:
+ ctx.fail(
+ "You must have a destination objstorage configured " "in your config file."
+ )
+
+ if exclude_sha1_file:
+ map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ)
+ if map_.size() % SHA1_SIZE != 0:
+ ctx.fail(
+ "--exclude-sha1 must link to a file whose size is an "
+ "exact multiple of %d bytes." % SHA1_SIZE
+ )
+ nb_excluded_hashes = int(map_.size() / SHA1_SIZE)
+
+ def exclude_fn(obj):
+ return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes)
+
+ else:
+ exclude_fn = None
+
+ client = get_journal_client(
+ "kafka",
+ **conf["journal_client"],
+ stop_after_objects=stop_after_objects,
+ object_types=("content",),
+ )
+ worker_fn = functools.partial(
+ process_replay_objects_content,
+ src=objstorage_src,
+ dst=objstorage_dst,
+ exclude_fn=exclude_fn,
+ check_dst=check_dst,
+ )
+
+ if notify:
+ notify("READY=1")
+
+ try:
+ client.process(worker_fn)
+ except KeyboardInterrupt:
+ ctx.exit(0)
+ else:
+ print("Done.")
+ finally:
+ if notify:
+ notify("STOPPING=1")
+ client.close()
+
+
+def main():
+ logging.basicConfig()
+ return cli(auto_envvar_prefix="SWH_OBJSTORAGE")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/swh/foo/py.typed b/swh/objstorage/replayer/py.typed
rename from swh/foo/py.typed
rename to swh/objstorage/replayer/py.typed
diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/replay.py
@@ -0,0 +1,303 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+from time import time
+from typing import Callable, Dict, List, Optional
+
+from sentry_sdk import capture_exception, push_scope
+
+try:
+ from systemd.daemon import notify
+except ImportError:
+ notify = None
+
+from tenacity import (
+ retry,
+ retry_if_exception_type,
+ stop_after_attempt,
+ wait_random_exponential,
+)
+
+from swh.core.statsd import statsd
+from swh.model.hashutil import hash_to_hex
+from swh.model.model import SHA1_SIZE
+
+from swh.objstorage.objstorage import (
+ ID_HASH_ALGO,
+ ObjNotFoundError,
+ ObjStorage,
+)
+
+logger = logging.getLogger(__name__)
+
+CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total"
+CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total"
+CONTENT_BYTES_METRIC = "swh_content_replayer_bytes"
+CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"
+
+
+def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE):
+ """
+ Checks if the given hash is in the provided `array`. The array must be
+ a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes
+ (so its size must be `nb_hashes*hash_size` bytes).
+
+ Args:
+ hash_ (bytes): the hash to look for
+ array (bytes): a sorted concatenated array of hashes (may be of
+ any type supporting slice indexing, eg. :class:`mmap.mmap`)
+ nb_hashes (int): number of hashes in the array
+ hash_size (int): size of a hash (defaults to 20, for SHA1)
+
+ Example:
+
+ >>> import os
+ >>> hash1 = os.urandom(20)
+ >>> hash2 = os.urandom(20)
+ >>> hash3 = os.urandom(20)
+ >>> array = b''.join(sorted([hash1, hash2]))
+ >>> is_hash_in_bytearray(hash1, array, 2)
+ True
+ >>> is_hash_in_bytearray(hash2, array, 2)
+ True
+ >>> is_hash_in_bytearray(hash3, array, 2)
+ False
+ """
+ if len(hash_) != hash_size:
+ raise ValueError("hash_ does not match the provided hash_size.")
+
+ def get_hash(position):
+ return array[position * hash_size : (position + 1) * hash_size]
+
+ # Regular dichotomy:
+ left = 0
+ right = nb_hashes
+ while left < right - 1:
+ middle = int((right + left) / 2)
+ pivot = get_hash(middle)
+ if pivot == hash_:
+ return True
+ elif pivot < hash_:
+ left = middle
+ else:
+ right = middle
+ return get_hash(left) == hash_
+
+
+class ReplayError(Exception):
+ """An error occurred during the replay of an object"""
+
+ def __init__(self, operation, *, obj_id, exc):
+ self.operation = operation
+ self.obj_id = hash_to_hex(obj_id)
+ self.exc = exc
+
+ def __str__(self):
+ return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc)
+
+
+def log_replay_retry(retry_obj, sleep, last_result):
+ """Log a retry of the content replayer"""
+ exc = last_result.exception()
+ logger.debug(
+ "Retry operation %(operation)s on %(obj_id)s: %(exc)s",
+ {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
+ )
+
+ statsd.increment(
+ CONTENT_RETRY_METRIC,
+ tags={
+ "operation": exc.operation,
+ "attempt": str(retry_obj.statistics["attempt_number"]),
+ },
+ )
+
+
+def log_replay_error(last_attempt):
+ """Log a replay error to sentry"""
+ exc = last_attempt.exception()
+ with push_scope() as scope:
+ scope.set_tag("operation", exc.operation)
+ scope.set_extra("obj_id", exc.obj_id)
+ capture_exception(exc.exc)
+
+ logger.error(
+ "Failed operation %(operation)s on %(obj_id)s after %(retries)s"
+ " retries: %(exc)s",
+ {
+ "obj_id": exc.obj_id,
+ "operation": exc.operation,
+ "exc": str(exc.exc),
+ "retries": last_attempt.attempt_number,
+ },
+ )
+
+ return None
+
+
+CONTENT_REPLAY_RETRIES = 3
+
+content_replay_retry = retry(
+ retry=retry_if_exception_type(ReplayError),
+ stop=stop_after_attempt(CONTENT_REPLAY_RETRIES),
+ wait=wait_random_exponential(multiplier=1, max=60),
+ before_sleep=log_replay_retry,
+ retry_error_callback=log_replay_error,
+)
+
+
+@content_replay_retry
+def copy_object(obj_id, src, dst):
+ hex_obj_id = hash_to_hex(obj_id)
+ obj = ""
+ try:
+ with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}):
+ obj = src.get(obj_id)
+ logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id})
+
+ with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
+ dst.add(obj, obj_id=obj_id, check_presence=False)
+ logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id})
+ statsd.increment(CONTENT_BYTES_METRIC, len(obj))
+ except ObjNotFoundError:
+ logger.error(
+ "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id}
+ )
+ raise
+ except Exception as exc:
+ raise ReplayError("copy", obj_id=obj_id, exc=exc) from None
+ return len(obj)
+
+
+@content_replay_retry
+def obj_in_objstorage(obj_id, dst):
+ """Check if an object is already in an objstorage, tenaciously"""
+ try:
+ return obj_id in dst
+ except Exception as exc:
+ raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None
+
+
+def process_replay_objects_content(
+ all_objects: Dict[str, List[dict]],
+ *,
+ src: ObjStorage,
+ dst: ObjStorage,
+ exclude_fn: Optional[Callable[[dict], bool]] = None,
+ check_dst: bool = True,
+):
+ """
+ Takes a list of records from Kafka (see
+ :py:func:`swh.journal.client.JournalClient.process`) and copies them
+ from the `src` objstorage to the `dst` objstorage, if:
+
+ * `obj['status']` is `'visible'`
+ * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided)
+ * `obj['sha1'] not in dst` (if `check_dst` is True)
+
+ Args:
+ all_objects: Objects passed by the Kafka client. Most importantly,
+ `all_objects['content'][*]['sha1']` is the sha1 hash of each
+ content.
+ src: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
+ dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
+ exclude_fn: Determines whether an object should be copied.
+ check_dst: Determines whether we should check the destination
+ objstorage before copying.
+
+ Example:
+
+ >>> from swh.objstorage import get_objstorage
+ >>> src = get_objstorage('memory', {})
+ >>> dst = get_objstorage('memory', {})
+ >>> id1 = src.add(b'foo bar')
+ >>> id2 = src.add(b'baz qux')
+ >>> kafka_partitions = {
+ ... 'content': [
+ ... {
+ ... 'sha1': id1,
+ ... 'status': 'visible',
+ ... },
+ ... {
+ ... 'sha1': id2,
+ ... 'status': 'visible',
+ ... },
+ ... ]
+ ... }
+ >>> process_replay_objects_content(
+ ... kafka_partitions, src=src, dst=dst,
+ ... exclude_fn=lambda obj: obj['sha1'] == id1)
+ >>> id1 in dst
+ False
+ >>> id2 in dst
+ True
+ """
+ vol = []
+ nb_skipped = 0
+ nb_failures = 0
+ t0 = time()
+
+ for (object_type, objects) in all_objects.items():
+ if object_type != "content":
+ logger.warning(
+ "Received a series of %s, this should not happen", object_type
+ )
+ continue
+ for obj in objects:
+ obj_id = obj[ID_HASH_ALGO]
+ if obj["status"] != "visible":
+ nb_skipped += 1
+ logger.debug(
+ "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"]
+ )
+ statsd.increment(
+ CONTENT_OPERATIONS_METRIC,
+ tags={"decision": "skipped", "status": obj["status"]},
+ )
+ elif exclude_fn and exclude_fn(obj):
+ nb_skipped += 1
+ logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
+ statsd.increment(
+ CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}
+ )
+ elif check_dst and obj_in_objstorage(obj_id, dst):
+ nb_skipped += 1
+ logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
+ statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
+ else:
+ try:
+ copied = copy_object(obj_id, src, dst)
+ except ObjNotFoundError:
+ nb_skipped += 1
+ statsd.increment(
+ CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
+ )
+ else:
+ if copied is None:
+ nb_failures += 1
+ statsd.increment(
+ CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
+ )
+ else:
+ vol.append(copied)
+ statsd.increment(
+ CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
+ )
+
+ dt = time() - t0
+ logger.info(
+ "processed %s content objects in %.1fsec "
+ "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped",
+ len(vol),
+ dt,
+ len(vol) / dt,
+ sum(vol) / 1024 / 1024 / dt,
+ nb_failures,
+ nb_skipped,
+ )
+
+ if notify:
+ notify("WATCHDOG=1")
diff --git a/swh/foo/tests/__init__.py b/swh/objstorage/replayer/tests/__init__.py
rename from swh/foo/tests/__init__.py
rename to swh/objstorage/replayer/tests/__init__.py
diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/tests/test_cli.py
@@ -0,0 +1,563 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import Counter
+import copy
+import functools
+import logging
+import re
+import tempfile
+from subprocess import Popen
+from typing import Tuple
+from unittest.mock import patch
+
+from click.testing import CliRunner
+from confluent_kafka import Producer
+import pytest
+import yaml
+
+from swh.journal.serializers import key_to_kafka
+from swh.model.hashutil import hash_to_hex
+from swh.objstorage.backends.in_memory import InMemoryObjStorage
+from swh.objstorage.replayer.cli import cli
+from swh.objstorage.replayer.replay import CONTENT_REPLAY_RETRIES
+
+
+logger = logging.getLogger(__name__)
+
+
+CLI_CONFIG = {
+ "storage": {"cls": "memory",},
+ "objstorage": {"cls": "mocked", "name": "src",},
+ "objstorage_dst": {"cls": "mocked", "name": "dst",},
+}
+
+
+@pytest.fixture
+def monkeypatch_retry_sleep(monkeypatch):
+ from swh.objstorage.replayer.replay import copy_object, obj_in_objstorage
+
+ monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None)
+ monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)
+
+
+def _patch_objstorages(names):
+ objstorages = {name: InMemoryObjStorage() for name in names}
+
+ def get_mock_objstorage(cls, **args):
+ assert cls == "mocked", cls
+ return objstorages[args["name"]]
+
+ def decorator(f):
+ @functools.wraps(f)
+ @patch("swh.objstorage.replayer.cli.get_objstorage")
+ def newf(get_objstorage_mock, *args, **kwargs):
+ get_objstorage_mock.side_effect = get_mock_objstorage
+ f(*args, objstorages=objstorages, **kwargs)
+
+ return newf
+
+ return decorator
+
+
+def invoke(*args, env=None, journal_config=None):
+ config = copy.deepcopy(CLI_CONFIG)
+ if journal_config:
+ config["journal_client"] = journal_config
+
+ runner = CliRunner()
+ with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
+ yaml.dump(config, config_fd)
+ config_fd.seek(0)
+ args = ["-C" + config_fd.name] + list(args)
+ return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,)
+
+
+def test_replay_help():
+ result = invoke("replay", "--help",)
+ expected = (
+ r"^\s*Usage: objstorage replay \[OPTIONS\]\s+"
+ r"Fill a destination Object Storage.*"
+ )
+ assert result.exit_code == 0, result.output
+ assert re.match(expected, result.output, re.MULTILINE), result.output
+
+
+NUM_CONTENTS = 10
+
+
+def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages):
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test-producer",
+ "acks": "all",
+ }
+ )
+
+ contents = {}
+ for i in range(NUM_CONTENTS):
+ content = b"\x00" * 19 + bytes([i])
+ sha1 = objstorages["src"].add(content)
+ contents[sha1] = content
+ producer.produce(
+ topic=kafka_prefix + ".content",
+ key=key_to_kafka(sha1),
+ value=key_to_kafka({"sha1": sha1, "status": "visible",}),
+ )
+
+ producer.flush()
+
+ return contents
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content(
+ objstorages,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ for (sha1, content) in contents.items():
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_structured_log(
+ objstorages,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ caplog,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+ expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents)
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ copied = set()
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if "copied" in logtext:
+ copied.add(record.args["obj_id"])
+
+ assert (
+ copied == expected_obj_ids
+ ), "Mismatched logging; see captured log output for details."
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_static_group_id(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ caplog,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ # Setup log capture to fish the consumer settings out of the log messages
+ caplog.set_level(logging.DEBUG, "swh.journal.client")
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"},
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ consumer_settings = None
+ for record in caplog.records:
+ if "Consumer settings" in record.message:
+ consumer_settings = record.args
+ break
+
+ assert consumer_settings is not None, (
+ "Failed to get consumer settings out of the consumer log. "
+ "See log capture for details."
+ )
+ assert consumer_settings["group.instance.id"] == "static-group-instance-id"
+ assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000
+ assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000
+
+ for (sha1, content) in contents.items():
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_exclude(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ excluded_contents = list(contents)[0::2] # picking half of them
+ with tempfile.NamedTemporaryFile(mode="w+b") as fd:
+ fd.write(b"".join(sorted(excluded_contents)))
+
+ fd.seek(0)
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ "--exclude-sha1-file",
+ fd.name,
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ for (sha1, content) in contents.items():
+ if sha1 in excluded_contents:
+ assert sha1 not in objstorages["dst"], sha1
+ else:
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+NUM_CONTENTS_DST = 5
+
+
+@_patch_objstorages(["src", "dst"])
+@pytest.mark.parametrize(
+ "check_dst,expected_copied,expected_in_dst",
+ [
+ (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST),
+ (False, NUM_CONTENTS, 0),
+ ],
+)
+def test_replay_content_check_dst(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ check_dst: bool,
+ expected_copied: int,
+ expected_in_dst: int,
+ caplog,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ for i, (sha1, content) in enumerate(contents.items()):
+ if i >= NUM_CONTENTS_DST:
+ break
+
+ objstorages["dst"].add(content, obj_id=sha1)
+
+ caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ "--check-dst" if check_dst else "--no-check-dst",
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ copied = 0
+ in_dst = 0
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if "copied" in logtext:
+ copied += 1
+ elif "in dst" in logtext:
+ in_dst += 1
+
+ assert (
+ copied == expected_copied and in_dst == expected_in_dst
+ ), "Unexpected amount of objects copied, see the captured log for details"
+
+ for (sha1, content) in contents.items():
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+class FlakyObjStorage(InMemoryObjStorage):
+ def __init__(self, *args, **kwargs):
+ state = kwargs.pop("state")
+ self.failures_left = Counter(kwargs.pop("failures"))
+ super().__init__(*args, **kwargs)
+ if state:
+ self.state = state
+
+ def flaky_operation(self, op, obj_id):
+ if self.failures_left[op, obj_id] > 0:
+ self.failures_left[op, obj_id] -= 1
+ raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id)))
+
+ def get(self, obj_id):
+ self.flaky_operation("get", obj_id)
+ return super().get(obj_id)
+
+ def add(self, data, obj_id=None, check_presence=True):
+ self.flaky_operation("add", obj_id)
+ return super().add(data, obj_id=obj_id, check_presence=check_presence)
+
+ def __contains__(self, obj_id):
+ self.flaky_operation("in", obj_id)
+ return super().__contains__(obj_id)
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_check_dst_retry(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ monkeypatch_retry_sleep,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ failures = {}
+ for i, (sha1, content) in enumerate(contents.items()):
+ if i >= NUM_CONTENTS_DST:
+ break
+
+ objstorages["dst"].add(content, obj_id=sha1)
+ failures["in", sha1] = 1
+
+ orig_dst = objstorages["dst"]
+ objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures)
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ "--check-dst",
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ for (sha1, content) in contents.items():
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_failed_copy_retry(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ caplog,
+ monkeypatch_retry_sleep,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ add_failures = {}
+ get_failures = {}
+ definitely_failed = set()
+
+ # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times.
+ # We generate failures for 2 different operations, get and add.
+ num_retry_contents = 2 * CONTENT_REPLAY_RETRIES
+
+ assert (
+ num_retry_contents < NUM_CONTENTS
+ ), "Need to generate more test contents to properly test retry behavior"
+
+ for i, sha1 in enumerate(contents):
+ if i >= num_retry_contents:
+ break
+
+ # This generates a number of failures, up to CONTENT_REPLAY_RETRIES
+ num_failures = (i % CONTENT_REPLAY_RETRIES) + 1
+
+ # This generates failures of add for the first CONTENT_REPLAY_RETRIES
+ # objects, then failures of get.
+ if i < CONTENT_REPLAY_RETRIES:
+ add_failures["add", sha1] = num_failures
+ else:
+ get_failures["get", sha1] = num_failures
+
+ # Only contents that have CONTENT_REPLAY_RETRIES or more are
+ # definitely failing
+ if num_failures >= CONTENT_REPLAY_RETRIES:
+ definitely_failed.add(hash_to_hex(sha1))
+
+ objstorages["dst"] = FlakyObjStorage(
+ state=objstorages["dst"].state, failures=add_failures,
+ )
+ objstorages["src"] = FlakyObjStorage(
+ state=objstorages["src"].state, failures=get_failures,
+ )
+
+ caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ copied = 0
+ actually_failed = set()
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if "copied" in logtext:
+ copied += 1
+ elif "Failed operation" in logtext:
+ assert record.levelno == logging.ERROR
+ assert record.args["retries"] == CONTENT_REPLAY_RETRIES
+ actually_failed.add(record.args["obj_id"])
+
+ assert (
+ actually_failed == definitely_failed
+ ), "Unexpected object copy failures; see captured log for details"
+
+ for (sha1, content) in contents.items():
+ if hash_to_hex(sha1) in definitely_failed:
+ assert sha1 not in objstorages["dst"]
+ continue
+
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_objnotfound(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ caplog,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ num_contents_deleted = 5
+ contents_deleted = set()
+
+ for i, sha1 in enumerate(contents):
+ if i >= num_contents_deleted:
+ break
+
+ del objstorages["src"].state[sha1]
+ contents_deleted.add(hash_to_hex(sha1))
+
+ caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ copied = 0
+ not_in_src = set()
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if "copied" in logtext:
+ copied += 1
+ elif "object not found" in logtext:
+ # Check that the object id can be recovered from logs
+ assert record.levelno == logging.ERROR
+ not_in_src.add(record.args["obj_id"])
+
+ assert (
+ copied == NUM_CONTENTS - num_contents_deleted
+ ), "Unexpected number of contents copied"
+
+ assert (
+ not_in_src == contents_deleted
+ ), "Mismatch between deleted contents and not_in_src logs"
+
+ for (sha1, content) in contents.items():
+ if sha1 not in objstorages["src"]:
+ continue
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
diff --git a/swh/objstorage/replayer/tests/test_replay.py b/swh/objstorage/replayer/tests/test_replay.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/tests/test_replay.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+
+from hypothesis import given, settings, HealthCheck
+from hypothesis.strategies import lists, sets, binary
+
+from swh.model.hypothesis_strategies import present_contents
+from swh.objstorage import get_objstorage
+from swh.objstorage.replayer.replay import (
+ is_hash_in_bytearray,
+ process_replay_objects_content,
+)
+from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
+
+
+hash_strategy = binary(min_size=20, max_size=20)
+
+
+@settings(max_examples=500)
+@given(
+ sets(hash_strategy, min_size=0, max_size=500), sets(hash_strategy, min_size=10),
+)
+def test_is_hash_in_bytearray(haystack, needles):
+ array = b"".join(sorted(haystack))
+ needles |= haystack # Exhaustively test for all objects in the array
+ for needle in needles:
+ assert is_hash_in_bytearray(needle, array, len(haystack)) == (
+ needle in haystack
+ )
+
+
+@given(lists(present_contents(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_replay_content(objects):
+
+ queue = []
+ replayer = MockedJournalClient(queue)
+ writer = MockedKafkaWriter(queue)
+
+ objstorage1 = get_objstorage(cls="memory", args={})
+ objstorage2 = get_objstorage(cls="memory", args={})
+
+ contents = []
+ for obj in objects:
+ objstorage1.add(obj.data)
+ contents.append(obj)
+ writer.write_addition("content", obj)
+
+ # Bail out early if we didn't insert any relevant objects...
+ queue_size = len(queue)
+ assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
+
+ assert replayer.stop_after_objects is None
+ replayer.stop_after_objects = queue_size
+
+ worker_fn = functools.partial(
+ process_replay_objects_content, src=objstorage1, dst=objstorage2
+ )
+
+ replayer.process(worker_fn)
+
+ # only content with status visible will be copied in storage2
+ expected_objstorage_state = {
+ c.sha1: c.data for c in contents if c.status == "visible"
+ }
+
+ assert expected_objstorage_state == objstorage2.state
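
Usage sketch (not part of the diff above): the snippet below drives the new
"objstorage replay" subcommand the same way the invoke() helper in test_cli.py
does, via click's CliRunner. The backend classes, broker address, topic prefix
and group id are illustrative placeholders, not values required by this change,
and an actual run needs a reachable Kafka broker and real objstorage backends.

# Minimal sketch, mirroring swh/objstorage/replayer/tests/test_cli.py.
# All concrete values (backends, broker, prefix, group id) are placeholders.
import logging
import tempfile

import yaml
from click.testing import CliRunner

from swh.objstorage.replayer.cli import cli

config = {
    "objstorage": {"cls": "memory"},      # source objstorage (placeholder)
    "objstorage_dst": {"cls": "memory"},  # destination objstorage (placeholder)
    "journal_client": {
        "brokers": "kafka.example.org:9092",
        "group_id": "content-replayer-demo",
        "prefix": "swh.journal.objects",
    },
}

with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
    yaml.dump(config, config_fd)
    config_fd.seek(0)
    result = CliRunner().invoke(
        cli,
        ["-C" + config_fd.name, "replay", "--stop-after-objects", "10"],
        obj={"log_level": logging.DEBUG},
    )
    print(result.output)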

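The --exclude-sha1-file option expects a raw concatenation of fixed-size
(20-byte) sha1 digests in sorted order; the replayer mmaps the file and probes
it with is_hash_in_bytearray() by binary search, which is why the ordering
matters. A sketch of producing such a file, with made-up digests standing in
for hashes of contents already present in the destination:

# Sketch: build an --exclude-sha1-file as described in the content_replay
# docstring. The digests below are placeholders; in practice they would come
# from the destination objstorage or a previous replay run.
import os

already_copied = {os.urandom(20) for _ in range(3)}  # placeholder sha1 digests

with open("exclude-sha1", "wb") as f:
    f.write(b"".join(sorted(already_copied)))

# The CLI checks that the file size is an exact multiple of SHA1_SIZE (20).
assert os.path.getsize("exclude-sha1") % 20 == 0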