diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -137,6 +137,9 @@
 
 
 @cli.command()
+@click.option('--max-messages', '-m', default=None, type=int,
+              help='Maximum number of objects to replay. Default is to '
+                   'run forever.')
 @click.option('--concurrency', type=int,
               default=8,
               help='Concurrentcy level.')
@@ -150,7 +153,7 @@
               help='Name of the group id for reading from Kafka.'
                    '(deprecated, use the config file instead)')
 @click.pass_context
-def content_replay(ctx, concurrency, brokers, prefix, group_id):
+def content_replay(ctx, max_messages, concurrency, brokers, prefix, group_id):
     """Fill a destination Object Storage (typically a mirror) by reading a Journal
     and retrieving objects from an existing source ObjStorage.
 
@@ -184,13 +187,14 @@
 
     try:
         nb_messages = 0
-        while True:
+        # max_messages=None (the default) means: run until interrupted.
+        while max_messages is None or nb_messages < max_messages:
             nb_messages += client.process(worker_fn)
             logger.info('Processed %d messages.' % nb_messages)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
-        logger.info('Done.')
+        click.echo('Done.')
 
 
 def main():
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import functools
 import re
 import tempfile
 from subprocess import Popen
@@ -13,6 +14,7 @@
 from kafka import KafkaProducer
 import pytest
 
+from swh.objstorage.backends.in_memory import InMemoryObjStorage
 from swh.storage.in_memory import Storage
 
 from swh.journal.cli import cli
@@ -23,6 +25,14 @@
 storage:
   cls: memory
   args: {}
+objstorage_src:
+  cls: mocked
+  args:
+    name: src
+objstorage_dst:
+  cls: mocked
+  args:
+    name: dst
 '''
 
 
@@ -87,4 +97,78 @@
         **snapshot, 'next_branch': None}
 
 
-# TODO: write a test for the content-replay command
+def _patch_objstorages(names):
+    """Return a decorator patching `get_objstorage` to return in-memory
+    objstorages (selected by the `name` config arg) and passing them to the
+    decorated test as the `objstorages` keyword argument."""
+    objstorages = {name: InMemoryObjStorage() for name in names}
+
+    def get_mock_objstorage(cls, args):
+        assert cls == 'mocked', cls
+        return objstorages[args['name']]
+
+    def decorator(f):
+        @functools.wraps(f)
+        @patch('swh.journal.cli.get_objstorage')
+        def newf(get_objstorage_mock, *args, **kwargs):
+            get_objstorage_mock.side_effect = get_mock_objstorage
+            return f(*args, objstorages=objstorages, **kwargs)
+
+        return newf
+
+    return decorator
+
+
+def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
+    """Add 10 contents to objstorages['src'], publish each one to the
+    content topic, and return a dict mapping each sha1 to its content."""
+    producer = KafkaProducer(
+        bootstrap_servers='localhost:{}'.format(kafka_port),
+        key_serializer=key_to_kafka,
+        value_serializer=value_to_kafka,
+        client_id='test-producer',
+    )
+
+    contents = {}
+    for i in range(10):
+        content = b'\x00'*19 + bytes([i])
+        sha1 = objstorages['src'].add(content)
+        contents[sha1] = content
+        producer.send(topic=kafka_prefix+'.content', key=sha1, value={
+            'sha1': sha1,
+            'status': 'visible',
+        })
+
+    producer.flush()
+
+    return contents
+
+
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content(
+        objstorages,
+        storage: Storage,
+        kafka_prefix: str,
+        kafka_server: Tuple[Popen, int]):
+    """Check that content-replay copies every content published to Kafka
+    from the source objstorage to the destination objstorage."""
+    (_, kafka_port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_port, kafka_prefix, objstorages)
+
+    result = invoke(False, [
+        'content-replay',
+        '--broker', 'localhost:%d' % kafka_port,
+        '--group-id', 'test-cli-consumer',
+        '--prefix', kafka_prefix,
+        '--max-messages', '10',
+    ])
+    expected = r'Done\.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output), result.output
+
+    for (sha1, content) in contents.items():
+        assert sha1 in objstorages['dst'], sha1
+        assert objstorages['dst'].get(sha1) == content