diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -177,9 +177,12 @@
                    '(deprecated, use the config file instead)')
 @click.option('--exclude-sha1-file', default=None, type=click.File('rb'),
               help='File containing a sorted array of hashes to be excluded.')
+@click.option('--check-dst/--no-check-dst', default=True,
+              help='Check whether the destination contains the object before '
+                   'copying.')
 @click.pass_context
 def content_replay(ctx, max_messages,
-                   brokers, prefix, group_id, exclude_sha1_file):
+                   brokers, prefix, group_id, exclude_sha1_file, check_dst):
     """Fill a destination Object Storage (typically a mirror) by reading a Journal
     and retrieving objects from an existing source ObjStorage.
 
@@ -197,6 +200,10 @@
     and it must be sorted.
     This file will not be fully loaded into memory at any given time,
     so it can be arbitrarily large.
+
+    `--check-dst` (the default) makes the replayer check whether each object
+    already exists in the destination ObjStorage and skip it if so. Use
+    `--no-check-dst` to disable that check when copying to an empty ObjStorage.
     """
     logger = logging.getLogger(__name__)
     conf = ctx.obj['config']
@@ -226,10 +233,10 @@
     client = get_journal_client(
         ctx, brokers=brokers, prefix=prefix, group_id=group_id,
         max_messages=max_messages, object_types=('content',))
-    worker_fn = functools.partial(process_replay_objects_content,
-                                  src=objstorage_src,
-                                  dst=objstorage_dst,
-                                  exclude_fn=exclude_fn)
+    worker_fn = functools.partial(
+        process_replay_objects_content,
+        src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn,
+        check_dst=check_dst)
     if notify:
         notify('READY=1')
 
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -322,7 +322,7 @@
 
 
 def process_replay_objects_content(all_objects, *, src, dst,
-                                   exclude_fn=None):
+                                   exclude_fn=None, check_dst=True):
     """
     Takes a list of records from Kafka (see
     :py:func:`swh.journal.client.JournalClient.process`) and copies them
@@ -392,6 +392,11 @@
                              hash_to_hex(obj_id))
                 statsd.increment(CONTENT_OPERATIONS_METRIC,
                                  tags={"decision": "excluded"})
+            elif check_dst and obj_id in dst:
+                nb_skipped += 1
+                logger.debug('skipped %s (in dst)', hash_to_hex(obj_id))
+                statsd.increment(CONTENT_OPERATIONS_METRIC,
+                                 tags={"decision": "in_dst"})
             else:
                 vol.append(copy_object(obj_id, src, dst))
                 statsd.increment(CONTENT_OPERATIONS_METRIC,
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -272,3 +272,64 @@
         else:
             assert sha1 in objstorages['dst'], sha1
             assert objstorages['dst'].get(sha1) == content
+
+
+NUM_CONTENTS_DST = 5
+
+
+@_patch_objstorages(['src', 'dst'])
+@pytest.mark.parametrize("check_dst,expected_copied,expected_in_dst", [
+    (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST),
+    (False, NUM_CONTENTS, 0),
+])
+def test_replay_content_check_dst(
+        objstorages,
+        storage,
+        kafka_prefix: str,
+        kafka_server: Tuple[Popen, int],
+        check_dst: bool,
+        expected_copied: int,
+        expected_in_dst: int,
+        caplog):
+    (_, kafka_port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_port, kafka_prefix, objstorages)
+
+    for i, (sha1, content) in enumerate(contents.items()):
+        if i >= NUM_CONTENTS_DST:
+            break
+
+        objstorages['dst'].add(content, obj_id=sha1)
+
+    caplog.set_level(logging.DEBUG, 'swh.journal.replay')
+
+    result = invoke(False, [
+        'content-replay',
+        '--broker', '127.0.0.1:%d' % kafka_port,
+        '--group-id', 'test-cli-consumer',
+        '--prefix', kafka_prefix,
+        '--max-messages', str(NUM_CONTENTS),
+        '--check-dst' if check_dst else '--no-check-dst',
+    ])
+    expected = r'Done.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    copied = 0
+    in_dst = 0
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if 'copied' in logtext:
+            copied += 1
+        elif 'in dst' in logtext:
+            in_dst += 1
+
+    assert (copied, in_dst) == (expected_copied, expected_in_dst), (
+        "Unexpected amount of objects copied, see the captured log for details"
+    )
+
+    for (sha1, content) in contents.items():
+        assert sha1 in objstorages['dst'], sha1
+        assert objstorages['dst'].get(sha1) == content