diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
--- a/swh/objstorage/replayer/replay.py
+++ b/swh/objstorage/replayer/replay.py
@@ -16,6 +16,7 @@
 from tenacity import (
     retry,
+    retry_base,
     retry_if_exception_type,
     stop_after_attempt,
     wait_random_exponential,
@@ -85,58 +86,44 @@
 class ReplayError(Exception):
     """An error occurred during the replay of an object"""

-    def __init__(self, operation, *, obj_id, exc):
-        self.operation = operation
+    def __init__(self, *, obj_id, exc):
         self.obj_id = hash_to_hex(obj_id)
         self.exc = exc

     def __str__(self):
-        return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc)
+        return "ReplayError(%s, %s)" % (self.obj_id, self.exc)


 def log_replay_retry(retry_state, sleep=None, last_result=None):
     """Log a retry of the content replayer"""
-    try:
-        exc = retry_state.outcome.exception()
-        attempt_number = retry_state.attempt_number
-    except AttributeError:
-        # tenacity < 5.0
-        exc = last_result.exception()
-        attempt_number = retry_state.statistics["attempt_number"]
-
+    exc = retry_state.outcome.exception()
+    operation = retry_state.fn.__name__
     logger.debug(
         "Retry operation %(operation)s on %(obj_id)s: %(exc)s",
-        {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
-    )
-
-    statsd.increment(
-        CONTENT_RETRY_METRIC,
-        tags={"operation": exc.operation, "attempt": str(attempt_number),},
+        {"operation": operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
     )


 def log_replay_error(retry_state):
     """Log a replay error to sentry"""
-    try:
-        exc = retry_state.outcome.exception()
-    except AttributeError:
-        # tenacity < 5.0
-        exc = retry_state.exception()
+    exc = retry_state.outcome.exception()
     with push_scope() as scope:
-        scope.set_tag("operation", exc.operation)
+        scope.set_tag("operation", retry_state.fn.__name__)
         scope.set_extra("obj_id", exc.obj_id)
         capture_exception(exc.exc)

+    error_context = {
+        "obj_id": exc.obj_id,
+        "operation": retry_state.fn.__name__,
+        "exc": str(exc.exc),
+        "retries": retry_state.attempt_number,
+    }
+
     logger.error(
         "Failed operation %(operation)s on %(obj_id)s after %(retries)s"
         " retries: %(exc)s",
-        {
-            "obj_id": exc.obj_id,
-            "operation": exc.operation,
-            "exc": str(exc.exc),
-            "retries": retry_state.attempt_number,
-        },
+        error_context,
     )

     return None
@@ -144,8 +131,24 @@

 CONTENT_REPLAY_RETRIES = 3

+
+class retry_log_if_success(retry_base):
+    """Log in statsd the number of attempts required to succeed"""
+
+    def __call__(self, retry_state):
+        if not retry_state.outcome.failed:
+            statsd.increment(
+                CONTENT_RETRY_METRIC,
+                tags={
+                    "operation": retry_state.fn.__name__,
+                    "attempt": str(retry_state.attempt_number),
+                },
+            )
+        return False
+
+
 content_replay_retry = retry(
-    retry=retry_if_exception_type(ReplayError),
+    retry=retry_if_exception_type(ReplayError) | retry_log_if_success(),
     stop=stop_after_attempt(CONTENT_REPLAY_RETRIES),
     wait=wait_random_exponential(multiplier=1, max=60),
     before_sleep=log_replay_retry,
@@ -154,26 +157,38 @@


 @content_replay_retry
-def copy_object(obj_id, src, dst):
-    hex_obj_id = hash_to_hex(obj_id)
-    obj = ""
+def get_object(objstorage, obj_id):
     try:
         with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}):
-            obj = src.get(obj_id)
-            logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id})
-
-        with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
-            dst.add(obj, obj_id=obj_id, check_presence=False)
-            logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id})
-            statsd.increment(CONTENT_BYTES_METRIC, len(obj))
+            obj = objstorage.get(obj_id)
+            logger.debug("retrieved %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
+        return obj
     except ObjNotFoundError:
         logger.error(
-            "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id}
+            "Failed to retrieve %(obj_id)s: object not found",
+            {"obj_id": hash_to_hex(obj_id)},
         )
         raise
     except Exception as exc:
-        raise ReplayError("copy", obj_id=obj_id, exc=exc) from None
-    return len(obj)
+        raise ReplayError(obj_id=obj_id, exc=exc) from None
+
+
+@content_replay_retry
+def put_object(objstorage, obj_id, obj):
+    try:
+        with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
+            obj = objstorage.add(obj, obj_id, check_presence=False)
+            logger.debug("stored %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
+    except Exception as exc:
+        raise ReplayError(obj_id=obj_id, exc=exc) from None
+
+
+def copy_object(obj_id, src, dst):
+    obj = get_object(src, obj_id)
+    if obj is not None:
+        put_object(dst, obj_id, obj)
+        return len(obj)
+    return 0


 @content_replay_retry
@@ -182,7 +197,7 @@
     try:
         return obj_id in dst
     except Exception as exc:
-        raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None
+        raise ReplayError(obj_id=obj_id, exc=exc) from None


 def process_replay_objects_content(
diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py
--- a/swh/objstorage/replayer/tests/test_cli.py
+++ b/swh/objstorage/replayer/tests/test_cli.py
@@ -35,9 +35,10 @@

 @pytest.fixture
 def monkeypatch_retry_sleep(monkeypatch):
-    from swh.objstorage.replayer.replay import copy_object, obj_in_objstorage
+    from swh.objstorage.replayer.replay import get_object, obj_in_objstorage, put_object

-    monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None)
+    monkeypatch.setattr(get_object.retry, "sleep", lambda x: None)
+    monkeypatch.setattr(put_object.retry, "sleep", lambda x: None)
     monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)


@@ -88,7 +89,7 @@
 NUM_CONTENTS = 10


-def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages):
+def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorage):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
@@ -100,7 +101,7 @@
     contents = {}
     for i in range(NUM_CONTENTS):
         content = b"\x00" * 19 + bytes([i])
