Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/cli.py
# Copyright (C) 2016-2019 The Software Heritage developers | # Copyright (C) 2016-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import click | import click | ||||
import logging | import logging | ||||
import os | import os | ||||
from swh.core import config | from swh.core import config | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.journal.publisher import JournalPublisher | |||||
from swh.journal.replay import StorageReplayer | from swh.journal.replay import StorageReplayer | ||||
from swh.journal.backfill import JournalBackfiller | |||||
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) | CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) | ||||
@click.group(context_settings=CONTEXT_SETTINGS) | @click.group(context_settings=CONTEXT_SETTINGS) | ||||
@click.option('--config-file', '-C', default=None, | @click.option('--config-file', '-C', default=None, | ||||
type=click.Path(exists=True, dir_okay=False,), | type=click.Path(exists=True, dir_okay=False,), | ||||
help="Configuration file.") | help="Configuration file.") | ||||
Show All 29 Lines | def cli(ctx, config_file, log_level): | ||||
_log = logging.getLogger('kafka') | _log = logging.getLogger('kafka') | ||||
_log.setLevel(logging.INFO) | _log.setLevel(logging.INFO) | ||||
ctx.obj['config'] = conf | ctx.obj['config'] = conf | ||||
ctx.obj['loglevel'] = log_level | ctx.obj['loglevel'] = log_level | ||||
@cli.command() | @cli.command() | ||||
@click.pass_context | |||||
def publisher(ctx): | |||||
"""Manipulate publisher | |||||
""" | |||||
conf = ctx.obj['config'] | |||||
publisher = JournalPublisher(conf) | |||||
try: | |||||
while True: | |||||
publisher.poll() | |||||
except KeyboardInterrupt: | |||||
ctx.exit(0) | |||||
@cli.command() | |||||
@click.option('--max-messages', '-m', default=None, type=int, | @click.option('--max-messages', '-m', default=None, type=int, | ||||
help='Maximum number of objects to replay. Default is to ' | help='Maximum number of objects to replay. Default is to ' | ||||
'run forever.') | 'run forever.') | ||||
@click.option('--broker', 'brokers', type=str, multiple=True, | @click.option('--broker', 'brokers', type=str, multiple=True, | ||||
help='Kafka broker to connect to.') | help='Kafka broker to connect to.') | ||||
@click.option('--prefix', type=str, default='swh.journal.objects', | @click.option('--prefix', type=str, default='swh.journal.objects', | ||||
help='Prefix of Kafka topic names to read from.') | help='Prefix of Kafka topic names to read from.') | ||||
@click.option('--consumer-id', type=str, | @click.option('--consumer-id', type=str, | ||||
Show All 9 Lines | def replay(ctx, brokers, prefix, consumer_id, max_messages): | ||||
try: | try: | ||||
replayer.fill(storage, max_messages=max_messages) | replayer.fill(storage, max_messages=max_messages) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
else: | else: | ||||
print('Done.') | print('Done.') | ||||
@cli.command() | |||||
@click.argument('object_type') | |||||
@click.option('--start-object', default=None) | |||||
@click.option('--end-object', default=None) | |||||
@click.option('--dry-run', is_flag=True, default=False) | |||||
@click.pass_context | |||||
def backfiller(ctx, object_type, start_object, end_object, dry_run): | |||||
"""Manipulate backfiller | |||||
""" | |||||
conf = ctx.obj['config'] | |||||
backfiller = JournalBackfiller(conf) | |||||
try: | |||||
backfiller.run( | |||||
object_type=object_type, | |||||
start_object=start_object, end_object=end_object, | |||||
dry_run=dry_run) | |||||
except KeyboardInterrupt: | |||||
ctx.exit(0) | |||||
def main(): | def main(): | ||||
return cli(auto_envvar_prefix='SWH_JOURNAL') | return cli(auto_envvar_prefix='SWH_JOURNAL') | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
main() | main() |