diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html +humanize redis +tenacity diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py --- a/swh/objstorage/replayer/replay.py +++ b/swh/objstorage/replayer/replay.py @@ -1,14 +1,16 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor from concurrent.futures import wait as futures_wait import logging from time import time +from traceback import format_tb from typing import Callable, Dict, List, Optional +from humanize import naturaldelta, naturalsize import msgpack from sentry_sdk import capture_exception, push_scope @@ -28,7 +30,9 @@ from swh.core.statsd import statsd from swh.model.hashutil import hash_to_hex from swh.model.model import SHA1_SIZE -from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError, ObjStorage +from swh.objstorage.constants import ID_HASH_ALGO +from swh.objstorage.exc import ObjNotFoundError +from swh.objstorage.objstorage import ObjStorage logger = logging.getLogger(__name__) REPORTER = None @@ -95,7 +99,11 @@ self.exc = exc def __str__(self): - return "ReplayError(%s, %s)" % (self.obj_id, self.exc) + return "ReplayError(%s, %r, %s)" % ( + self.obj_id, + self.exc, + format_tb(self.exc.__traceback__), + ) def log_replay_retry(retry_state, sleep=None, last_result=None): @@ -120,13 +128,13 @@ error_context = { "obj_id": exc.obj_id, "operation": retry_state.fn.__name__, - "exc": str(exc.exc), + "exc": str(exc), "retries": retry_state.attempt_number, } logger.error( "Failed operation %(operation)s on %(obj_id)s after %(retries)s" - " retries: %(exc)s", + " retries; last exception: %(exc)s", error_context, ) @@ -136,7 +144,7 @@ msg = msgpack.dumps(error_context) REPORTER(oid, msg) - return None + raise exc CONTENT_REPLAY_RETRIES = 3 @@ -271,50 +279,54 @@ >>> id2 in dst True """ - vol = [] - nb_skipped = 0 - nb_failures = 0 - t0 = time() def _copy_object(obj): - nonlocal nb_skipped - nonlocal nb_failures obj_id = obj[ID_HASH_ALGO] if obj["status"] != "visible": - nb_skipped += 1 logger.debug("skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"]) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "skipped", "status": obj["status"]}, ) + return "skipped", None elif exclude_fn and exclude_fn(obj): - nb_skipped += 1 logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}) + return "excluded", None elif check_dst and obj_in_objstorage(obj_id, dst): - nb_skipped += 1 logger.debug("skipped %s (in dst)", hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) + return "in_dst", None else: try: - copied = copy_object(obj_id, src, dst) + copied_bytes = copy_object(obj_id, src, dst) except ObjNotFoundError: - nb_skipped += 1 statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"} ) + return "not_found", None + except Exception: + statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}) + return "failed", None else: - if copied is None: - nb_failures += 1 + if copied_bytes is None: statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"} ) + return "failed", None else: - vol.append(copied) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"} ) + return "copied", copied_bytes + return "failed", None + + vol = 0 + stats = dict.fromkeys( + ["skipped", "excluded", "not_found", "failed", "copied", "in_dst"], 0 + ) + t0 = time() with ThreadPoolExecutor(max_workers=concurrency) as pool: futures = [] @@ -327,26 +339,33 @@ for obj in objects: futures.append(pool.submit(_copy_object, obj=obj)) - futures_wait(futures, return_when=FIRST_EXCEPTION) + futures_wait(futures) for f in futures: - if f.running(): - continue exc = f.exception() if exc: - pool.shutdown(wait=False) - f.result() + # XXX this should not happen, so it is probably wrong... raise exc + else: + state, nbytes = f.result() + if nbytes is not None: + vol += nbytes + stats[state] += 1 dt = time() - t0 logger.info( - "processed %s content objects in %.1fsec " - "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped", - len(vol), - dt, - len(vol) / dt, - sum(vol) / 1024 / 1024 / dt, - nb_failures, - nb_skipped, + "processed %s content objects (%s) in %s " + "(%.1f obj/sec, %s/sec) - %d copied - %d skipped - %d excluded - " + "%d not found - %d failed", + len(futures), + naturalsize(vol), + naturaldelta(dt), + len(futures) / dt, + naturalsize(vol / dt), + stats["copied"], + stats["skipped"], + stats["excluded"], + stats["not_found"], + stats["failed"], ) if notify: diff --git a/swh/objstorage/replayer/tests/test_replay_errors.py b/swh/objstorage/replayer/tests/test_replay_errors.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/replayer/tests/test_replay_errors.py @@ -0,0 +1,207 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from collections import defaultdict +import functools +from queue import Queue + +from swh.journal.client import JournalClient +from swh.journal.writer import get_journal_writer +from swh.model.model import Content +from swh.objstorage.exc import ObjNotFoundError +from swh.objstorage.factory import get_objstorage +from swh.objstorage.multiplexer.filter.filter import ObjStorageFilter +from swh.objstorage.replayer import replay +from swh.objstorage.replayer.replay import copy_object # needed for MonkeyPatch + +CONTENTS = [Content.from_data(f"foo{i}".encode()) for i in range(10)] + [ + Content.from_data(f"forbidden foo{i}".encode(), status="hidden") for i in range(10) +] + + +class FailingObjstorage(ObjStorageFilter): + def __init__(self, storage): + super().__init__(storage) + self.calls = defaultdict(lambda: 0) + self.rate = 3 + + def get(self, obj_id, *args, **kwargs): + self.calls[obj_id] += 1 + if (self.calls[obj_id] % self.rate) == 0: + return self.storage.get(obj_id, *args, **kwargs) + raise Exception("Nope") + + +class NotFoundObjstorage(ObjStorageFilter): + def get(self, obj_id, *args, **kwargs): + raise ObjNotFoundError(obj_id) + + +def prepare_test(kafka_server, kafka_prefix, kafka_consumer_group): + src_objstorage = get_objstorage(cls="memory") + + writer = get_journal_writer( + cls="kafka", + brokers=[kafka_server], + client_id="kafka_writer", + prefix=kafka_prefix, + anonymize=False, + ) + + for content in CONTENTS: + src_objstorage.add(content.data, obj_id=content.sha1) + writer.write_addition("content", content) + + replayer = JournalClient( + brokers=kafka_server, + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_on_eof=True, + ) + + return replayer, src_objstorage + + +def copy_object_q(q): + """Wrap the original copy_object function to capture (thread-local) tenacity + stats and puch them in a queue suitable for checking in a test session""" + + def wrap(obj_id, src, dst): + try: + ret = copy_object(obj_id, src, dst) + return ret + finally: + q.put(("get", obj_id, replay.get_object.retry.statistics.copy())) + q.put(("put", obj_id, replay.put_object.retry.statistics.copy())) + + return wrap + + +def test_replay_content_with_transient_errors( + kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch +): + replayer, src_objstorage = prepare_test( + kafka_server, kafka_prefix, kafka_consumer_group + ) + dst_objstorage = get_objstorage(cls="memory") + src_objstorage = FailingObjstorage(src_objstorage) + + q = Queue() + monkeypatch.setattr(replay, "copy_object", copy_object_q(q)) + + worker_fn = functools.partial( + replay.process_replay_objects_content, + src=src_objstorage, + dst=dst_objstorage, + ) + replayer.process(worker_fn) + + # only content with status visible will be copied in storage2 + expected_objstorage_state = { + c.sha1: c.data for c in CONTENTS if c.status == "visible" + } + assert expected_objstorage_state == dst_objstorage.state + + stats = [q.get_nowait() for i in range(q.qsize())] + for obj_id in expected_objstorage_state: + put = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put" + ) + assert put.get("attempt_number") == 1 + assert put.get("start_time") > 0 + assert put.get("idle_for") == 0 + + get = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get" + ) + assert get.get("attempt_number") == 3 + assert get.get("start_time") > 0 + assert get.get("idle_for") > 0 + assert get.get("delay_since_first_attempt") > 0 + + +def test_replay_content_with_errors( + kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch +): + replayer, src_objstorage = prepare_test( + kafka_server, kafka_prefix, kafka_consumer_group + ) + dst_objstorage = get_objstorage(cls="memory") + src_objstorage = FailingObjstorage(src_objstorage) + + q = Queue() + monkeypatch.setattr(replay, "copy_object", copy_object_q(q)) + monkeypatch.setattr(replay.get_object.retry.stop, "max_attempt_number", 2) + + worker_fn = functools.partial( + replay.process_replay_objects_content, + src=src_objstorage, + dst=dst_objstorage, + ) + replayer.process(worker_fn) + + # no object could be replicated + assert dst_objstorage.state == {} + + stats = [q.get_nowait() for i in range(q.qsize())] + for obj in CONTENTS: + if obj.status != "visible": + continue + + obj_id = obj.sha1 + put = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put" + ) + assert put == {} + + get = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get" + ) + assert get.get("attempt_number") == 2 + assert get.get("start_time") > 0 + assert get.get("idle_for") > 0 + assert get.get("delay_since_first_attempt") > 0 + + +def test_replay_content_not_found( + kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch +): + replayer, src_objstorage = prepare_test( + kafka_server, kafka_prefix, kafka_consumer_group + ) + dst_objstorage = get_objstorage(cls="memory") + src_objstorage = NotFoundObjstorage(src_objstorage) + + q = Queue() + monkeypatch.setattr(replay, "copy_object", copy_object_q(q)) + + worker_fn = functools.partial( + replay.process_replay_objects_content, + src=src_objstorage, + dst=dst_objstorage, + ) + replayer.process(worker_fn) + + # no object could be replicated + assert dst_objstorage.state == {} + + stats = [q.get_nowait() for i in range(q.qsize())] + for obj in CONTENTS: + if obj.status != "visible": + continue + + obj_id = obj.sha1 + put = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put" + ) + assert put == {} + + get = next( + stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get" + ) + # ObjectNotFound should not be retried several times... + assert get.get("attempt_number") == 1 + assert get.get("start_time") > 0 + assert get.get("idle_for") == 0