Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9749576
D3071.id10929.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
38 KB
Subscribers
None
D3071.id10929.diff
View Options
diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,4 @@
dist/
version.txt
.mypy_cache/
+.hypothesis/
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -5,11 +5,17 @@
# 3rd party libraries without stubs (yet)
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-pkg_resources.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True
-# [mypy-add_your_lib_here.*]
-# ignore_missing_imports = True
+[mypy-systemd.daemon.*]
+ignore_missing_imports = True
+
+[mypy-tenacity.*]
+ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,4 @@
# Add here internal Software Heritage dependencies, one per line.
-swh.core
+swh.core[http]
+swh.objstorage >= 0.0.43
+swh.journal >= 0.0.31
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,3 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-vcversioner
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -39,24 +39,24 @@
# Full sample:
# https://forge.softwareheritage.org/diffusion/DCORE/browse/master/setup.py
setup(
- name="swh.<module-name>", # example: swh.loader.pypi
- description="Software Heritage <Module's intent>",
+ name="swh.objstorage.replayer", # example: swh.loader.pypi
+ description="Software Heritage content replayer",
long_description=long_description,
long_description_content_type="text/markdown",
python_requires=">=3.7",
author="Software Heritage developers",
author_email="swh-devel@inria.fr",
- url="https://forge.softwareheritage.org/diffusion/<module-git-code>",
+ url="https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer",
packages=find_packages(), # packages's modules
install_requires=parse_requirements() + parse_requirements("swh"),
tests_require=parse_requirements("test"),
- setup_requires=["vcversioner"],
+ setup_requires=["setuptools_scm"],
extras_require={"testing": parse_requirements("test")},
- vcversioner={},
+ use_scm_version=True,
include_package_data=True,
entry_points="""
[swh.cli.subcommands]
- <cli-name>=swh.<module>.cli:cli
+ content-replayer=swh.objstorage.replayer.cli:cli
""",
classifiers=[
"Programming Language :: Python :: 3",
diff --git a/swh/foo/bar.py b/swh/foo/bar.py
deleted file mode 100644
--- a/swh/foo/bar.py
+++ /dev/null
@@ -1,4 +0,0 @@
-# Copyright (C) 2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
diff --git a/swh/foo/cli.py b/swh/foo/cli.py
deleted file mode 100644
--- a/swh/foo/cli.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import click
-
-from swh.core.cli import CONTEXT_SETTINGS
-
-
-@click.group(name="foo", context_settings=CONTEXT_SETTINGS)
-@click.pass_context
-def cli(ctx):
- """Foo main command.
- """
-
-
-@cli.command()
-@click.option("--bar", help="Something")
-@click.pass_context
-def bar(ctx, bar):
- """Do something."""
- click.echo("bar")
diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/__init__.py
@@ -0,0 +1,7 @@
+from pkgutil import extend_path
+from typing import Iterable
+
+__path__: Iterable[str] = extend_path(__path__, __name__)
+
+# for backward compatibility
+from swh.objstorage.factory import * # noqa
diff --git a/swh/foo/__init__.py b/swh/objstorage/replayer/__init__.py
rename from swh/foo/__init__.py
rename to swh/objstorage/replayer/__init__.py
diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/cli.py
@@ -0,0 +1,133 @@
+# Copyright (C) 2016-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+import logging
+import mmap
+
+import click
+
+try:
+ from systemd.daemon import notify
+except ImportError:
+ notify = None
+
+from swh.model.model import SHA1_SIZE
+from swh.journal.client import get_journal_client
+from swh.objstorage import get_objstorage
+from swh.objstorage.cli import cli
+
+from swh.objstorage.replayer.replay import is_hash_in_bytearray
+from swh.objstorage.replayer.replay import process_replay_objects_content
+
+
+@cli.command("replay")
+@click.option(
+ "--stop-after-objects",
+ "-n",
+ default=None,
+ type=int,
+ help="Stop after processing this many objects. Default is to run forever.",
+)
+@click.option(
+ "--exclude-sha1-file",
+ default=None,
+ type=click.File("rb"),
+ help="File containing a sorted array of hashes to be excluded.",
+)
+@click.option(
+ "--check-dst/--no-check-dst",
+ default=True,
+ help="Check whether the destination contains the object before copying.",
+)
+@click.pass_context
+def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst):
+ """Fill a destination Object Storage using a journal stream.
+
+ This is typically used for a mirror configuration, by reading a Journal
+ and retrieving objects from an existing source ObjStorage.
+
+ There can be several 'replayers' filling a given ObjStorage as long as they
+ use the same ``group-id``. You can use the ``KAFKA_GROUP_INSTANCE_ID``
+ environment variable to use KIP-345 static group membership.
+
+ This service retrieves object ids to copy from the 'content' topic. It will
+ only copy an object's content if the object's description in the Kafka
+ message has the status:visible set.
+
+ ``--exclude-sha1-file`` may be used to exclude some hashes to speed up the
+ replay in case many of the contents are already in the destination
+ objstorage. It must contain a concatenation of all (sha1) hashes,
+ and it must be sorted.
+ This file will not be fully loaded into memory at any given time,
+ so it can be arbitrarily large.
+
+ ``--check-dst`` sets whether the replayer should check in the destination
+ ObjStorage before copying an object. You can turn that off if you know
+ you're copying to an empty ObjStorage.
+ """
+ conf = ctx.obj["config"]
+ try:
+ objstorage_src = get_objstorage(**conf.pop("objstorage"))
+ except KeyError:
+ ctx.fail("You must have a source objstorage configured in " "your config file.")
+ try:
+ objstorage_dst = get_objstorage(**conf.pop("objstorage_dst"))
+ except KeyError:
+ ctx.fail(
+ "You must have a destination objstorage configured " "in your config file."
+ )
+
+ if exclude_sha1_file:
+ map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ)
+ if map_.size() % SHA1_SIZE != 0:
+ ctx.fail(
+ "--exclude-sha1 must link to a file whose size is an "
+ "exact multiple of %d bytes." % SHA1_SIZE
+ )
+ nb_excluded_hashes = int(map_.size() / SHA1_SIZE)
+
+ def exclude_fn(obj):
+ return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes)
+
+ else:
+ exclude_fn = None
+
+ client = get_journal_client(
+ "kafka",
+ **conf["journal_client"],
+ stop_after_objects=stop_after_objects,
+ object_types=("content",),
+ )
+ worker_fn = functools.partial(
+ process_replay_objects_content,
+ src=objstorage_src,
+ dst=objstorage_dst,
+ exclude_fn=exclude_fn,
+ check_dst=check_dst,
+ )
+
+ if notify:
+ notify("READY=1")
+
+ try:
+ client.process(worker_fn)
+ except KeyboardInterrupt:
+ ctx.exit(0)
+ else:
+ print("Done.")
+ finally:
+ if notify:
+ notify("STOPPING=1")
+ client.close()
+
+
+def main():
+ logging.basicConfig()
+ return cli(auto_envvar_prefix="SWH_OBJSTORAGE")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/swh/foo/py.typed b/swh/objstorage/replayer/py.typed
rename from swh/foo/py.typed
rename to swh/objstorage/replayer/py.typed
diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/replay.py
@@ -0,0 +1,303 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+from time import time
+from typing import Callable, Dict, List, Optional
+
+from sentry_sdk import capture_exception, push_scope
+
+try:
+ from systemd.daemon import notify
+except ImportError:
+ notify = None
+
+from tenacity import (
+ retry,
+ retry_if_exception_type,
+ stop_after_attempt,
+ wait_random_exponential,
+)
+
+from swh.core.statsd import statsd
+from swh.model.hashutil import hash_to_hex
+from swh.model.model import SHA1_SIZE
+
+from swh.objstorage.objstorage import (
+ ID_HASH_ALGO,
+ ObjNotFoundError,
+ ObjStorage,
+)
+
+logger = logging.getLogger(__name__)
+
+CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total"
+CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total"
+CONTENT_BYTES_METRIC = "swh_content_replayer_bytes"
+CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"
+
+
+def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE):
+ """
+ Checks if the given hash is in the provided `array`. The array must be
+ a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes
+ (so its size must be `nb_hashes*hash_size` bytes).
+
+ Args:
+ hash_ (bytes): the hash to look for
+ array (bytes): a sorted concatenated array of hashes (may be of
+ any type supporting slice indexing, e.g. :class:`mmap.mmap`)
+ nb_hashes (int): number of hashes in the array
+ hash_size (int): size of a hash (defaults to 20, for SHA1)
+
+ Example:
+
+ >>> import os
+ >>> hash1 = os.urandom(20)
+ >>> hash2 = os.urandom(20)
+ >>> hash3 = os.urandom(20)
+ >>> array = b''.join(sorted([hash1, hash2]))
+ >>> is_hash_in_bytearray(hash1, array, 2)
+ True
+ >>> is_hash_in_bytearray(hash2, array, 2)
+ True
+ >>> is_hash_in_bytearray(hash3, array, 2)
+ False
+ """
+ if len(hash_) != hash_size:
+ raise ValueError("hash_ does not match the provided hash_size.")
+
+ def get_hash(position):
+ return array[position * hash_size : (position + 1) * hash_size]
+
+ # Regular dichotomy:
+ left = 0
+ right = nb_hashes
+ while left < right - 1:
+ middle = int((right + left) / 2)
+ pivot = get_hash(middle)
+ if pivot == hash_:
+ return True
+ elif pivot < hash_:
+ left = middle
+ else:
+ right = middle
+ return get_hash(left) == hash_
+
+
+class ReplayError(Exception):
+ """An error occurred during the replay of an object"""
+
+ def __init__(self, operation, *, obj_id, exc):
+ self.operation = operation
+ self.obj_id = hash_to_hex(obj_id)
+ self.exc = exc
+
+ def __str__(self):
+ return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc)
+
+
+def log_replay_retry(retry_obj, sleep, last_result):
+ """Log a retry of the content replayer"""
+ exc = last_result.exception()
+ logger.debug(
+ "Retry operation %(operation)s on %(obj_id)s: %(exc)s",
+ {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
+ )
+
+ statsd.increment(
+ CONTENT_RETRY_METRIC,
+ tags={
+ "operation": exc.operation,
+ "attempt": str(retry_obj.statistics["attempt_number"]),
+ },
+ )
+
+
+def log_replay_error(last_attempt):
+ """Log a replay error to sentry"""
+ exc = last_attempt.exception()
+ with push_scope() as scope:
+ scope.set_tag("operation", exc.operation)
+ scope.set_extra("obj_id", exc.obj_id)
+ capture_exception(exc.exc)
+
+ logger.error(
+ "Failed operation %(operation)s on %(obj_id)s after %(retries)s"
+ " retries: %(exc)s",
+ {
+ "obj_id": exc.obj_id,
+ "operation": exc.operation,
+ "exc": str(exc.exc),
+ "retries": last_attempt.attempt_number,
+ },
+ )
+
+ return None
+
+
+CONTENT_REPLAY_RETRIES = 3
+
+content_replay_retry = retry(
+ retry=retry_if_exception_type(ReplayError),
+ stop=stop_after_attempt(CONTENT_REPLAY_RETRIES),
+ wait=wait_random_exponential(multiplier=1, max=60),
+ before_sleep=log_replay_retry,
+ retry_error_callback=log_replay_error,
+)
+
+
+@content_replay_retry
+def copy_object(obj_id, src, dst):
+ hex_obj_id = hash_to_hex(obj_id)
+ obj = ""
+ try:
+ with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}):
+ obj = src.get(obj_id)
+ logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id})
+
+ with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
+ dst.add(obj, obj_id=obj_id, check_presence=False)
+ logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id})
+ statsd.increment(CONTENT_BYTES_METRIC, len(obj))
+ except ObjNotFoundError:
+ logger.error(
+ "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id}
+ )
+ raise
+ except Exception as exc:
+ raise ReplayError("copy", obj_id=obj_id, exc=exc) from None
+ return len(obj)
+
+
+@content_replay_retry
+def obj_in_objstorage(obj_id, dst):
+ """Check if an object is already in an objstorage, tenaciously"""
+ try:
+ return obj_id in dst
+ except Exception as exc:
+ raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None
+
+
+def process_replay_objects_content(
+ all_objects: Dict[str, List[dict]],
+ *,
+ src: ObjStorage,
+ dst: ObjStorage,
+ exclude_fn: Optional[Callable[[dict], bool]] = None,
+ check_dst: bool = True,
+):
+ """
+ Takes records from Kafka, grouped by object type (see
+ :py:func:`swh.journal.client.JournalClient.process`) and copies them
+ from the `src` objstorage to the `dst` objstorage, if:
+
+ * `obj['status']` is `'visible'`
+ * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided)
+ * `obj['sha1'] not in dst` (if `check_dst` is True)
+
+ Args:
+ all_objects: Objects passed by the Kafka client. Most importantly,
+ `all_objects['content'][*]['sha1']` is the sha1 hash of each
+ content.
+ src: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
+ dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
+ exclude_fn: Returns True for objects that must be skipped, not copied.
+ check_dst: Determines whether we should check the destination
+ objstorage before copying.
+
+ Example:
+
+ >>> from swh.objstorage import get_objstorage
+ >>> src = get_objstorage('memory', {})
+ >>> dst = get_objstorage('memory', {})
+ >>> id1 = src.add(b'foo bar')
+ >>> id2 = src.add(b'baz qux')
+ >>> kafka_partitions = {
+ ... 'content': [
+ ... {
+ ... 'sha1': id1,
+ ... 'status': 'visible',
+ ... },
+ ... {
+ ... 'sha1': id2,
+ ... 'status': 'visible',
+ ... },
+ ... ]
+ ... }
+ >>> process_replay_objects_content(
+ ... kafka_partitions, src=src, dst=dst,
+ ... exclude_fn=lambda obj: obj['sha1'] == id1)
+ >>> id1 in dst
+ False
+ >>> id2 in dst
+ True
+ """
+ vol = []
+ nb_skipped = 0
+ nb_failures = 0
+ t0 = time()
+
+ for (object_type, objects) in all_objects.items():
+ if object_type != "content":
+ logger.warning(
+ "Received a series of %s, this should not happen", object_type
+ )
+ continue
+ for obj in objects:
+ obj_id = obj[ID_HASH_ALGO]
+ if obj["status"] != "visible":
+ nb_skipped += 1
+ logger.debug(
+ "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"]
+ )
+ statsd.increment(
+ CONTENT_OPERATIONS_METRIC,
+ tags={"decision": "skipped", "status": obj["status"]},
+ )
+ elif exclude_fn and exclude_fn(obj):
+ nb_skipped += 1
+ logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
+ statsd.increment(
+ CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}
+ )
+ elif check_dst and obj_in_objstorage(obj_id, dst):
+ nb_skipped += 1
+ logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
+ statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
+ else:
+ try:
+ copied = copy_object(obj_id, src, dst)
+ except ObjNotFoundError:
+ nb_skipped += 1
+ statsd.increment(
+ CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
+ )
+ else:
+ if copied is None:
+ nb_failures += 1
+ statsd.increment(
+ CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
+ )
+ else:
+ vol.append(copied)
+ statsd.increment(
+ CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
+ )
+
+ dt = time() - t0
+ logger.info(
+ "processed %s content objects in %.1fsec "
+ "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped",
+ len(vol),
+ dt,
+ len(vol) / dt,
+ sum(vol) / 1024 / 1024 / dt,
+ nb_failures,
+ nb_skipped,
+ )
+
+ if notify:
+ notify("WATCHDOG=1")
diff --git a/swh/foo/tests/__init__.py b/swh/objstorage/replayer/tests/__init__.py
rename from swh/foo/tests/__init__.py
rename to swh/objstorage/replayer/tests/__init__.py
diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/tests/test_cli.py
@@ -0,0 +1,563 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import Counter
+import copy
+import functools
+import logging
+import re
+import tempfile
+from subprocess import Popen
+from typing import Tuple
+from unittest.mock import patch
+
+from click.testing import CliRunner
+from confluent_kafka import Producer
+import pytest
+import yaml
+
+from swh.journal.serializers import key_to_kafka
+from swh.model.hashutil import hash_to_hex
+from swh.objstorage.backends.in_memory import InMemoryObjStorage
+from swh.objstorage.replayer.cli import cli
+from swh.objstorage.replayer.replay import CONTENT_REPLAY_RETRIES
+
+
+logger = logging.getLogger(__name__)
+
+
+CLI_CONFIG = {
+ "storage": {"cls": "memory",},
+ "objstorage": {"cls": "mocked", "name": "src",},
+ "objstorage_dst": {"cls": "mocked", "name": "dst",},
+}
+
+
+@pytest.fixture
+def monkeypatch_retry_sleep(monkeypatch):
+ from swh.objstorage.replayer.replay import copy_object, obj_in_objstorage
+
+ monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None)
+ monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)
+
+
+def _patch_objstorages(names):
+ objstorages = {name: InMemoryObjStorage() for name in names}
+
+ def get_mock_objstorage(cls, **args):
+ assert cls == "mocked", cls
+ return objstorages[args["name"]]
+
+ def decorator(f):
+ @functools.wraps(f)
+ @patch("swh.objstorage.replayer.cli.get_objstorage")
+ def newf(get_objstorage_mock, *args, **kwargs):
+ get_objstorage_mock.side_effect = get_mock_objstorage
+ f(*args, objstorages=objstorages, **kwargs)
+
+ return newf
+
+ return decorator
+
+
+def invoke(*args, env=None, journal_config=None):
+ config = copy.deepcopy(CLI_CONFIG)
+ if journal_config:
+ config["journal_client"] = journal_config
+
+ runner = CliRunner()
+ with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
+ yaml.dump(config, config_fd)
+ config_fd.seek(0)
+ args = ["-C" + config_fd.name] + list(args)
+ return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,)
+
+
+def test_replay_help():
+ result = invoke("replay", "--help",)
+ expected = (
+ r"^\s*Usage: objstorage replay \[OPTIONS\]\s+"
+ r"Fill a destination Object Storage.*"
+ )
+ assert result.exit_code == 0, result.output
+ assert re.match(expected, result.output, re.MULTILINE), result.output
+
+
+NUM_CONTENTS = 10
+
+
+def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages):
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test-producer",
+ "acks": "all",
+ }
+ )
+
+ contents = {}
+ for i in range(NUM_CONTENTS):
+ content = b"\x00" * 19 + bytes([i])
+ sha1 = objstorages["src"].add(content)
+ contents[sha1] = content
+ producer.produce(
+ topic=kafka_prefix + ".content",
+ key=key_to_kafka(sha1),
+ value=key_to_kafka({"sha1": sha1, "status": "visible",}),
+ )
+
+ producer.flush()
+
+ return contents
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content(
+ objstorages,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ for (sha1, content) in contents.items():
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_structured_log(
+ objstorages,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ caplog,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+ expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents)
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ copied = set()
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if "copied" in logtext:
+ copied.add(record.args["obj_id"])
+
+ assert (
+ copied == expected_obj_ids
+ ), "Mismatched logging; see captured log output for details."
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_static_group_id(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ caplog,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ # Setup log capture to fish the consumer settings out of the log messages
+ caplog.set_level(logging.DEBUG, "swh.journal.client")
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"},
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ consumer_settings = None
+ for record in caplog.records:
+ if "Consumer settings" in record.message:
+ consumer_settings = record.args
+ break
+
+ assert consumer_settings is not None, (
+ "Failed to get consumer settings out of the consumer log. "
+ "See log capture for details."
+ )
+ assert consumer_settings["group.instance.id"] == "static-group-instance-id"
+ assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000
+ assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000
+
+ for (sha1, content) in contents.items():
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_exclude(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ excluded_contents = list(contents)[0::2] # picking half of them
+ with tempfile.NamedTemporaryFile(mode="w+b") as fd:
+ fd.write(b"".join(sorted(excluded_contents)))
+
+ fd.seek(0)
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ "--exclude-sha1-file",
+ fd.name,
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ for (sha1, content) in contents.items():
+ if sha1 in excluded_contents:
+ assert sha1 not in objstorages["dst"], sha1
+ else:
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+NUM_CONTENTS_DST = 5
+
+
+@_patch_objstorages(["src", "dst"])
+@pytest.mark.parametrize(
+ "check_dst,expected_copied,expected_in_dst",
+ [
+ (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST),
+ (False, NUM_CONTENTS, 0),
+ ],
+)
+def test_replay_content_check_dst(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ check_dst: bool,
+ expected_copied: int,
+ expected_in_dst: int,
+ caplog,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ for i, (sha1, content) in enumerate(contents.items()):
+ if i >= NUM_CONTENTS_DST:
+ break
+
+ objstorages["dst"].add(content, obj_id=sha1)
+
+ caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ "--check-dst" if check_dst else "--no-check-dst",
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ copied = 0
+ in_dst = 0
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if "copied" in logtext:
+ copied += 1
+ elif "in dst" in logtext:
+ in_dst += 1
+
+ assert (
+ copied == expected_copied and in_dst == expected_in_dst
+ ), "Unexpected amount of objects copied, see the captured log for details"
+
+ for (sha1, content) in contents.items():
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+class FlakyObjStorage(InMemoryObjStorage):
+ def __init__(self, *args, **kwargs):
+ state = kwargs.pop("state")
+ self.failures_left = Counter(kwargs.pop("failures"))
+ super().__init__(*args, **kwargs)
+ if state:
+ self.state = state
+
+ def flaky_operation(self, op, obj_id):
+ if self.failures_left[op, obj_id] > 0:
+ self.failures_left[op, obj_id] -= 1
+ raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id)))
+
+ def get(self, obj_id):
+ self.flaky_operation("get", obj_id)
+ return super().get(obj_id)
+
+ def add(self, data, obj_id=None, check_presence=True):
+ self.flaky_operation("add", obj_id)
+ return super().add(data, obj_id=obj_id, check_presence=check_presence)
+
+ def __contains__(self, obj_id):
+ self.flaky_operation("in", obj_id)
+ return super().__contains__(obj_id)
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_check_dst_retry(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ monkeypatch_retry_sleep,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ failures = {}
+ for i, (sha1, content) in enumerate(contents.items()):
+ if i >= NUM_CONTENTS_DST:
+ break
+
+ objstorages["dst"].add(content, obj_id=sha1)
+ failures["in", sha1] = 1
+
+ orig_dst = objstorages["dst"]
+ objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures)
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ "--check-dst",
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ for (sha1, content) in contents.items():
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_failed_copy_retry(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ caplog,
+ monkeypatch_retry_sleep,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ add_failures = {}
+ get_failures = {}
+ definitely_failed = set()
+
+ # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times.
+ # We generate failures for 2 different operations, get and add.
+ num_retry_contents = 2 * CONTENT_REPLAY_RETRIES
+
+ assert (
+ num_retry_contents < NUM_CONTENTS
+ ), "Need to generate more test contents to properly test retry behavior"
+
+ for i, sha1 in enumerate(contents):
+ if i >= num_retry_contents:
+ break
+
+ # This generates a number of failures, up to CONTENT_REPLAY_RETRIES
+ num_failures = (i % CONTENT_REPLAY_RETRIES) + 1
+
+ # This generates failures of add for the first CONTENT_REPLAY_RETRIES
+ # objects, then failures of get.
+ if i < CONTENT_REPLAY_RETRIES:
+ add_failures["add", sha1] = num_failures
+ else:
+ get_failures["get", sha1] = num_failures
+
+ # Only contents that have CONTENT_REPLAY_RETRIES or more failures are
+ # definitely failing
+ if num_failures >= CONTENT_REPLAY_RETRIES:
+ definitely_failed.add(hash_to_hex(sha1))
+
+ objstorages["dst"] = FlakyObjStorage(
+ state=objstorages["dst"].state, failures=add_failures,
+ )
+ objstorages["src"] = FlakyObjStorage(
+ state=objstorages["src"].state, failures=get_failures,
+ )
+
+ caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ copied = 0
+ actually_failed = set()
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if "copied" in logtext:
+ copied += 1
+ elif "Failed operation" in logtext:
+ assert record.levelno == logging.ERROR
+ assert record.args["retries"] == CONTENT_REPLAY_RETRIES
+ actually_failed.add(record.args["obj_id"])
+
+ assert (
+ actually_failed == definitely_failed
+ ), "Unexpected object copy failures; see captured log for details"
+
+ for (sha1, content) in contents.items():
+ if hash_to_hex(sha1) in definitely_failed:
+ assert sha1 not in objstorages["dst"]
+ continue
+
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_objnotfound(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ caplog,
+):
+
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+
+ num_contents_deleted = 5
+ contents_deleted = set()
+
+ for i, sha1 in enumerate(contents):
+ if i >= num_contents_deleted:
+ break
+
+ del objstorages["src"].state[sha1]
+ contents_deleted.add(hash_to_hex(sha1))
+
+ caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ journal_config={
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ copied = 0
+ not_in_src = set()
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if "copied" in logtext:
+ copied += 1
+ elif "object not found" in logtext:
+ # Check that the object id can be recovered from logs
+ assert record.levelno == logging.ERROR
+ not_in_src.add(record.args["obj_id"])
+
+ assert (
+ copied == NUM_CONTENTS - num_contents_deleted
+ ), "Unexpected number of contents copied"
+
+ assert (
+ not_in_src == contents_deleted
+ ), "Mismatch between deleted contents and not_in_src logs"
+
+ for (sha1, content) in contents.items():
+ if sha1 not in objstorages["src"]:
+ continue
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
diff --git a/swh/objstorage/replayer/tests/test_replay.py b/swh/objstorage/replayer/tests/test_replay.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/tests/test_replay.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+
+from hypothesis import given, settings, HealthCheck
+from hypothesis.strategies import lists, sets, binary
+
+from swh.model.hypothesis_strategies import present_contents
+from swh.objstorage import get_objstorage
+from swh.objstorage.replayer.replay import (
+ is_hash_in_bytearray,
+ process_replay_objects_content,
+)
+from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
+
+
+hash_strategy = binary(min_size=20, max_size=20)
+
+
+@settings(max_examples=500)
+@given(
+ sets(hash_strategy, min_size=0, max_size=500), sets(hash_strategy, min_size=10),
+)
+def test_is_hash_in_bytearray(haystack, needles):
+ array = b"".join(sorted(haystack))
+ needles |= haystack # Exhaustively test for all objects in the array
+ for needle in needles:
+ assert is_hash_in_bytearray(needle, array, len(haystack)) == (
+ needle in haystack
+ )
+
+
+@given(lists(present_contents(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_replay_content(objects):
+
+ queue = []
+ replayer = MockedJournalClient(queue)
+ writer = MockedKafkaWriter(queue)
+
+ objstorage1 = get_objstorage(cls="memory", args={})
+ objstorage2 = get_objstorage(cls="memory", args={})
+
+ contents = []
+ for obj in objects:
+ objstorage1.add(obj.data)
+ contents.append(obj)
+ writer.write_addition("content", obj)
+
+ # Bail out early if we didn't insert any relevant objects...
+ queue_size = len(queue)
+ assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
+
+ assert replayer.stop_after_objects is None
+ replayer.stop_after_objects = queue_size
+
+ worker_fn = functools.partial(
+ process_replay_objects_content, src=objstorage1, dst=objstorage2
+ )
+
+ replayer.process(worker_fn)
+
+ # only contents with status "visible" are copied into objstorage2
+ expected_objstorage_state = {
+ c.sha1: c.data for c in contents if c.status == "visible"
+ }
+
+ assert expected_objstorage_state == objstorage2.state
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Aug 24, 5:58 PM (1 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3221907
Attached To
D3071: Initial version of the swh.objstorage.replayer package
Event Timeline
Log In to Comment