diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -8,6 +8,9 @@ [mypy-confluent_kafka.*] ignore_missing_imports = True +[mypy-msgpack.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,2 +1,4 @@ pytest +pytest-redis types-pyyaml +types-redis diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html +redis diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py --- a/swh/objstorage/replayer/cli.py +++ b/swh/objstorage/replayer/cli.py @@ -102,6 +102,11 @@ journal_cfg = conf["journal_client"] journal_cfg.setdefault("cls", "kafka") + if "error_reporter" in journal_cfg: + from redis import Redis + from swh.objstorage.replayer import replay + replay.REPORTER = Redis(**journal_cfg.pop("error_reporter")).set + client = get_journal_client( **journal_cfg, stop_after_objects=stop_after_objects, object_types=("content",), ) diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py --- a/swh/objstorage/replayer/replay.py +++ b/swh/objstorage/replayer/replay.py @@ -7,6 +7,7 @@ from time import time from typing import Callable, Dict, List, Optional +import msgpack from sentry_sdk import capture_exception, push_scope try: @@ -28,6 +29,7 @@ from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError, ObjStorage logger = logging.getLogger(__name__) +REPORTER = None CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" @@ -126,6 +128,12 @@ error_context, ) + # if we have a global error (redis) reporter + if REPORTER is not None: + oid = f"blob:{exc.obj_id}" + msg = msgpack.dumps(error_context) + REPORTER(oid, msg) + return None diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py --- a/swh/objstorage/replayer/tests/test_cli.py +++ b/swh/objstorage/replayer/tests/test_cli.py @@ -15,6 +15,7 @@ from click.testing import CliRunner from confluent_kafka import Producer +import msgpack import pytest import yaml @@ -417,6 +418,8 @@ kafka_server: Tuple[Popen, int], monkeypatch_retry_sleep, caplog, + redis_proc, + redisdb, ): """Check the content replayer with a flaky dst objstorage @@ -447,6 +450,7 @@ "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, + "error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, }, ) expected = r"Done.\n" @@ -463,6 +467,9 @@ assert False, "No other failure expected than 'in' operations" assert failed_in == NUM_CONTENTS_DST + # check nothing has been reported in redis + assert not redisdb.keys() + # in the end, the replay process should be OK for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 @@ -477,10 +484,14 @@ kafka_server: Tuple[Popen, int], caplog, monkeypatch_retry_sleep, + redis_proc, + redisdb, ): """Check the content replayer with a flaky src and dst objstorages - for 'get' and 'add' operations. + for 'get' and 'add' operations, and a few non-recoverable failures (some + objects failed to be replayed). + """ contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] @@ -537,14 +548,17 @@ "brokers": kafka_server, "group_id": kafka_consumer_group, "prefix": kafka_prefix, + "error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + # check the logs looks as expected copied = 0 - actually_failed = set() + failed_put = set() + failed_get = set() for record in caplog.records: logtext = record.getMessage() if "stored" in logtext: @@ -552,12 +566,30 @@ elif "Failed operation" in logtext: assert record.levelno == logging.ERROR assert record.args["retries"] == CONTENT_REPLAY_RETRIES - actually_failed.add(record.args["obj_id"]) - + assert record.args["operation"] in ("get_object", "put_object") + if record.args["operation"] == "get_object": + failed_get.add(record.args["obj_id"]) + else: + failed_put.add(record.args["obj_id"]) assert ( - actually_failed == definitely_failed + failed_put | failed_get == definitely_failed ), "Unexpected object copy failures; see captured log for details" + # check failed objects are referenced in redis + assert set(redisdb.keys()) == { + f"blob:{objid}".encode() for objid in definitely_failed + } + # and have a consistent error report in redis + for key in redisdb.keys(): + report = msgpack.loads(redisdb[key]) + assert report["operation"] in ("get_object", "put_object") + if report["operation"] == "get_object": + assert report["obj_id"] in failed_get + else: + assert report["obj_id"] in failed_put + + # check valid object are in the dst objstorage, but + # failed objects are not. for (sha1, content) in contents.items(): if hash_to_hex(sha1) in definitely_failed: assert sha1 not in objstorages["dst"]