diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -8,12 +8,16 @@
 from time import time
 from typing import Callable, Dict, List, Optional
 
+from sentry_sdk import capture_exception, push_scope
 try:
     from systemd.daemon import notify
 except ImportError:
     notify = None
 
-from tenacity import retry, stop_after_attempt, wait_random_exponential
+from tenacity import (
+    retry, retry_if_exception_type, stop_after_attempt,
+    wait_random_exponential,
+)
 
 from swh.core.statsd import statsd
 from swh.model.identifiers import normalize_timestamp
@@ -29,6 +33,7 @@
 GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
 GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds"
 CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total"
+CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total"
 CONTENT_BYTES_METRIC = "swh_content_replayer_bytes"
 CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"
 
@@ -312,6 +317,62 @@
     return get_hash(left) == hash_
 
 
+class ReplayError(Exception):
+    """An error occurred during the replay of an object"""
+    def __init__(self, operation, *, obj_id, exc):
+        self.operation = operation
+        self.obj_id = hash_to_hex(obj_id)
+        self.exc = exc
+
+    def __str__(self):
+        return "ReplayError(doing %s, %s, %s)" % (
+            self.operation, self.obj_id, self.exc
+        )
+
+
+def log_replay_retry(retry_obj, sleep, last_result):
+    """Log a retry of the content replayer"""
+    exc = last_result.exception()
+    logger.debug('Retry operation %(operation)s on %(obj_id)s: %(exc)s',
+                 {'operation': exc.operation, 'obj_id': exc.obj_id,
+                  'exc': str(exc.exc)})
+
+    statsd.increment(CONTENT_RETRY_METRIC, tags={
+        'operation': exc.operation,
+        'attempt': str(retry_obj.statistics['attempt_number']),
+    })
+
+
+def log_replay_error(last_attempt):
+    """Log a replay error to sentry"""
+    exc = last_attempt.exception()
+    with push_scope() as scope:
+        scope.set_tag('operation', exc.operation)
+        scope.set_extra('obj_id', exc.obj_id)
+        capture_exception(exc.exc)
+
+    logger.error(
+        'Failed operation %(operation)s on %(obj_id)s after %(retries)s'
+        ' retries: %(exc)s', {
+            'obj_id': exc.obj_id, 'operation': exc.operation,
+            'exc': str(exc.exc), 'retries': last_attempt.attempt_number,
+        })
+
+    return None
+
+
+CONTENT_REPLAY_RETRIES = 3
+
+content_replay_retry = retry(
+    retry=retry_if_exception_type(ReplayError),
+    stop=stop_after_attempt(CONTENT_REPLAY_RETRIES),
+    wait=wait_random_exponential(multiplier=1, max=60),
+    before_sleep=log_replay_retry,
+    retry_error_callback=log_replay_error,
+)
+
+
+@content_replay_retry
 def copy_object(obj_id, src, dst):
     hex_obj_id = hash_to_hex(obj_id)
     obj = ''
@@ -324,19 +385,22 @@
             dst.add(obj, obj_id=obj_id, check_presence=False)
             logger.debug('copied %(obj_id)s', {'obj_id': hex_obj_id})
         statsd.increment(CONTENT_BYTES_METRIC, len(obj))
-    except Exception as exc:
-        logger.error('Failed to copy %(obj_id)s: %(exc)s',
-                     {'obj_id': hex_obj_id, 'exc': str(exc)})
+    except ObjNotFoundError:
+        logger.error('Failed to copy %(obj_id)s: object not found',
+                     {'obj_id': hex_obj_id})
         raise
+    except Exception as exc:
+        raise ReplayError('copy', obj_id=obj_id, exc=exc) from None
     return len(obj)
 
 
-@retry(stop=stop_after_attempt(3),
-       reraise=True,
-       wait=wait_random_exponential(multiplier=1, max=60))
+@content_replay_retry
 def obj_in_objstorage(obj_id, dst):
    """Check if an object is already in an objstorage, tenaciously"""
-    return obj_id in dst
+    try:
+        return obj_id in dst
+    except Exception as exc:
+        raise ReplayError('in_dst', obj_id=obj_id, exc=exc) from None
 
 
 def process_replay_objects_content(
@@ -395,6 +459,7 @@
     """
     vol = []
     nb_skipped = 0
+    nb_failures = 0
     t0 = time()
 
     for (object_type, objects) in all_objects.items():
@@ -431,9 +496,14 @@
                         statsd.increment(CONTENT_OPERATIONS_METRIC,
                                          tags={"decision": "not_in_src"})
                     else:
-                        vol.append(copied)
-                        statsd.increment(CONTENT_OPERATIONS_METRIC,
-                                         tags={"decision": "copied"})
+                        if copied is None:
+                            nb_failures += 1
+                            statsd.increment(CONTENT_OPERATIONS_METRIC,
+                                             tags={"decision": "failed"})
+                        else:
+                            vol.append(copied)
+                            statsd.increment(CONTENT_OPERATIONS_METRIC,
+                                             tags={"decision": "copied"})
 
     dt = time() - t0
     logger.info(
@@ -442,7 +512,7 @@
         len(vol), dt,
         len(vol)/dt,
         sum(vol)/1024/1024/dt,
-        len([x for x in vol if not x]),
+        nb_failures,
         nb_skipped)
 
     if notify:
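
How the pieces above fit together: transient errors in copy_object and
obj_in_objstorage are wrapped in ReplayError; content_replay_retry retries
ReplayError up to CONTENT_REPLAY_RETRIES attempts with randomized exponential
backoff; once the attempts are exhausted, log_replay_error (the
retry_error_callback) reports to Sentry and the log, then returns None, which
is why process_replay_objects_content now treats a None result as a failure
instead of catching an exception. A minimal sketch of that caller-visible
contract follows; the InMemoryStore stub and the driver code are illustrative
assumptions, not part of the patch:

import hashlib

from swh.journal.replay import copy_object


class InMemoryStore:
    """Hypothetical dict-backed stand-in for an objstorage."""
    def __init__(self):
        self.state = {}

    def get(self, obj_id):
        return self.state[obj_id]

    def add(self, content, obj_id=None, check_presence=True):
        self.state[obj_id] = content


src, dst = InMemoryStore(), InMemoryStore()
obj_id = hashlib.sha1(b'some content').digest()
src.state[obj_id] = b'some content'

copied = copy_object(obj_id, src, dst)
if copied is None:
    # All CONTENT_REPLAY_RETRIES attempts raised ReplayError: the failure
    # has already been logged and sent to Sentry; nothing is re-raised.
    pass
else:
    assert copied == len(b'some content')  # number of bytes copied
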
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -21,6 +21,7 @@
 from swh.storage import get_storage
 
 from swh.journal.cli import cli
+from swh.journal.replay import CONTENT_REPLAY_RETRIES
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 
 
@@ -58,6 +59,13 @@
     yield storage
 
 
+@pytest.fixture
+def monkeypatch_retry_sleep(monkeypatch):
+    from swh.journal.replay import copy_object, obj_in_objstorage
+    monkeypatch.setattr(copy_object.retry, 'sleep', lambda x: None)
+    monkeypatch.setattr(obj_in_objstorage.retry, 'sleep', lambda x: None)
+
+
 def invoke(catch_exceptions, args, env=None):
     runner = CliRunner()
     with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
@@ -417,7 +425,8 @@
         storage,
         kafka_prefix: str,
         kafka_consumer_group: str,
-        kafka_server: Tuple[Popen, int]):
+        kafka_server: Tuple[Popen, int],
+        monkeypatch_retry_sleep):
     (_, kafka_port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
 
@@ -453,6 +462,98 @@
         assert objstorages['dst'].get(sha1) == content
 
 
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content_failed_copy_retry(
+        objstorages,
+        storage,
+        kafka_prefix: str,
+        kafka_consumer_group: str,
+        kafka_server: Tuple[Popen, int],
+        caplog,
+        monkeypatch_retry_sleep):
+    (_, kafka_port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_port, kafka_prefix, objstorages)
+
+    add_failures = {}
+    get_failures = {}
+    definitely_failed = set()
+
+    # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES
+    # times. We generate failures for 2 different operations, get and add.
+    num_retry_contents = 2 * CONTENT_REPLAY_RETRIES
+
+    assert num_retry_contents < NUM_CONTENTS, (
+        "Need to generate more test contents to properly test retry behavior"
+    )
+
+    for i, sha1 in enumerate(contents):
+        if i >= num_retry_contents:
+            break
+
+        # This generates a number of failures, up to CONTENT_REPLAY_RETRIES
+        num_failures = (i % CONTENT_REPLAY_RETRIES) + 1
+
+        # This generates failures of add for the first CONTENT_REPLAY_RETRIES
+        # objects, then failures of get.
+        if i < CONTENT_REPLAY_RETRIES:
+            add_failures['add', sha1] = num_failures
+        else:
+            get_failures['get', sha1] = num_failures
+
+        # Only contents whose operations fail CONTENT_REPLAY_RETRIES
+        # times or more will definitely fail
+        if num_failures >= CONTENT_REPLAY_RETRIES:
+            definitely_failed.add(hash_to_hex(sha1))
+
+    objstorages['dst'] = FlakyObjStorage(
+        state=objstorages['dst'].state,
+        failures=add_failures,
+    )
+    objstorages['src'] = FlakyObjStorage(
+        state=objstorages['src'].state,
+        failures=get_failures,
+    )
+
+    caplog.set_level(logging.DEBUG, 'swh.journal.replay')
+
+    result = invoke(False, [
+        'content-replay',
+        '--broker', '127.0.0.1:%d' % kafka_port,
+        '--group-id', kafka_consumer_group,
+        '--prefix', kafka_prefix,
+        '--max-messages', str(NUM_CONTENTS),
+    ])
+    expected = r'Done.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    copied = 0
+    actually_failed = set()
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if 'copied' in logtext:
+            copied += 1
+        elif 'Failed operation' in logtext:
+            assert record.levelno == logging.ERROR
+            assert record.args['retries'] == CONTENT_REPLAY_RETRIES
+            actually_failed.add(record.args['obj_id'])
+
+    assert actually_failed == definitely_failed, (
+        'Unexpected object copy failures; see captured log for details'
+    )
+
+    for (sha1, content) in contents.items():
+        if hash_to_hex(sha1) in definitely_failed:
+            assert sha1 not in objstorages['dst']
+            continue
+
+        assert sha1 in objstorages['dst'], sha1
+        assert objstorages['dst'].get(sha1) == content
+
+
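
For concreteness, the failure-injection loop in the test above yields the
following matrix when CONTENT_REPLAY_RETRIES == 3 (a worked walkthrough, not
part of the patch). Since stop_after_attempt(3) allows three attempts in
total, an operation injected to fail twice still succeeds on its final
attempt; only three injected failures exhaust every attempt:

   i  operation  injected failures  outcome
   0  add        1                  copied on the 2nd attempt
   1  add        2                  copied on the 3rd attempt
   2  add        3                  definitely failed
   3  get        1                  copied on the 2nd attempt
   4  get        2                  copied on the 3rd attempt
   5  get        3                  definitely failed

definitely_failed therefore holds the hex ids of contents 2 and 5, which is
exactly the set compared against the 'Failed operation' log records in the
assertions above.
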
 @_patch_objstorages(['src', 'dst'])
 def test_replay_content_objnotfound(
     objstorages,