diff --git a/swh/journal/cli.py b/swh/journal/cli.py index 456f6d9..c2a6f60 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,74 +1,103 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging import os from swh.core import config +from swh.storage import get_storage + from swh.journal.publisher import JournalPublisher +from swh.journal.replay import StorageReplayer CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @click.group(context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.") @click.option('--log-level', '-l', default='INFO', type=click.Choice(logging._nameToLevel.keys()), help="Log level (default to INFO)") @click.pass_context def cli(ctx, config_file, log_level): """Software Heritage Scheduler CLI interface Default to use the the local scheduler instance (plugged to the main scheduler db). """ if not config_file: config_file = os.environ.get('SWH_CONFIG_FILENAME') if not config_file: raise ValueError('You must either pass a config-file parameter ' 'or set SWH_CONFIG_FILENAME to target ' 'the config-file') if not os.path.exists(config_file): raise ValueError('%s does not exist' % config_file) conf = config.read(config_file) ctx.ensure_object(dict) logger = logging.getLogger(__name__) logger.setLevel(log_level) _log = logging.getLogger('kafka') _log.setLevel(logging.INFO) ctx.obj['config'] = conf ctx.obj['loglevel'] = log_level @cli.command() @click.pass_context def publisher(ctx): """Manipulate publisher """ conf = ctx.obj['config'] publisher = JournalPublisher(conf) try: while True: publisher.poll() except KeyboardInterrupt: ctx.exit(0) +@cli.command() +@click.option('--max-messages', '-m', default=None, type=int, + help='Maximum number of objects to replay. Default is to ' + 'run forever.') +@click.option('--broker', 'brokers', type=str, multiple=True, + help='Kafka broker to connect to.') +@click.option('--prefix', type=str, default='swh.journal.objects', + help='Prefix of Kafka topic names to read from.') +@click.option('--consumer-id', type=str, + help='Name of the consumer/group id for reading from Kafka.') +@click.pass_context +def replay(ctx, brokers, prefix, consumer_id, max_messages): + """Fill a new storage by reading a journal. + + """ + conf = ctx.obj['config'] + storage = get_storage(**conf.pop('storage')) + replayer = StorageReplayer(brokers, prefix, consumer_id) + try: + replayer.fill(storage, max_messages=max_messages) + except KeyboardInterrupt: + ctx.exit(0) + else: + print('Done.') + + def main(): return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main() diff --git a/swh/journal/replay.py b/swh/journal/replay.py index 2f064c6..d3edb4a 100644 --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -1,72 +1,72 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from kafka import KafkaConsumer from .serializers import kafka_to_value logger = logging.getLogger(__name__) OBJECT_TYPES = frozenset([ 'origin', 'origin_visit', 'snapshot', 'release', 'revision', 'directory', 'content', ]) class StorageReplayer: def __init__(self, brokers, prefix, consumer_id, object_types=OBJECT_TYPES): if not set(object_types).issubset(OBJECT_TYPES): raise ValueError('Unknown object types: %s' % ', '.join( set(object_types) - OBJECT_TYPES)) self._object_types = object_types self.consumer = KafkaConsumer( bootstrap_servers=brokers, value_deserializer=kafka_to_value, auto_offset_reset='earliest', enable_auto_commit=False, group_id=consumer_id, ) self.consumer.subscribe( topics=['%s.%s' % (prefix, object_type) for object_type in object_types], ) - def fill(self, storage, max_messages): + def fill(self, storage, max_messages=None): num = 0 for message in self.consumer: object_type = message.topic.split('.')[-1] # Got a message from a topic we did not subscribe to. assert object_type in self._object_types, object_type self.insert_object(storage, object_type, message.value) num += 1 - if num >= max_messages: + if max_messages and num >= max_messages: break return num def insert_object(self, storage, object_type, object_): if object_type in ('content', 'directory', 'revision', 'release', - 'origin'): + 'snapshot', 'origin'): if object_type == 'content': # TODO: we don't write contents in Kafka, so we need to # find a way to insert them somehow. object_['status'] = 'absent' method = getattr(storage, object_type + '_add') method([object_]) elif object_type == 'origin_visit': origin_id = storage.origin_add_one(object_.pop('origin')) visit = storage.origin_visit_add( origin=origin_id, date=object_.pop('date')) storage.origin_visit_update( origin_id, visit['visit'], **object_) else: assert False diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py new file mode 100644 index 0000000..8f236d3 --- /dev/null +++ b/swh/journal/tests/test_cli.py @@ -0,0 +1,87 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import re +import tempfile +from subprocess import Popen +from typing import Tuple +from unittest.mock import patch + +from click.testing import CliRunner +from kafka import KafkaProducer +import pytest + +from swh.storage.in_memory import Storage + +from swh.journal.cli import cli +from swh.journal.serializers import key_to_kafka, value_to_kafka + + +CLI_CONFIG = ''' +storage: + cls: memory + args: {} +''' + + +@pytest.fixture +def storage(): + """An instance of swh.storage.in_memory.Storage that gets injected + into the CLI functions.""" + storage = Storage() + with patch('swh.journal.cli.get_storage') as get_storage_mock: + get_storage_mock.return_value = storage + yield storage + + +def invoke(catch_exceptions, args): + runner = CliRunner() + with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: + config_fd.write(CLI_CONFIG) + config_fd.seek(0) + args = ['-C' + config_fd.name, '-l', 'DEBUG'] + args + result = runner.invoke(cli, args) + if not catch_exceptions and result.exception: + print(result.output) + raise result.exception + return result + + +def test_replay( + storage: Storage, + kafka_prefix: str, + kafka_server: Tuple[Popen, int]): + (_, port) = kafka_server + kafka_prefix += '.swh.journal.objects' + + producer = KafkaProducer( + bootstrap_servers='localhost:{}'.format(port), + key_serializer=key_to_kafka, + value_serializer=value_to_kafka, + client_id='test-producer', + ) + + snapshot = {'id': b'foo', 'branches': { + b'HEAD': { + 'target_type': 'revision', + 'target': b'bar', + } + }} + producer.send( + topic=kafka_prefix+'.snapshot', key=snapshot['id'], value=snapshot) + + result = invoke(False, [ + 'replay', + '--broker', 'localhost:%d' % port, + '--consumer-id', 'test-cli-consumer', + '--prefix', kafka_prefix, + '--max-messages', '1', + ]) + expected = r'Done.\n' + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + assert storage.snapshot_get(snapshot['id']) == { + **snapshot, 'next_branch': None}