Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/cli.py
# Copyright (C) 2016-2019 The Software Heritage developers | # Copyright (C) 2016-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import click | import click | ||||
import functools | import functools | ||||
import logging | import logging | ||||
import os | import os | ||||
from swh.core import config | from swh.core import config | ||||
from swh.core.cli import CONTEXT_SETTINGS | from swh.core.cli import CONTEXT_SETTINGS | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES | from swh.journal.client import JournalClient | ||||
from swh.journal.replay import process_replay_objects | from swh.journal.replay import process_replay_objects | ||||
from swh.journal.replay import process_replay_objects_content | from swh.journal.replay import process_replay_objects_content | ||||
from swh.journal.backfill import JournalBackfiller | from swh.journal.backfill import JournalBackfiller | ||||
@click.group(name='journal', context_settings=CONTEXT_SETTINGS) | @click.group(name='journal', context_settings=CONTEXT_SETTINGS) | ||||
@click.option('--config-file', '-C', default=None, | @click.option('--config-file', '-C', default=None, | ||||
type=click.Path(exists=True, dir_okay=False,), | type=click.Path(exists=True, dir_okay=False,), | ||||
Show All 20 Lines | def cli(ctx, config_file): | ||||
log_level = ctx.obj.get('log_level', logging.INFO) | log_level = ctx.obj.get('log_level', logging.INFO) | ||||
logging.root.setLevel(log_level) | logging.root.setLevel(log_level) | ||||
logging.getLogger('kafka').setLevel(logging.INFO) | logging.getLogger('kafka').setLevel(logging.INFO) | ||||
ctx.obj['config'] = conf | ctx.obj['config'] = conf | ||||
def get_journal_client(ctx, brokers, prefix, group_id, | def get_journal_client(ctx, brokers, prefix, group_id, object_types=None): | ||||
object_types=ACCEPTED_OBJECT_TYPES): | |||||
conf = ctx.obj['config'] | conf = ctx.obj['config'] | ||||
if brokers is None: | if not brokers: | ||||
brokers = conf.get('journal', {}).get('brokers') | brokers = conf.get('journal', {}).get('brokers') | ||||
if not brokers: | if not brokers: | ||||
ctx.fail('You must specify at least one kafka broker.') | ctx.fail('You must specify at least one kafka broker.') | ||||
if not isinstance(brokers, (list, tuple)): | if not isinstance(brokers, (list, tuple)): | ||||
brokers = [brokers] | brokers = [brokers] | ||||
if prefix is None: | if prefix is None: | ||||
prefix = conf.get('journal', {}).get('prefix') | prefix = conf.get('journal', {}).get('prefix') | ||||
if group_id is None: | if group_id is None: | ||||
group_id = conf.get('journal', {}).get('group_id') | group_id = conf.get('journal', {}).get('group_id') | ||||
return JournalClient( | kwargs = dict(brokers=brokers, group_id=group_id, prefix=prefix) | ||||
brokers=brokers, group_id=group_id, prefix=prefix, | if object_types: | ||||
object_types=object_types) | kwargs['object_types'] = object_types | ||||
return JournalClient(**kwargs) | |||||
@cli.command() | @cli.command() | ||||
@click.option('--max-messages', '-m', default=None, type=int, | @click.option('--max-messages', '-m', default=None, type=int, | ||||
help='Maximum number of objects to replay. Default is to ' | help='Maximum number of objects to replay. Default is to ' | ||||
'run forever.') | 'run forever.') | ||||
@click.option('--broker', 'brokers', type=str, multiple=True, | @click.option('--broker', 'brokers', type=str, multiple=True, | ||||
hidden=True, # prefer config file | hidden=True, # prefer config file | ||||
▲ Show 20 Lines • Show All 96 Lines • ▼ Show 20 Lines | except KeyError: | ||||
ctx.fail('You must have a source objstorage configured in ' | ctx.fail('You must have a source objstorage configured in ' | ||||
'your config file.') | 'your config file.') | ||||
try: | try: | ||||
objstorage_dst = get_objstorage(**conf.pop('objstorage_dst')) | objstorage_dst = get_objstorage(**conf.pop('objstorage_dst')) | ||||
except KeyError: | except KeyError: | ||||
ctx.fail('You must have a destination objstorage configured ' | ctx.fail('You must have a destination objstorage configured ' | ||||
'in your config file.') | 'in your config file.') | ||||
client = get_journal_client(ctx, brokers, prefix, group_id) | client = get_journal_client(ctx, brokers, prefix, group_id, | ||||
object_types=('content',)) | |||||
worker_fn = functools.partial(process_replay_objects_content, | worker_fn = functools.partial(process_replay_objects_content, | ||||
src=objstorage_src, | src=objstorage_src, | ||||
dst=objstorage_dst) | dst=objstorage_dst) | ||||
try: | try: | ||||
nb_messages = 0 | nb_messages = 0 | ||||
while True: | while True: | ||||
nb_messages += client.process(worker_fn) | nb_messages += client.process(worker_fn) | ||||
Show All 14 Lines |