diff --git a/swh/journal/cli.py b/swh/journal/cli.py index c2a6f60..e0d3090 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,103 +1,105 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging import os from swh.core import config from swh.storage import get_storage from swh.journal.publisher import JournalPublisher from swh.journal.replay import StorageReplayer CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @click.group(context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.") @click.option('--log-level', '-l', default='INFO', type=click.Choice(logging._nameToLevel.keys()), help="Log level (default to INFO)") @click.pass_context def cli(ctx, config_file, log_level): """Software Heritage Scheduler CLI interface Default to use the the local scheduler instance (plugged to the main scheduler db). """ if not config_file: config_file = os.environ.get('SWH_CONFIG_FILENAME') if not config_file: raise ValueError('You must either pass a config-file parameter ' 'or set SWH_CONFIG_FILENAME to target ' 'the config-file') if not os.path.exists(config_file): raise ValueError('%s does not exist' % config_file) conf = config.read(config_file) ctx.ensure_object(dict) - logger = logging.getLogger(__name__) - logger.setLevel(log_level) + logging.basicConfig( + level=log_level, + format='%(asctime)s %(levelname)s %(name)s %(message)s', + ) _log = logging.getLogger('kafka') _log.setLevel(logging.INFO) ctx.obj['config'] = conf ctx.obj['loglevel'] = log_level @cli.command() @click.pass_context def publisher(ctx): """Manipulate publisher """ conf = ctx.obj['config'] publisher = JournalPublisher(conf) try: while True: publisher.poll() except KeyboardInterrupt: ctx.exit(0) @cli.command() @click.option('--max-messages', '-m', default=None, type=int, help='Maximum number of objects to replay. Default is to ' 'run forever.') @click.option('--broker', 'brokers', type=str, multiple=True, help='Kafka broker to connect to.') @click.option('--prefix', type=str, default='swh.journal.objects', help='Prefix of Kafka topic names to read from.') @click.option('--consumer-id', type=str, help='Name of the consumer/group id for reading from Kafka.') @click.pass_context def replay(ctx, brokers, prefix, consumer_id, max_messages): """Fill a new storage by reading a journal. """ conf = ctx.obj['config'] storage = get_storage(**conf.pop('storage')) replayer = StorageReplayer(brokers, prefix, consumer_id) try: replayer.fill(storage, max_messages=max_messages) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') def main(): return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main()