-        sha1 = objstorages["src"].add(content)
+        sha1 = objstorage.add(content)
         contents[sha1] = content
         producer.produce(
             topic=kafka_prefix + ".content",
@@ -120,8 +121,11 @@
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
 ):
+    """Check the content replayer in normal conditions"""

-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

     result = invoke(
         "replay",
@@ -151,8 +155,11 @@
     kafka_server: Tuple[Popen, int],
     caplog,
 ):
+    """Check the logs produced by the content replayer in normal conditions"""

-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

     caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")

@@ -175,7 +182,7 @@
     copied = set()
     for record in caplog.records:
         logtext = record.getMessage()
-        if "copied" in logtext:
+        if "stored" in logtext:
             copied.add(record.args["obj_id"])

     assert (
@@ -191,8 +198,14 @@
     kafka_server: Tuple[Popen, int],
     caplog,
 ):
+    """Check the content replayer in normal conditions

-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    with KAFKA_GROUP_INSTANCE_ID set
+    """
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

     # Setup log capture to fish the consumer settings out of the log messages
     caplog.set_level(logging.DEBUG, "swh.journal.client")
@@ -238,8 +251,14 @@
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
 ):
+    """Check the content replayer in normal conditions

-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    with an exclusion file (--exclude-sha1-file)
+    """
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

     excluded_contents = list(contents)[0::2]  # picking half of them
     with tempfile.NamedTemporaryFile(mode="w+b") as fd:
@@ -292,13 +311,22 @@
     expected_in_dst: int,
     caplog,
 ):
+    """Check the content replayer in normal conditions
+
+    with some objects already in the dst objstorage.
+
+    When check_dst is True, expect those objects to be neither retrieved from
+    the src objstorage nor pushed to the dst objstorage.
+    """

-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(
+        kafka_server, kafka_prefix, objstorages["src"]
+    )

+    # add some objects in the dst objstorage
     for i, (sha1, content) in enumerate(contents.items()):
         if i >= NUM_CONTENTS_DST:
             break
-
         objstorages["dst"].add(content, obj_id=sha1)

     caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
@@ -318,17 +346,22 @@
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

-    copied = 0
+    retrieved = 0
+    stored = 0
     in_dst = 0
     for record in caplog.records:
         logtext = record.getMessage()
