
diff --git a/mypy.ini b/mypy.ini
index 453537f..624eb85 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,21 +1,24 @@
[mypy]
namespace_packages = True
warn_unused_ignores = True
# 3rd party libraries without stubs (yet)
[mypy-confluent_kafka.*]
ignore_missing_imports = True
+[mypy-msgpack.*]
+ignore_missing_imports = True
+
[mypy-pkg_resources.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True
[mypy-systemd.daemon.*]
ignore_missing_imports = True
[mypy-tenacity.*]
ignore_missing_imports = True
diff --git a/requirements-test.txt b/requirements-test.txt
index bbd486c..db5bb6e 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,2 +1,4 @@
pytest
+pytest-redis
types-pyyaml
+types-redis
diff --git a/requirements.txt b/requirements.txt
index 1cde173..a9a9521 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec of
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
+redis
diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py
index 673a344..bd04c59 100644
--- a/swh/objstorage/replayer/cli.py
+++ b/swh/objstorage/replayer/cli.py
@@ -1,137 +1,142 @@
# Copyright (C) 2016-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import logging
import click
try:
from systemd.daemon import notify
except ImportError:
notify = None
from swh.objstorage.cli import objstorage_cli_group
@objstorage_cli_group.command("replay")
@click.option(
"--stop-after-objects",
"-n",
default=None,
type=int,
help="Stop after processing this many objects. Default is to run forever.",
)
@click.option(
"--exclude-sha1-file",
default=None,
type=click.File("rb"),
help="File containing a sorted array of hashes to be excluded.",
)
@click.option(
"--check-dst/--no-check-dst",
default=True,
help="Check whether the destination contains the object before copying.",
)
@click.pass_context
def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst):
"""Fill a destination Object Storage using a journal stream.
This is typically used for a mirror configuration, by reading a Journal
and retrieving objects from an existing source ObjStorage.
There can be several 'replayers' filling a given ObjStorage as long as they
use the same ``group-id``. You can use the ``KAFKA_GROUP_INSTANCE_ID``
environment variable to use KIP-345 static group membership.
This service retrieves object ids to copy from the 'content' topic. It will
only copy an object's content if the object's description in the kafka
message has the status:visible set.
``--exclude-sha1-file`` may be used to exclude some hashes to speed-up the
replay in case many of the contents are already in the destination
objstorage. It must contain a concatenation of all (sha1) hashes,
and it must be sorted.
This file will not be fully loaded into memory at any given time,
so it can be arbitrarily large.
``--check-dst`` sets whether the replayer should check in the destination
ObjStorage before copying an object. You can turn that off if you know
you're copying to an empty ObjStorage.
"""
import functools
import mmap
from swh.journal.client import get_journal_client
from swh.model.model import SHA1_SIZE
from swh.objstorage.factory import get_objstorage
from swh.objstorage.replayer.replay import (
is_hash_in_bytearray,
process_replay_objects_content,
)
conf = ctx.obj["config"]
try:
objstorage_src = get_objstorage(**conf.pop("objstorage"))
except KeyError:
ctx.fail("You must have a source objstorage configured in " "your config file.")
try:
objstorage_dst = get_objstorage(**conf.pop("objstorage_dst"))
except KeyError:
ctx.fail(
"You must have a destination objstorage configured " "in your config file."
)
if exclude_sha1_file:
map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ)
if map_.size() % SHA1_SIZE != 0:
ctx.fail(
"--exclude-sha1 must link to a file whose size is an "
"exact multiple of %d bytes." % SHA1_SIZE
)
nb_excluded_hashes = int(map_.size() / SHA1_SIZE)
def exclude_fn(obj):
return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes)
else:
exclude_fn = None
journal_cfg = conf["journal_client"]
journal_cfg.setdefault("cls", "kafka")
+ if "error_reporter" in journal_cfg:
+ from redis import Redis
+ from swh.objstorage.replayer import replay
+ replay.REPORTER = Redis(**journal_cfg.pop("error_reporter")).set
+
client = get_journal_client(
**journal_cfg, stop_after_objects=stop_after_objects, object_types=("content",),
)
worker_fn = functools.partial(
process_replay_objects_content,
src=objstorage_src,
dst=objstorage_dst,
exclude_fn=exclude_fn,
check_dst=check_dst,
)
if notify:
notify("READY=1")
try:
client.process(worker_fn)
except KeyboardInterrupt:
ctx.exit(0)
else:
print("Done.")
finally:
if notify:
notify("STOPPING=1")
client.close()
def main():
logging.basicConfig()
return objstorage_cli_group(auto_envvar_prefix="SWH_OBJSTORAGE")
if __name__ == "__main__":
main()
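
Side note on the ``--exclude-sha1-file`` option documented above: the file must be a sorted concatenation of raw 20-byte sha1 digests (its size is checked to be an exact multiple of SHA1_SIZE). A minimal sketch of producing such a file follows; it is not part of this diff, and write_exclude_file / already_mirrored are illustrative names only:

def write_exclude_file(already_mirrored, path="exclude-sha1.bin"):
    # already_mirrored: iterable of raw 20-byte sha1 digests (bytes, not hex)
    digests = sorted(already_mirrored)
    assert all(len(d) == 20 for d in digests), "raw sha1 digests expected"
    with open(path, "wb") as f:
        # sorted concatenation, as expected by is_hash_in_bytearray()
        f.write(b"".join(digests))
    # the resulting file is then passed via --exclude-sha1-file
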
diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
index 3f218fa..2bdec24 100644
--- a/swh/objstorage/replayer/replay.py
+++ b/swh/objstorage/replayer/replay.py
@@ -1,323 +1,331 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from time import time
from typing import Callable, Dict, List, Optional
+import msgpack
from sentry_sdk import capture_exception, push_scope
try:
from systemd.daemon import notify
except ImportError:
notify = None
from tenacity import (
retry,
retry_base,
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
)
from swh.core.statsd import statsd
from swh.model.hashutil import hash_to_hex
from swh.model.model import SHA1_SIZE
from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError, ObjStorage
logger = logging.getLogger(__name__)
+REPORTER = None
CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total"
CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total"
CONTENT_BYTES_METRIC = "swh_content_replayer_bytes"
CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"
def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE):
"""
Checks if the given hash is in the provided `array`. The array must be
a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes
(so its size must be `nb_hashes*hash_size` bytes).
Args:
hash_ (bytes): the hash to look for
array (bytes): a sorted concatenated array of hashes (may be of
any type supporting slice indexing, eg. :class:`mmap.mmap`)
nb_hashes (int): number of hashes in the array
hash_size (int): size of a hash (defaults to 20, for SHA1)
Example:
>>> import os
>>> hash1 = os.urandom(20)
>>> hash2 = os.urandom(20)
>>> hash3 = os.urandom(20)
>>> array = b''.join(sorted([hash1, hash2]))
>>> is_hash_in_bytearray(hash1, array, 2)
True
>>> is_hash_in_bytearray(hash2, array, 2)
True
>>> is_hash_in_bytearray(hash3, array, 2)
False
"""
if len(hash_) != hash_size:
raise ValueError("hash_ does not match the provided hash_size.")
def get_hash(position):
return array[position * hash_size : (position + 1) * hash_size]
# Regular dichotomy:
left = 0
right = nb_hashes
while left < right - 1:
middle = int((right + left) / 2)
pivot = get_hash(middle)
if pivot == hash_:
return True
elif pivot < hash_:
left = middle
else:
right = middle
return get_hash(left) == hash_
class ReplayError(Exception):
"""An error occurred during the replay of an object"""
def __init__(self, *, obj_id, exc):
self.obj_id = hash_to_hex(obj_id)
self.exc = exc
def __str__(self):
return "ReplayError(%s, %s)" % (self.obj_id, self.exc)
def log_replay_retry(retry_state, sleep=None, last_result=None):
"""Log a retry of the content replayer"""
exc = retry_state.outcome.exception()
operation = retry_state.fn.__name__
logger.debug(
"Retry operation %(operation)s on %(obj_id)s: %(exc)s",
{"operation": operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
)
def log_replay_error(retry_state):
"""Log a replay error to sentry"""
exc = retry_state.outcome.exception()
with push_scope() as scope:
scope.set_tag("operation", retry_state.fn.__name__)
scope.set_extra("obj_id", exc.obj_id)
capture_exception(exc.exc)
error_context = {
"obj_id": exc.obj_id,
"operation": retry_state.fn.__name__,
"exc": str(exc.exc),
"retries": retry_state.attempt_number,
}
logger.error(
"Failed operation %(operation)s on %(obj_id)s after %(retries)s"
" retries: %(exc)s",
error_context,
)
+ # if we have a global error (redis) reporter
+ if REPORTER is not None:
+ oid = f"blob:{exc.obj_id}"
+ msg = msgpack.dumps(error_context)
+ REPORTER(oid, msg)
+
return None
CONTENT_REPLAY_RETRIES = 3
class retry_log_if_success(retry_base):
"""Log in statsd the number of attempts required to succeed"""
def __call__(self, retry_state):
if not retry_state.outcome.failed:
statsd.increment(
CONTENT_RETRY_METRIC,
tags={
"operation": retry_state.fn.__name__,
"attempt": str(retry_state.attempt_number),
},
)
return False
content_replay_retry = retry(
retry=retry_if_exception_type(ReplayError) | retry_log_if_success(),
stop=stop_after_attempt(CONTENT_REPLAY_RETRIES),
wait=wait_random_exponential(multiplier=1, max=60),
before_sleep=log_replay_retry,
retry_error_callback=log_replay_error,
)
@content_replay_retry
def get_object(objstorage, obj_id):
try:
with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}):
obj = objstorage.get(obj_id)
logger.debug("retrieved %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
return obj
except ObjNotFoundError:
logger.error(
"Failed to retrieve %(obj_id)s: object not found",
{"obj_id": hash_to_hex(obj_id)},
)
raise
except Exception as exc:
raise ReplayError(obj_id=obj_id, exc=exc) from None
@content_replay_retry
def put_object(objstorage, obj_id, obj):
try:
with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
obj = objstorage.add(obj, obj_id, check_presence=False)
logger.debug("stored %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
except Exception as exc:
raise ReplayError(obj_id=obj_id, exc=exc) from None
def copy_object(obj_id, src, dst):
obj = get_object(src, obj_id)
if obj is not None:
put_object(dst, obj_id, obj)
statsd.increment(CONTENT_BYTES_METRIC, len(obj))
return len(obj)
return 0
@content_replay_retry
def obj_in_objstorage(obj_id, dst):
"""Check if an object is already in an objstorage, tenaciously"""
try:
return obj_id in dst
except Exception as exc:
raise ReplayError(obj_id=obj_id, exc=exc) from None
def process_replay_objects_content(
all_objects: Dict[str, List[dict]],
*,
src: ObjStorage,
dst: ObjStorage,
exclude_fn: Optional[Callable[[dict], bool]] = None,
check_dst: bool = True,
):
"""
Takes a list of records from Kafka (see
:py:func:`swh.journal.client.JournalClient.process`) and copies them
from the `src` objstorage to the `dst` objstorage, if:
* `obj['status']` is `'visible'`
* `exclude_fn(obj)` is `False` (if `exclude_fn` is provided)
* `obj['sha1'] not in dst` (if `check_dst` is True)
Args:
all_objects: Objects passed by the Kafka client. Most importantly,
`all_objects['content'][*]['sha1']` is the sha1 hash of each
content.
src: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
exclude_fn: Determines whether an object should be copied.
check_dst: Determines whether we should check the destination
objstorage before copying.
Example:
>>> from swh.objstorage.factory import get_objstorage
>>> src = get_objstorage('memory')
>>> dst = get_objstorage('memory')
>>> id1 = src.add(b'foo bar')
>>> id2 = src.add(b'baz qux')
>>> kafka_partitions = {
... 'content': [
... {
... 'sha1': id1,
... 'status': 'visible',
... },
... {
... 'sha1': id2,
... 'status': 'visible',
... },
... ]
... }
>>> process_replay_objects_content(
... kafka_partitions, src=src, dst=dst,
... exclude_fn=lambda obj: obj['sha1'] == id1)
>>> id1 in dst
False
>>> id2 in dst
True
"""
vol = []
nb_skipped = 0
nb_failures = 0
t0 = time()
for (object_type, objects) in all_objects.items():
if object_type != "content":
logger.warning(
"Received a series of %s, this should not happen", object_type
)
continue
for obj in objects:
obj_id = obj[ID_HASH_ALGO]
if obj["status"] != "visible":
nb_skipped += 1
logger.debug(
"skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"]
)
statsd.increment(
CONTENT_OPERATIONS_METRIC,
tags={"decision": "skipped", "status": obj["status"]},
)
elif exclude_fn and exclude_fn(obj):
nb_skipped += 1
logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
statsd.increment(
CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}
)
elif check_dst and obj_in_objstorage(obj_id, dst):
nb_skipped += 1
logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
else:
try:
copied = copy_object(obj_id, src, dst)
except ObjNotFoundError:
nb_skipped += 1
statsd.increment(
CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
)
else:
if copied is None:
nb_failures += 1
statsd.increment(
CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
)
else:
vol.append(copied)
statsd.increment(
CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
)
dt = time() - t0
logger.info(
"processed %s content objects in %.1fsec "
"(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped",
len(vol),
dt,
len(vol) / dt,
sum(vol) / 1024 / 1024 / dt,
nb_failures,
nb_skipped,
)
if notify:
notify("WATCHDOG=1")
diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py
index acccb38..0dbe0e9 100644
--- a/swh/objstorage/replayer/tests/test_cli.py
+++ b/swh/objstorage/replayer/tests/test_cli.py
@@ -1,634 +1,666 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import Counter
import copy
import functools
import logging
import re
from subprocess import Popen
import tempfile
from typing import Tuple
from unittest.mock import patch
from click.testing import CliRunner
from confluent_kafka import Producer
+import msgpack
import pytest
import yaml
from swh.journal.serializers import key_to_kafka
from swh.model.hashutil import hash_to_hex
from swh.objstorage.backends.in_memory import InMemoryObjStorage
from swh.objstorage.replayer.cli import objstorage_cli_group
from swh.objstorage.replayer.replay import CONTENT_REPLAY_RETRIES
logger = logging.getLogger(__name__)
CLI_CONFIG = {
"objstorage": {"cls": "mocked", "name": "src",},
"objstorage_dst": {"cls": "mocked", "name": "dst",},
}
@pytest.fixture
def monkeypatch_retry_sleep(monkeypatch):
from swh.objstorage.replayer.replay import get_object, obj_in_objstorage, put_object
monkeypatch.setattr(get_object.retry, "sleep", lambda x: None)
monkeypatch.setattr(put_object.retry, "sleep", lambda x: None)
monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)
def _patch_objstorages(names):
objstorages = {name: InMemoryObjStorage() for name in names}
def get_mock_objstorage(cls, **args):
assert cls == "mocked", cls
return objstorages[args["name"]]
def decorator(f):
@functools.wraps(f)
@patch("swh.objstorage.factory.get_objstorage")
def newf(get_objstorage_mock, *args, **kwargs):
get_objstorage_mock.side_effect = get_mock_objstorage
f(*args, objstorages=objstorages, **kwargs)
return newf
return decorator
def invoke(*args, env=None, journal_config=None):
config = copy.deepcopy(CLI_CONFIG)
if journal_config:
config["journal_client"] = journal_config
runner = CliRunner()
with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
yaml.dump(config, config_fd)
config_fd.seek(0)
args = ["-C" + config_fd.name] + list(args)
return runner.invoke(
objstorage_cli_group, args, obj={"log_level": logging.DEBUG}, env=env,
)
def test_replay_help():
result = invoke("replay", "--help",)
expected = (
r"^\s*Usage: objstorage replay \[OPTIONS\]\s+"
r"Fill a destination Object Storage.*"
)
assert result.exit_code == 0, result.output
assert re.match(expected, result.output, re.MULTILINE), result.output
NUM_CONTENTS = 10
def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorage):
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test-producer",
"acks": "all",
}
)
contents = {}
for i in range(NUM_CONTENTS):
content = b"\x00" * 19 + bytes([i])
sha1 = objstorage.add(content)
contents[sha1] = content
producer.produce(
topic=kafka_prefix + ".content",
key=key_to_kafka(sha1),
value=key_to_kafka({"sha1": sha1, "status": "visible",}),
)
producer.flush()
return contents
@_patch_objstorages(["src", "dst"])
def test_replay_content(
objstorages,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
):
"""Check the content replayer in normal conditions"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
result = invoke(
"replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
"brokers": kafka_server,
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_structured_log(
objstorages,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
caplog,
):
"""Check the logs produced by the content replayer in normal conditions"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents)
result = invoke(
"replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
"brokers": kafka_server,
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = set()
for record in caplog.records:
logtext = record.getMessage()
if "stored" in logtext:
copied.add(record.args["obj_id"])
assert (
copied == expected_obj_ids
), "Mismatched logging; see captured log output for details."
@_patch_objstorages(["src", "dst"])
def test_replay_content_static_group_id(
objstorages,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
caplog,
):
"""Check the content replayer in normal conditions
with KAFKA_GROUP_INSTANCE_ID set
"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
# Setup log capture to fish the consumer settings out of the log messages
caplog.set_level(logging.DEBUG, "swh.journal.client")
result = invoke(
"replay",
"--stop-after-objects",
str(NUM_CONTENTS),
env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"},
journal_config={
"brokers": kafka_server,
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
consumer_settings = None
for record in caplog.records:
if "Consumer settings" in record.message:
consumer_settings = record.args
break
assert consumer_settings is not None, (
"Failed to get consumer settings out of the consumer log. "
"See log capture for details."
)
assert consumer_settings["group.instance.id"] == "static-group-instance-id"
assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000
assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_exclude(
objstorages,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
):
"""Check the content replayer in normal conditions
with an exclusion file (--exclude-sha1-file)
"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
excluded_contents = list(contents)[0::2] # picking half of them
with tempfile.NamedTemporaryFile(mode="w+b") as fd:
fd.write(b"".join(sorted(excluded_contents)))
fd.seek(0)
result = invoke(
"replay",
"--stop-after-objects",
str(NUM_CONTENTS),
"--exclude-sha1-file",
fd.name,
journal_config={
"brokers": kafka_server,
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
for (sha1, content) in contents.items():
if sha1 in excluded_contents:
assert sha1 not in objstorages["dst"], sha1
else:
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
NUM_CONTENTS_DST = 5
@_patch_objstorages(["src", "dst"])
@pytest.mark.parametrize(
"check_dst,expected_copied,expected_in_dst",
[
(True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST),
(False, NUM_CONTENTS, 0),
],
)
def test_replay_content_check_dst(
objstorages,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
check_dst: bool,
expected_copied: int,
expected_in_dst: int,
caplog,
):
"""Check the content replayer in normal conditions
with some objects already in the dst objstorage.
When check_dst is True, expect those objects to be neither retrieved from
the src objstorage nor pushed to the dst objstorage.
"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
# add some objects in the dst objstorage
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
objstorages["dst"].add(content, obj_id=sha1)
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
result = invoke(
"replay",
"--stop-after-objects",
str(NUM_CONTENTS),
"--check-dst" if check_dst else "--no-check-dst",
journal_config={
"brokers": kafka_server,
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
retrieved = 0
stored = 0
in_dst = 0
for record in caplog.records:
logtext = record.getMessage()
if "retrieved" in logtext:
retrieved += 1
elif "stored" in logtext:
stored += 1
elif "in dst" in logtext:
in_dst += 1
assert (
retrieved == expected_copied
and stored == expected_copied
and in_dst == expected_in_dst
), "Unexpected amount of objects copied, see the captured log for details"
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
class FlakyObjStorage(InMemoryObjStorage):
"""Flaky objstorage
Any 'get', 'add' or 'in' (i.e. '__contains__()') operation will fail
according to the configured 'failures'.
'failures' is expected to be a dict whose keys are (operation, obj_id)
pairs and whose values are the number of times the operation 'operation'
is expected to fail for object 'obj_id' before succeeding.
An optional state ('state') can also be given as an argument (see
InMemoryObjStorage).
"""
def __init__(self, *args, **kwargs):
state = kwargs.pop("state")
self.failures_left = Counter(kwargs.pop("failures"))
super().__init__(*args, **kwargs)
if state:
self.state = state
def flaky_operation(self, op, obj_id):
if self.failures_left[op, obj_id] > 0:
self.failures_left[op, obj_id] -= 1
raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id)))
def get(self, obj_id):
self.flaky_operation("get", obj_id)
return super().get(obj_id)
def add(self, data, obj_id=None, check_presence=True):
self.flaky_operation("add", obj_id)
return super().add(data, obj_id=obj_id, check_presence=check_presence)
def __contains__(self, obj_id):
self.flaky_operation("in", obj_id)
return super().__contains__(obj_id)
@_patch_objstorages(["src", "dst"])
def test_replay_content_check_dst_retry(
objstorages,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
monkeypatch_retry_sleep,
caplog,
+ redis_proc,
+ redisdb,
):
"""Check the content replayer with a flaky dst objstorage
for 'in' operations.
"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
# build a flaky dst objstorage in which the 'in' operation for the first
# NUM_CONTENT_DST objects will fail once
failures = {}
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
objstorages["dst"].add(content, obj_id=sha1)
failures["in", sha1] = 1
orig_dst = objstorages["dst"]
objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures)
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
result = invoke(
"replay",
"--check-dst",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
"brokers": kafka_server,
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
+ "error_reporter": {"host": redis_proc.host, "port": redis_proc.port},
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
# check that exactly NUM_CONTENTS_DST 'in' operations have failed once
failed_in = 0
for record in caplog.records:
logtext = record.getMessage()
if "Retry operation obj_in_objstorage" in logtext:
failed_in += 1
elif "Retry operation" in logtext:
assert False, "No other failure expected than 'in' operations"
assert failed_in == NUM_CONTENTS_DST
+ # check nothing has been reported in redis
+ assert not redisdb.keys()
+
# in the end, the replay process should be OK
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_failed_copy_retry(
objstorages,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
caplog,
monkeypatch_retry_sleep,
+ redis_proc,
+ redisdb,
):
"""Check the content replayer with a flaky src and dst objstorages
- for 'get' and 'add' operations.
+ for 'get' and 'add' operations, and a few non-recoverable failures (some
+ objects failed to be replayed).
+
"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
add_failures = {}
get_failures = {}
definitely_failed = set()
# We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times.
# We generate failures for 2 different operations, get and add.
num_retry_contents = 2 * CONTENT_REPLAY_RETRIES
assert (
num_retry_contents < NUM_CONTENTS
), "Need to generate more test contents to properly test retry behavior"
for i, sha1 in enumerate(contents):
if i >= num_retry_contents:
break
# This generates a number of failures, up to CONTENT_REPLAY_RETRIES
num_failures = (i % CONTENT_REPLAY_RETRIES) + 1
# This generates failures of add for the first CONTENT_REPLAY_RETRIES
# objects, then failures of get.
if i < CONTENT_REPLAY_RETRIES:
add_failures["add", sha1] = num_failures
else:
get_failures["get", sha1] = num_failures
# Only contents that have CONTENT_REPLAY_RETRIES or more are
# definitely failing
if num_failures >= CONTENT_REPLAY_RETRIES:
definitely_failed.add(hash_to_hex(sha1))
assert add_failures
assert get_failures
assert definitely_failed
objstorages["dst"] = FlakyObjStorage(
state=objstorages["dst"].state, failures=add_failures,
)
objstorages["src"] = FlakyObjStorage(
state=objstorages["src"].state, failures=get_failures,
)
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
result = invoke(
"replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
"brokers": kafka_server,
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
+ "error_reporter": {"host": redis_proc.host, "port": redis_proc.port},
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+ # check the logs looks as expected
copied = 0
- actually_failed = set()
+ failed_put = set()
+ failed_get = set()
for record in caplog.records:
logtext = record.getMessage()
if "stored" in logtext:
copied += 1
elif "Failed operation" in logtext:
assert record.levelno == logging.ERROR
assert record.args["retries"] == CONTENT_REPLAY_RETRIES
- actually_failed.add(record.args["obj_id"])
-
+ assert record.args["operation"] in ("get_object", "put_object")
+ if record.args["operation"] == "get_object":
+ failed_get.add(record.args["obj_id"])
+ else:
+ failed_put.add(record.args["obj_id"])
assert (
- actually_failed == definitely_failed
+ failed_put | failed_get == definitely_failed
), "Unexpected object copy failures; see captured log for details"
+ # check failed objects are referenced in redis
+ assert set(redisdb.keys()) == {
+ f"blob:{objid}".encode() for objid in definitely_failed
+ }
+ # and have a consistent error report in redis
+ for key in redisdb.keys():
+ report = msgpack.loads(redisdb[key])
+ assert report["operation"] in ("get_object", "put_object")
+ if report["operation"] == "get_object":
+ assert report["obj_id"] in failed_get
+ else:
+ assert report["obj_id"] in failed_put
+
+ # check valid object are in the dst objstorage, but
+ # failed objects are not.
for (sha1, content) in contents.items():
if hash_to_hex(sha1) in definitely_failed:
assert sha1 not in objstorages["dst"]
continue
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_objnotfound(
objstorages,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
caplog,
):
"""Check the ContentNotFound is not considered a failure to retry"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
# delete a few objects from the src objstorage
num_contents_deleted = 5
contents_deleted = set()
for i, sha1 in enumerate(contents):
if i >= num_contents_deleted:
break
del objstorages["src"].state[sha1]
contents_deleted.add(hash_to_hex(sha1))
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
result = invoke(
"replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
"brokers": kafka_server,
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = 0
not_in_src = set()
for record in caplog.records:
logtext = record.getMessage()
if "stored" in logtext:
copied += 1
elif "object not found" in logtext:
# Check that the object id can be recovered from logs
assert record.levelno == logging.ERROR
not_in_src.add(record.args["obj_id"])
elif "Retry operation" in logtext:
assert False, "Not found objects should not be retried"
assert (
copied == NUM_CONTENTS - num_contents_deleted
), "Unexpected number of contents copied"
assert (
not_in_src == contents_deleted
), "Mismatch between deleted contents and not_in_src logs"
for (sha1, content) in contents.items():
if sha1 not in objstorages["src"]:
continue
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
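
Finally, a minimal sketch (not part of this diff) of the configuration shape these tests exercise, written as the Python dict that invoke() dumps to YAML. Only the key names come from the diff; every value below is a placeholder (the tests themselves use the 'mocked' objstorage class and the pytest-redis fixtures instead):

config = {
    "objstorage": {"cls": "memory"},       # source objstorage, placeholder backend
    "objstorage_dst": {"cls": "memory"},   # destination objstorage, placeholder backend
    "journal_client": {
        "cls": "kafka",
        "brokers": ["kafka.example.org:9092"],
        "group_id": "content-replayer",
        "prefix": "swh.journal.objects",
        # optional: report definitive replay failures to a Redis instance
        "error_reporter": {"host": "redis.example.org", "port": 6379},
    },
}
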
