diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -19,7 +19,9 @@
 from swh.model.identifiers import normalize_timestamp
 from swh.model.hashutil import hash_to_hex
 from swh.model.model import SHA1_SIZE
-from swh.objstorage.objstorage import ID_HASH_ALGO, ObjStorage
+from swh.objstorage.objstorage import (
+    ID_HASH_ALGO, ObjNotFoundError, ObjStorage,
+)
 from swh.storage import HashCollision
 
 logger = logging.getLogger(__name__)
@@ -311,6 +313,7 @@
 
 
 def copy_object(obj_id, src, dst):
+    obj = ''
     try:
         with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'get'}):
             obj = src.get(obj_id)
@@ -320,9 +323,8 @@
             dst.add(obj, obj_id=obj_id, check_presence=False)
             logger.debug('copied %s', hash_to_hex(obj_id))
         statsd.increment(CONTENT_BYTES_METRIC, len(obj))
-    except Exception:
-        obj = ''
-        logger.error('Failed to copy %s', hash_to_hex(obj_id))
+    except Exception as exc:
+        logger.error('Failed to copy %s: %s', hash_to_hex(obj_id), exc)
         raise
     return len(obj)
 
@@ -420,9 +422,16 @@
                 statsd.increment(CONTENT_OPERATIONS_METRIC,
                                  tags={"decision": "in_dst"})
             else:
-                vol.append(copy_object(obj_id, src, dst))
-                statsd.increment(CONTENT_OPERATIONS_METRIC,
-                                 tags={"decision": "copied"})
+                try:
+                    copied = copy_object(obj_id, src, dst)
+                except ObjNotFoundError:
+                    nb_skipped += 1
+                    statsd.increment(CONTENT_OPERATIONS_METRIC,
+                                     tags={"decision": "not_in_src"})
+                else:
+                    vol.append(copied)
+                    statsd.increment(CONTENT_OPERATIONS_METRIC,
+                                     tags={"decision": "copied"})
 
     dt = time() - t0
     logger.info(
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -16,6 +16,7 @@
 from confluent_kafka import Producer
 import pytest
 
+from swh.model.hashutil import hash_to_hex
 from swh.objstorage.backends.in_memory import InMemoryObjStorage
 from swh.storage import get_storage
 
@@ -395,3 +396,64 @@
     for (sha1, content) in contents.items():
         assert dst_contains_failed[sha1] >= 0
         assert dst_state[sha1] == content
+
+
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content_objnotfound(
+        objstorages,
+        storage,
+        kafka_prefix: str,
+        kafka_consumer_group: str,
+        kafka_server: Tuple[Popen, int],
+        caplog):
+    (_, kafka_port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_port, kafka_prefix, objstorages)
+
+    num_contents_deleted = 5
+    contents_deleted = set()
+
+    for i, sha1 in enumerate(contents):
+        if i >= num_contents_deleted:
+            break
+
+        del objstorages['src'].state[sha1]
+        contents_deleted.add(hash_to_hex(sha1))
+
+    caplog.set_level(logging.DEBUG, 'swh.journal.replay')
+
+    result = invoke(False, [
+        'content-replay',
+        '--broker', '127.0.0.1:%d' % kafka_port,
+        '--group-id', kafka_consumer_group,
+        '--prefix', kafka_prefix,
+        '--max-messages', str(NUM_CONTENTS),
+    ])
+    expected = r'Done.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    copied = 0
+    not_in_src = 0
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if 'copied' in logtext:
+            copied += 1
+        elif 'object not found' in logtext:
+            # Check that the object id can be recovered from the logs
+            assert record.levelno == logging.ERROR
+            assert record.args[0] in contents_deleted
+            not_in_src += 1
+
+    assert (copied == NUM_CONTENTS - num_contents_deleted
+            and not_in_src == num_contents_deleted), (
+        "Unexpected number of objects copied, see the captured log for details"
+    )
+
+    for (sha1, content) in contents.items():
+        if sha1 not in objstorages['src']:
+            continue
+        assert sha1 in objstorages['dst'], sha1
+        assert objstorages['dst'].get(sha1) == content
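
Note (reviewer sketch, not part of the patch): the behaviour change in
process_replay_objects_content can be exercised outside of swh with a
minimal, self-contained sketch. FakeObjStorage, copy_object below, and the
literal object ids are hypothetical stand-ins for the real objstorage API
and sha1 bytes; the point is only that a source object missing from src is
now counted as "not_in_src" and skipped, instead of aborting the whole
replay batch.

    class ObjNotFoundError(Exception):
        """Stand-in for swh.objstorage's ObjNotFoundError."""


    class FakeObjStorage:
        """Hypothetical in-memory object storage: obj_id -> bytes."""

        def __init__(self):
            self.state = {}

        def get(self, obj_id):
            try:
                return self.state[obj_id]
            except KeyError:
                raise ObjNotFoundError(obj_id)

        def add(self, obj, obj_id):
            self.state[obj_id] = obj


    def copy_object(obj_id, src, dst):
        # Same contract as the patched helper: fetch from src, write to
        # dst, return the number of bytes copied; exceptions propagate.
        obj = src.get(obj_id)
        dst.add(obj, obj_id=obj_id)
        return len(obj)


    src, dst = FakeObjStorage(), FakeObjStorage()
    src.add(b'hello', obj_id=b'\x01')

    vol, nb_skipped = [], 0
    for obj_id in [b'\x01', b'\x02']:  # b'\x02' was never written to src
        try:
            copied = copy_object(obj_id, src, dst)
        except ObjNotFoundError:
            nb_skipped += 1     # the "not_in_src" decision: log and move on
        else:
            vol.append(copied)  # the "copied" decision

    assert vol == [5]
    assert nb_skipped == 1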