# Patch summary and review notes. Patch tools skip any text that appears
# before the first "diff --git" header, so this preamble does not change
# what the patch applies.
#
# replay.py
# - Splits copy_object() into get_object() and put_object().
#   - Each is wrapped by content_replay_retry.
#   - A failing put is therefore retried without re-fetching the object
#     from the source storage.
#   - The new copy_object() only chains the two and is not itself decorated
#     with content_replay_retry.
# - ReplayError loses its positional `operation` argument.
#   - The operation name used in debug logs, sentry tags, error logs and
#     statsd tags now comes from retry_state.fn.__name__.
#   - That is the name of the decorated function: get_object, put_object
#     or obj_in_objstorage.
# - Drops the tenacity < 5.0 AttributeError fallbacks. As a result, the
#   `sleep` and `last_result` parameters of log_replay_retry are now unused.
# - Moves the CONTENT_RETRY_METRIC increment out of log_replay_retry (the
#   before_sleep hook) and into retry_log_if_success.
#   - retry_log_if_success is a retry strategy OR-ed into the retry
#     condition.
#   - It always returns False, so the retry decision is unchanged.
#   - It increments the metric only when the attempt's outcome did not fail.
#     This includes successes on the first attempt; the updated
#     test_statsd expectations count "attempt:1" reports.
# - ObjNotFoundError is still re-raised unwrapped from get_object, so it is
#   not retried. test_cli now asserts that no "Retry operation" log line
#   appears for missing objects.
#
# tests/test_cli.py
# - Log matching switches from "copied" to "retrieved" / "stored" to follow
#   the new debug messages.
# - The retry sleep is monkeypatched on get_object and put_object.
#
# NOTE(review): put_object rebinds `obj` to the return value of
#   objstorage.add(...). That value is never used afterwards, so this is
#   harmless but misleading; consider dropping the assignment.
# NOTE(review): put_object now passes obj_id positionally to add(); the
#   removed code used obj_id=obj_id. Confirm that add()'s second positional
#   parameter is obj_id.
# NOTE(review): the statsd metric is still reported as "retries_total" but
#   now also counts first-attempt successes. Confirm dashboards and alerts
#   expect that.
# NOTE(review): this patch text appears to have had its line breaks
#   collapsed. For example, hunk "@@ -16,6 +16,7 @@" shows only 5 context
#   lines, so a blank context line was lost. The patch will not apply as-is
#   and should be regenerated from the original commit.
diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py --- a/swh/objstorage/replayer/replay.py +++ b/swh/objstorage/replayer/replay.py @@ -16,6 +16,7 @@ from tenacity import ( retry, + retry_base, retry_if_exception_type, stop_after_attempt, wait_random_exponential, @@ -85,58 +86,44 @@ class ReplayError(Exception): """An error occurred during the replay of an object""" - def __init__(self, operation, *, obj_id, exc): - self.operation = operation + def __init__(self, *, obj_id, exc): self.obj_id = hash_to_hex(obj_id) self.exc = exc def __str__(self): - return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc) + return "ReplayError(%s, %s)" % (self.obj_id, self.exc) def log_replay_retry(retry_state, sleep=None, last_result=None): """Log a retry of the content replayer""" - try: - exc = retry_state.outcome.exception() - attempt_number = retry_state.attempt_number - except AttributeError: - # tenacity < 5.0 - exc = last_result.exception() - attempt_number = retry_state.statistics["attempt_number"] - + exc = retry_state.outcome.exception() + operation = retry_state.fn.__name__ logger.debug( "Retry operation %(operation)s on %(obj_id)s: %(exc)s", - {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, - ) - - statsd.increment( - CONTENT_RETRY_METRIC, - tags={"operation": exc.operation, "attempt": str(attempt_number),}, + {"operation": operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, ) def log_replay_error(retry_state): """Log a replay error to sentry""" - try: - exc = retry_state.outcome.exception() - except AttributeError: - # tenacity < 5.0 - exc = retry_state.exception() + exc = retry_state.outcome.exception() with push_scope() as scope: - scope.set_tag("operation", exc.operation) + scope.set_tag("operation", retry_state.fn.__name__) scope.set_extra("obj_id", exc.obj_id) capture_exception(exc.exc) + error_context = { + "obj_id": exc.obj_id, + "operation": retry_state.fn.__name__, + "exc": 
str(exc.exc), + "retries": retry_state.attempt_number, + } + logger.error( "Failed operation %(operation)s on %(obj_id)s after %(retries)s" " retries: %(exc)s", - { - "obj_id": exc.obj_id, - "operation": exc.operation, - "exc": str(exc.exc), - "retries": retry_state.attempt_number, - }, + error_context, ) return None @@ -144,8 +131,24 @@ CONTENT_REPLAY_RETRIES = 3 + +class retry_log_if_success(retry_base): + """Log in statsd the number of attempts required to succeed""" + + def __call__(self, retry_state): + if not retry_state.outcome.failed: + statsd.increment( + CONTENT_RETRY_METRIC, + tags={ + "operation": retry_state.fn.__name__, + "attempt": str(retry_state.attempt_number), + }, + ) + return False + + content_replay_retry = retry( - retry=retry_if_exception_type(ReplayError), + retry=retry_if_exception_type(ReplayError) | retry_log_if_success(), stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), wait=wait_random_exponential(multiplier=1, max=60), before_sleep=log_replay_retry, @@ -154,26 +157,39 @@ @content_replay_retry -def copy_object(obj_id, src, dst): - hex_obj_id = hash_to_hex(obj_id) - obj = "" +def get_object(objstorage, obj_id): try: with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}): - obj = src.get(obj_id) - logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id}) - - with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): - dst.add(obj, obj_id=obj_id, check_presence=False) - logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id}) - statsd.increment(CONTENT_BYTES_METRIC, len(obj)) + obj = objstorage.get(obj_id) + logger.debug("retrieved %(obj_id)s", {"obj_id": hash_to_hex(obj_id)}) + return obj except ObjNotFoundError: logger.error( - "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id} + "Failed to retrieve %(obj_id)s: object not found", + {"obj_id": hash_to_hex(obj_id)}, ) raise except Exception as exc: - raise ReplayError("copy", obj_id=obj_id, exc=exc) from None - return len(obj) + raise 
ReplayError(obj_id=obj_id, exc=exc) from None + + +@content_replay_retry +def put_object(objstorage, obj_id, obj): + try: + with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): + obj = objstorage.add(obj, obj_id, check_presence=False) + logger.debug("stored %(obj_id)s", {"obj_id": hash_to_hex(obj_id)}) + except Exception as exc: + raise ReplayError(obj_id=obj_id, exc=exc) from None + + +def copy_object(obj_id, src, dst): + obj = get_object(src, obj_id) + if obj is not None: + put_object(dst, obj_id, obj) + statsd.increment(CONTENT_BYTES_METRIC, len(obj)) + return len(obj) + return 0 @content_replay_retry @@ -182,7 +198,7 @@ try: return obj_id in dst except Exception as exc: - raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None + raise ReplayError(obj_id=obj_id, exc=exc) from None def process_replay_objects_content( diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py --- a/swh/objstorage/replayer/tests/test_cli.py +++ b/swh/objstorage/replayer/tests/test_cli.py @@ -35,9 +35,10 @@ @pytest.fixture def monkeypatch_retry_sleep(monkeypatch): - from swh.objstorage.replayer.replay import copy_object, obj_in_objstorage + from swh.objstorage.replayer.replay import get_object, obj_in_objstorage, put_object - monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None) + monkeypatch.setattr(get_object.retry, "sleep", lambda x: None) + monkeypatch.setattr(put_object.retry, "sleep", lambda x: None) monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) @@ -179,7 +180,7 @@ copied = set() for record in caplog.records: logtext = record.getMessage() - if "copied" in logtext: + if "stored" in logtext: copied.add(record.args["obj_id"]) assert ( @@ -327,17 +328,22 @@ assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - copied = 0 + retrieved = 0 + stored = 0 in_dst = 0 for record in caplog.records: logtext = record.getMessage() 
if "copied" in logtext: - copied += 1 + if "retrieved" in logtext: + retrieved += 1 + elif "stored" in logtext: + stored += 1 elif "in dst" in logtext: in_dst += 1 assert ( - copied == expected_copied and in_dst == expected_in_dst + retrieved == expected_copied + and stored == expected_copied + and in_dst == expected_in_dst ), "Unexpected amount of objects copied, see the captured log for details" for (sha1, content) in contents.items(): @@ -378,27 +384,28 @@ kafka_consumer_group: str, kafka_server: Tuple[Popen, int], monkeypatch_retry_sleep, + caplog, ): contents = _fill_objstorage_and_kafka( kafka_server, kafka_prefix, objstorages["src"] ) + failures = {} for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break - objstorages["dst"].add(content, obj_id=sha1) failures["in", sha1] = 1 - orig_dst = objstorages["dst"] objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) + caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") result = invoke( "replay", + "--check-dst", "--stop-after-objects", str(NUM_CONTENTS), - "--check-dst", journal_config={ "brokers": kafka_server, "group_id": kafka_consumer_group, @@ -409,6 +416,17 @@ assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + # check that exactly NUM_CONTENTS_DST 'in' operations have failed once + failed_in = 0 + for record in caplog.records: + logtext = record.getMessage() + if "Retry operation obj_in_objstorage" in logtext: + failed_in += 1 + elif "Retry operation" in logtext: + assert False, "No other failure expected than 'in' operations" + assert failed_in == NUM_CONTENTS_DST + + # in the end, the replay process should be OK for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @@ -459,6 +477,9 @@ if num_failures >= CONTENT_REPLAY_RETRIES: definitely_failed.add(hash_to_hex(sha1)) + assert add_failures + assert 
get_failures + assert definitely_failed objstorages["dst"] = FlakyObjStorage( state=objstorages["dst"].state, failures=add_failures, ) @@ -486,7 +507,7 @@ actually_failed = set() for record in caplog.records: logtext = record.getMessage() - if "copied" in logtext: + if "stored" in logtext: copied += 1 elif "Failed operation" in logtext: assert record.levelno == logging.ERROR @@ -521,11 +542,9 @@ num_contents_deleted = 5 contents_deleted = set() - for i, sha1 in enumerate(contents): if i >= num_contents_deleted: break - del objstorages["src"].state[sha1] contents_deleted.add(hash_to_hex(sha1)) @@ -549,12 +568,14 @@ not_in_src = set() for record in caplog.records: logtext = record.getMessage() - if "copied" in logtext: + if "stored" in logtext: copied += 1 elif "object not found" in logtext: # Check that the object id can be recovered from logs assert record.levelno == logging.ERROR not_in_src.add(record.args["obj_id"]) + elif "Retry operation" in logtext: + assert False, "Not found objects should not be retried" assert ( copied == NUM_CONTENTS - num_contents_deleted diff --git a/swh/objstorage/replayer/tests/test_statsd.py b/swh/objstorage/replayer/tests/test_statsd.py --- a/swh/objstorage/replayer/tests/test_statsd.py +++ b/swh/objstorage/replayer/tests/test_statsd.py @@ -80,6 +80,10 @@ # of each statsd message. prefix = "swh_content_replayer" expected_reports = { + # 4 because 2 for the copied objects + 2 for the in_dst ones + f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:obj_in_objstorage$": 4, + f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:get_object$": 2, + f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:put_object$": 2, f"^{prefix}_duration_seconds:[0-9]+[.][0-9]+[|]ms[|]#request:get$": 2, f"^{prefix}_duration_seconds:[0-9]+[.][0-9]+[|]ms[|]#request:put$": 2, f"^{prefix}_bytes:4[|]c$": 2,