diff --git a/swh/journal/cli.py b/swh/journal/cli.py
index f4e8acf..80ea7d0 100644
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -1,118 +1,145 @@
 # Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 import functools
 import logging
 import os
 
 from swh.core import config
+from swh.core.cli import CONTEXT_SETTINGS
 from swh.storage import get_storage
 
 from swh.journal.client import JournalClient
 from swh.journal.replay import process_replay_objects
 from swh.journal.backfill import JournalBackfiller
 
 
-CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
-
 
 @click.group(name='journal', context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
-@click.option('--log-level', '-l', default='INFO',
-              type=click.Choice(logging._nameToLevel.keys()),
-              help="Log level (default to INFO)")
 @click.pass_context
-def cli(ctx, config_file, log_level):
-    """Software Heritage Scheduler CLI interface
+def cli(ctx, config_file):
+    """Software Heritage Journal tools.
 
-    Default to use the the local scheduler instance (plugged to the
-    main scheduler db).
+    The journal is a persistent logger of changes to the archive, with
+    publish-subscribe support.
     """
     if not config_file:
         config_file = os.environ.get('SWH_CONFIG_FILENAME')
-    if not config_file:
-        raise ValueError('You must either pass a config-file parameter '
-                         'or set SWH_CONFIG_FILENAME to target '
-                         'the config-file')
-
-    if not os.path.exists(config_file):
-        raise ValueError('%s does not exist' % config_file)
+    if config_file:
+        if not os.path.exists(config_file):
+            raise ValueError('%s does not exist' % config_file)
+        conf = config.read(config_file)
+    else:
+        conf = {}
 
-    conf = config.read(config_file)
     ctx.ensure_object(dict)
 
-    logging.basicConfig(
-        level=log_level,
-        format='%(asctime)s %(levelname)s %(name)s %(message)s',
-    )
-
+    log_level = ctx.obj.get('log_level', logging.INFO)
+    logging.root.setLevel(log_level)
     logging.getLogger('kafka').setLevel(logging.INFO)
 
     ctx.obj['config'] = conf
-    ctx.obj['loglevel'] = log_level
 
 
 @cli.command()
 @click.option('--max-messages', '-m', default=None, type=int,
               help='Maximum number of objects to replay. Default is to '
                    'run forever.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
+              hidden=True,  # prefer config file
               help='Kafka broker to connect to.')
-@click.option('--prefix', type=str, default='swh.journal.objects',
+@click.option('--prefix', type=str, default=None,
+              hidden=True,  # prefer config file
               help='Prefix of Kafka topic names to read from.')
-@click.option('--consumer-id', type=str,
+@click.option('--group-id', '--consumer-id', type=str,
+              hidden=True,  # prefer config file
               help='Name of the consumer/group id for reading from Kafka.')
 @click.pass_context
-def replay(ctx, brokers, prefix, consumer_id, max_messages):
-    """Fill a new storage by reading a journal.
+def replay(ctx, brokers, prefix, group_id, max_messages):
+    """Fill a Storage by reading a Journal.
 
+    There can be several 'replayers' filling a Storage as long as they use
+    the same `group-id`.
""" - conf = ctx.obj['config'] logger = logging.getLogger(__name__) - logger.setLevel(ctx.obj['loglevel']) - storage = get_storage(**conf.pop('storage')) - client = JournalClient(brokers, prefix, consumer_id) + conf = ctx.obj['config'] + try: + storage = get_storage(**conf.pop('storage')) + except KeyError: + ctx.fail('You must have a storage configured in your config file.') + + if brokers is None: + brokers = conf.get('journal', {}).get('brokers') + if not brokers: + ctx.fail('You must specify at least one kafka broker.') + if not isinstance(brokers, (list, tuple)): + brokers = [brokers] + + if prefix is None: + prefix = conf.get('journal', {}).get('prefix') + + if group_id is None: + group_id = conf.get('journal', {}).get('group_id') + + client = JournalClient(brokers=brokers, group_id=group_id, prefix=prefix) worker_fn = functools.partial(process_replay_objects, storage=storage) + try: nb_messages = 0 while not max_messages or nb_messages < max_messages: nb_messages += client.process(worker_fn) logger.info('Processed %d messages.' % nb_messages) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') @cli.command() @click.argument('object_type') @click.option('--start-object', default=None) @click.option('--end-object', default=None) @click.option('--dry-run', is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): - """Manipulate backfiller + """Run the backfiller + + The backfiller list objects from a Storage and produce journal entries from + there. + + Typically used to rebuild a journal or compensate for missing objects in a + journal (eg. due to a downtime of this later). + + The configuration file requires the following entries: + - brokers: a list of kafka endpoints (the journal) in which entries will be + added. + - storage_dbconn: URL to connect to the storage DB. + - prefix: the prefix of the topics (topics will be .). + - client_id: the kafka client ID. 
""" conf = ctx.obj['config'] backfiller = JournalBackfiller(conf) try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run) except KeyboardInterrupt: ctx.exit(0) def main(): + logging.basicConfig() return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main() diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py index 8f236d3..6530571 100644 --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -1,87 +1,87 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import tempfile from subprocess import Popen from typing import Tuple from unittest.mock import patch from click.testing import CliRunner from kafka import KafkaProducer import pytest from swh.storage.in_memory import Storage from swh.journal.cli import cli from swh.journal.serializers import key_to_kafka, value_to_kafka CLI_CONFIG = ''' storage: cls: memory args: {} ''' @pytest.fixture def storage(): """An instance of swh.storage.in_memory.Storage that gets injected into the CLI functions.""" storage = Storage() with patch('swh.journal.cli.get_storage') as get_storage_mock: get_storage_mock.return_value = storage yield storage def invoke(catch_exceptions, args): runner = CliRunner() with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) - args = ['-C' + config_fd.name, '-l', 'DEBUG'] + args + args = ['-C' + config_fd.name] + args result = runner.invoke(cli, args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_replay( storage: Storage, kafka_prefix: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' producer = KafkaProducer( bootstrap_servers='localhost:{}'.format(port), key_serializer=key_to_kafka, value_serializer=value_to_kafka, client_id='test-producer', ) snapshot = {'id': b'foo', 'branches': { b'HEAD': { 'target_type': 'revision', 'target': b'bar', } }} producer.send( topic=kafka_prefix+'.snapshot', key=snapshot['id'], value=snapshot) result = invoke(False, [ 'replay', '--broker', 'localhost:%d' % port, - '--consumer-id', 'test-cli-consumer', + '--group-id', 'test-cli-consumer', '--prefix', kafka_prefix, '--max-messages', '1', ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output assert storage.snapshot_get(snapshot['id']) == { **snapshot, 'next_branch': None}