diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -137,6 +137,9 @@
 
 
 @cli.command()
+@click.option('--max-messages', '-m', default=None, type=int,
+              help='Maximum number of objects to replay. Default is to '
+                   'run forever.')
 @click.option('--concurrency', type=int,
               default=8,
               help='Concurrentcy level.')
@@ -150,7 +153,7 @@
               help='Name of the consumer/group id for reading from Kafka.'
                    '(deprecated, use the config file instead)')
 @click.pass_context
-def content_replay(ctx, concurrency, brokers, prefix, group_id):
+def content_replay(ctx, max_messages, concurrency, brokers, prefix, group_id):
     """Fill a destination Object Storage (typically a mirror) by reading a Journal
     and retrieving objects from an existing source ObjStorage.
 
@@ -184,13 +187,14 @@
 
     try:
         nb_messages = 0
-        while True:
+        # No --max-messages: replay until interrupted (KeyboardInterrupt).
+        while max_messages is None or nb_messages < max_messages:
             nb_messages += client.process(worker_fn)
             logger.info('Processed %d messages.' % nb_messages)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
-        logger.info('Done.')
+        print('Done.')
 
 
 def main():
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -13,6 +13,7 @@
 from kafka import KafkaProducer
 import pytest
 
+from swh.objstorage.backends.in_memory import InMemoryObjStorage
 from swh.storage.in_memory import Storage
 
 from swh.journal.cli import cli
@@ -23,6 +24,14 @@
 storage:
   cls: memory
   args: {}
+objstorage_src:
+  cls: mocked
+  args:
+    name: src
+objstorage_dst:
+  cls: mocked
+  args:
+    name: dst
 '''
 
 
@@ -87,4 +96,60 @@
         **snapshot, 'next_branch': None}
 
 
-# TODO: write a test for the content-replay command
+@patch('swh.journal.cli.get_objstorage')
+def test_replay_content(
+        get_objstorage_mock,
+        storage: Storage,
+        kafka_prefix: str,
+        kafka_server: Tuple[Popen, int]):
+    (_, port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    producer = KafkaProducer(
+        bootstrap_servers='localhost:{}'.format(port),
+        key_serializer=key_to_kafka,
+        value_serializer=value_to_kafka,
+        client_id='test-producer',
+    )
+
+    objstorage_src = InMemoryObjStorage()
+    objstorage_dst = InMemoryObjStorage()
+
+    contents = {}
+    for i in range(10):
+        content = b'\x00'*19 + bytes([i])
+        sha1 = objstorage_src.add(content)
+        contents[sha1] = content
+        producer.send(topic=kafka_prefix+'.content', key=sha1, value={
+            'sha1': sha1,
+            'status': 'visible',
+        })
+
+    # send() is asynchronous; wait until every message reached the broker.
+    producer.flush()
+
+    def get_mock_objstorage(cls, args):
+        assert cls == 'mocked', cls
+        if args['name'] == 'src':
+            return objstorage_src
+        elif args['name'] == 'dst':
+            return objstorage_dst
+        else:
+            assert False, args
+
+    get_objstorage_mock.side_effect = get_mock_objstorage
+
+    result = invoke(False, [
+        'content-replay',
+        '--broker', 'localhost:%d' % port,
+        '--group-id', 'test-cli-consumer',
+        '--prefix', kafka_prefix,
+        '--max-messages', str(len(contents)),
+    ])
+    expected = r'Done\.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    for (sha1, content) in contents.items():
+        assert sha1 in objstorage_dst, sha1
+        assert objstorage_dst.get(sha1) == content