# Add a `replay` command to the swh.journal CLI, with a test.
#
# swh/journal/cli.py: new `replay` command; it builds a storage from the
#   'storage' config entry, creates a StorageReplayer from the --broker,
#   --prefix and --consumer-id options and calls fill() on it.
# swh/journal/replay.py: fill() gets max_messages=None as default and
#   insert_object() lists 'snapshot' among the object types of its first
#   branch.
# swh/journal/tests/test_cli.py: new test driving `replay` through click's
#   CliRunner.
#
# NOTE(review): fill() tests `max_messages and ...`, so max_messages=0 is
# treated like None (no limit) -- confirm this is intended.
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -8,7 +8,10 @@
 import os
 
 from swh.core import config
+from swh.storage import get_storage
+
 from swh.journal.publisher import JournalPublisher
+from swh.journal.replay import StorageReplayer
 
 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
 
@@ -66,6 +69,32 @@
         ctx.exit(0)
 
 
+@cli.command()
+@click.option('--max-messages', '-m', default=None, type=int,
+              help='Maximum number of objects to replay. Default is to '
+                   'run forever.')
+@click.option('--broker', 'brokers', type=str, multiple=True,
+              help='Kafka broker to connect to.')
+@click.option('--prefix', type=str, default='swh.journal.objects',
+              help='Prefix of Kafka topic names to read from.')
+@click.option('--consumer-id', type=str,
+              help='Name of the consumer/group id for reading from Kafka.')
+@click.pass_context
+def replay(ctx, brokers, prefix, consumer_id, max_messages):
+    """Fill a new storage by reading a journal.
+
+    """
+    conf = ctx.obj['config']
+    storage = get_storage(**conf.pop('storage'))
+    replayer = StorageReplayer(brokers, prefix, consumer_id)
+    try:
+        replayer.fill(storage, max_messages=max_messages)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print('Done.')
+
+
 def main():
     return cli(auto_envvar_prefix='SWH_JOURNAL')
 
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -38,7 +38,7 @@
                     for object_type in object_types],
         )
 
-    def fill(self, storage, max_messages):
+    def fill(self, storage, max_messages=None):
         num = 0
         for message in self.consumer:
             object_type = message.topic.split('.')[-1]
@@ -49,13 +49,13 @@
             self.insert_object(storage, object_type, message.value)
 
             num += 1
-            if num >= max_messages:
+            if max_messages and num >= max_messages:
                 break
         return num
 
     def insert_object(self, storage, object_type, object_):
         if object_type in ('content', 'directory', 'revision', 'release',
-                           'origin'):
+                           'snapshot', 'origin'):
             if object_type == 'content':
                 # TODO: we don't write contents in Kafka, so we need to
                 # find a way to insert them somehow.
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/tests/test_cli.py
@@ -0,0 +1,98 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import re
+import tempfile
+from subprocess import Popen
+from typing import Tuple
+from unittest.mock import patch
+
+from click.testing import CliRunner
+from kafka import KafkaProducer
+import pytest
+
+from swh.storage.in_memory import Storage
+
+from swh.journal.cli import cli
+from swh.journal.serializers import key_to_kafka, value_to_kafka
+
+
+CLI_CONFIG = '''
+storage:
+    cls: memory
+    args: {}
+'''
+
+
+@pytest.fixture
+def storage():
+    """An instance of swh.storage.in_memory.Storage that gets injected
+    into the CLI functions."""
+    storage = Storage()
+    with patch('swh.journal.cli.get_storage') as get_storage_mock:
+        get_storage_mock.return_value = storage
+        yield storage
+
+
+def invoke(catch_exceptions, args):
+    """Run the journal CLI with ``args`` and a temporary config file.
+
+    CLI_CONFIG is written to a temporary ``.yml`` file whose name is
+    passed with ``-C``; ``-l DEBUG`` is prepended too.  If
+    ``catch_exceptions`` is false and the run raised, the captured
+    output is printed and the exception re-raised.  Otherwise the
+    click ``Result`` is returned.
+    """
+    runner = CliRunner()
+    with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
+        config_fd.write(CLI_CONFIG)
+        config_fd.seek(0)
+        args = ['-C' + config_fd.name, '-l', 'DEBUG'] + args
+        result = runner.invoke(cli, args)
+    if not catch_exceptions and result.exception:
+        print(result.output)
+        raise result.exception
+    return result
+
+
+def test_replay(
+        storage: Storage,
+        kafka_prefix: str,
+        kafka_server: Tuple[Popen, int]):
+    """Send one snapshot to Kafka, replay it through the CLI, and check
+    that it ends up in the (patched) in-memory storage."""
+    (_, port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    producer = KafkaProducer(
+        bootstrap_servers='localhost:{}'.format(port),
+        key_serializer=key_to_kafka,
+        value_serializer=value_to_kafka,
+        client_id='test-producer',
+    )
+
+    snapshot = {'id': b'foo', 'branches': {
+        b'HEAD': {
+            'target_type': 'revision',
+            'target': b'bar',
+        }
+    }}
+    producer.send(
+        topic=kafka_prefix+'.snapshot', key=snapshot['id'], value=snapshot)
+
+    result = invoke(False, [
+        'replay',
+        '--broker', 'localhost:%d' % port,
+        '--consumer-id', 'test-cli-consumer',
+        '--prefix', kafka_prefix,
+        '--max-messages', '1',
+    ])
+    # The dot is escaped so that only the literal 'Done.' line matches.
+    expected = r'Done\.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    assert storage.snapshot_get(snapshot['id']) == {
+        **snapshot, 'next_branch': None}