-        if "copied" in logtext:
-            copied += 1
+        if "retrieved" in logtext:
+            retrieved += 1
+        elif "stored" in logtext:
+            stored += 1
         elif "in dst" in logtext:
             in_dst += 1

     assert (
-        copied == expected_copied and in_dst == expected_in_dst
+        retrieved == expected_copied
+        and stored == expected_copied
+        and in_dst == expected_in_dst
     ), "Unexpected amount of objects copied, see the captured log for details"

     for (sha1, content) in contents.items():
@@ -337,6 +370,20 @@


 class FlakyObjStorage(InMemoryObjStorage):
+    """Flaky objstorage
+
+    Any 'get', 'add' or 'in' (i.e. '__contains__()') operation will fail
+    according to the configured 'failures'.
+
+    'failures' is expected to be a dict whose keys are (operation, obj_id)
+    pairs and whose values are the number of times the operation 'operation'
+    should fail for object 'obj_id' before being performed successfully.
+
+    An optional state ('state') can also be given as an argument (see
+    InMemoryObjStorage).
+
+    """
+
     def __init__(self, *args, **kwargs):
         state = kwargs.pop("state")
         self.failures_left = Counter(kwargs.pop("failures"))
@@ -369,26 +416,33 @@
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
     monkeypatch_retry_sleep,
+    caplog,
 ):
+    """Check the content replayer with a flaky dst objstorage

-    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
+    for 'in' operations.
+ """ + contents = _fill_objstorage_and_kafka( + kafka_server, kafka_prefix, objstorages["src"] + ) + # build a flaky dst objstorage in which the 'in' operation for the first + # NUM_CONTENT_DST objects will fail once failures = {} for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break - objstorages["dst"].add(content, obj_id=sha1) failures["in", sha1] = 1 - orig_dst = objstorages["dst"] objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) + caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") result = invoke( "replay", + "--check-dst", "--stop-after-objects", str(NUM_CONTENTS), - "--check-dst", journal_config={ "brokers": kafka_server, "group_id": kafka_consumer_group, @@ -399,6 +453,17 @@ assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + # check that exactly NUM_CONTENTS_DST 'in' operations have failed once + failed_in = 0 + for record in caplog.records: + logtext = record.getMessage() + if "Retry operation obj_in_objstorage" in logtext: + failed_in += 1 + elif "Retry operation" in logtext: + assert False, "No other failure expected than 'in' operations" + assert failed_in == NUM_CONTENTS_DST + + # in the end, the replay process should be OK for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @@ -413,8 +478,13 @@ caplog, monkeypatch_retry_sleep, ): + """Check the content replayer with a flaky src and dst objstorages - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) + for 'get' and 'add' operations. + """ + contents = _fill_objstorage_and_kafka( + kafka_server, kafka_prefix, objstorages["src"] + ) add_failures = {} get_failures = {} @@ -447,6 +517,9 @@ if num_failures >= CONTENT_REPLAY_RETRIES: definitely_failed.add(hash_to_hex(sha1)) + assert add_failures + assert get_failures + assert definitely_failed objstorages["dst"] = FlakyObjStorage( state=objstorages["dst"].state, failures=add_failures, ) @@ -474,7 +547,7 @@ actually_failed = set() for record in caplog.records: logtext = record.getMessage() - if "copied" in logtext: + if "stored" in logtext: copied += 1 elif "Failed operation" in logtext: assert record.levelno == logging.ERROR @@ -502,16 +575,18 @@ kafka_server: Tuple[Popen, int], caplog, ): + """Check the ContentNotFound is not considered a failure to retry""" - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka( + kafka_server, kafka_prefix, objstorages["src"] + ) + # delete a few objects from the src objstorage num_contents_deleted = 5 contents_deleted = set() - for i, sha1 in enumerate(contents): if i >= num_contents_deleted: break - del objstorages["src"].state[sha1] contents_deleted.add(hash_to_hex(sha1)) @@ -535,12 +610,14 @@ not_in_src = set() for record in caplog.records: logtext = record.getMessage() - if "copied" in logtext: + if "stored" in logtext: copied += 1 elif "object not found" in logtext: # Check that the object id can be recovered from logs assert record.levelno == logging.ERROR not_in_src.add(record.args["obj_id"]) + elif "Retry operation" in logtext: + assert False, "Not found objects should not be retried" assert ( copied == NUM_CONTENTS - num_contents_deleted