Patch summary (commentary only; text before the first "diff --git" header
is ignored by patch tools):

* swh/journal/replay.py, copy_object(): the hex form of obj_id is computed
  once up front, and the three log calls switch from positional '%s'
  arguments to named '%(obj_id)s' / '%(exc)s' placeholders fed by a single
  dict argument, so the values are available by key on record.args.
* swh/journal/tests/test_cli.py: adds test_replay_content_structured_log,
  which collects record.args['obj_id'] from every captured record whose
  message contains 'copied' and compares that set with the hex ids of the
  contents returned by _fill_objstorage_and_kafka.
* swh/journal/tests/test_cli.py: in a later test, not_in_src becomes a set
  of record.args['obj_id'] values that is compared with contents_deleted,
  and the single combined assertion is split into two.

diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -313,18 +313,20 @@
 
 
 def copy_object(obj_id, src, dst):
+    hex_obj_id = hash_to_hex(obj_id)
     obj = ''
     try:
         with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'get'}):
             obj = src.get(obj_id)
-            logger.debug('retrieved %s', hash_to_hex(obj_id))
+            logger.debug('retrieved %(obj_id)s', {'obj_id': hex_obj_id})
 
         with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'put'}):
             dst.add(obj, obj_id=obj_id, check_presence=False)
-            logger.debug('copied %s', hash_to_hex(obj_id))
+            logger.debug('copied %(obj_id)s', {'obj_id': hex_obj_id})
             statsd.increment(CONTENT_BYTES_METRIC, len(obj))
     except Exception as exc:
-        logger.error('Failed to copy %s: %s', hash_to_hex(obj_id), exc)
+        logger.error('Failed to copy %(obj_id)s: %(exc)s',
+                     {'obj_id': hex_obj_id, 'exc': str(exc)})
         raise
     return len(obj)
 
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -194,6 +194,46 @@
         assert objstorages['dst'].get(sha1) == content
 
 
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content_structured_log(
+        objstorages,
+        storage,
+        kafka_prefix: str,
+        kafka_consumer_group: str,
+        kafka_server: Tuple[Popen, int],
+        caplog):
+    (_, kafka_port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_port, kafka_prefix, objstorages)
+
+    caplog.set_level(logging.DEBUG, 'swh.journal.replay')
+
+    expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents)
+
+    result = invoke(False, [
+        'content-replay',
+        '--broker', '127.0.0.1:%d' % kafka_port,
+        '--group-id', kafka_consumer_group,
+        '--prefix', kafka_prefix,
+        '--max-messages', str(NUM_CONTENTS),
+    ])
+    expected = r'Done.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    copied = set()
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if 'copied' in logtext:
+            copied.add(record.args['obj_id'])
+
+    assert copied == expected_obj_ids, (
+        "Mismatched logging; see captured log output for details."
+    )
+
+
 @_patch_objstorages(['src', 'dst'])
 def test_replay_content_static_group_id(
         objstorages,
@@ -436,7 +476,7 @@
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
     copied = 0
-    not_in_src = 0
+    not_in_src = set()
     for record in caplog.records:
         logtext = record.getMessage()
         if 'copied' in logtext:
@@ -444,12 +484,14 @@
         elif 'object not found' in logtext:
             # Check that the object id can be recovered from logs
             assert record.levelno == logging.ERROR
-            assert record.args[0] in contents_deleted
-            not_in_src += 1
+            not_in_src.add(record.args['obj_id'])
 
-    assert (copied == NUM_CONTENTS - num_contents_deleted
-            and not_in_src == num_contents_deleted), (
-        "Unexpected amount of objects copied, see the captured log for details"
+    assert copied == NUM_CONTENTS - num_contents_deleted, (
+        "Unexpected number of contents copied"
+    )
+
+    assert not_in_src == contents_deleted, (
+        "Mismatch between deleted contents and not_in_src logs"
     )
 
     for (sha1, content) in contents.items():