Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/cli.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | |||||
import click | import click | ||||
from swh.core import config | from swh.core import config | ||||
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup | from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup | ||||
from swh.journal.cli import get_journal_client | |||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from swh.scheduler.cli_utils import schedule_origin_batches | from swh.scheduler.cli_utils import schedule_origin_batches | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.indexer import metadata_dictionary | from swh.indexer import metadata_dictionary | ||||
from swh.indexer.journal_client import process_journal_objects | |||||
from swh.indexer.storage import get_indexer_storage | from swh.indexer.storage import get_indexer_storage | ||||
from swh.indexer.storage.api.server import load_and_check_config, app | from swh.indexer.storage.api.server import load_and_check_config, app | ||||
@click.group(name='indexer', context_settings=CONTEXT_SETTINGS, | @click.group(name='indexer', context_settings=CONTEXT_SETTINGS, | ||||
cls=AliasedGroup) | cls=AliasedGroup) | ||||
@click.option('--config-file', '-C', default=None, | @click.option('--config-file', '-C', default=None, | ||||
type=click.Path(exists=True, dir_okay=False,), | type=click.Path(exists=True, dir_okay=False,), | ||||
▲ Show 20 Lines • Show All 129 Lines • ▼ Show 20 Lines | |||||
def schedule_origin_metadata_reindex( | def schedule_origin_metadata_reindex( | ||||
ctx, origin_batch_size, tool_ids, mappings, task_type): | ctx, origin_batch_size, tool_ids, mappings, task_type): | ||||
"""Schedules indexing tasks for origins that were already indexed.""" | """Schedules indexing tasks for origins that were already indexed.""" | ||||
idx_storage = ctx.obj['indexer_storage'] | idx_storage = ctx.obj['indexer_storage'] | ||||
scheduler = ctx.obj['scheduler'] | scheduler = ctx.obj['scheduler'] | ||||
origins = list_origins_by_producer(idx_storage, mappings, tool_ids) | origins = list_origins_by_producer(idx_storage, mappings, tool_ids) | ||||
kwargs = {"policy_update": "update-dups", "parse_ids": False} | kwargs = {"policy_update": "update-dups"} | ||||
schedule_origin_batches( | schedule_origin_batches( | ||||
scheduler, task_type, origins, origin_batch_size, kwargs) | scheduler, task_type, origins, origin_batch_size, kwargs) | ||||
@cli.command('journal-client') | |||||
@click.option('--scheduler-url', '-s', default=None, | |||||
help="URL of the scheduler API") | |||||
@click.option('--origin-metadata-task-type', | |||||
default='index-origin-metadata', | |||||
help='Name of the task running the origin metadata indexer.') | |||||
@click.option('--broker', 'brokers', type=str, multiple=True, | |||||
help='Kafka broker to connect to.') | |||||
@click.option('--prefix', type=str, default=None, | |||||
help='Prefix of Kafka topic names to read from.') | |||||
@click.option('--group-id', '--consumer-id', type=str, | |||||
help='Name of the consumer/group id for reading from Kafka.') | |||||
@click.option('--max-messages', '-m', default=None, type=int, | |||||
help='Maximum number of objects to replay. Default is to ' | |||||
'run forever.') | |||||
@click.pass_context | |||||
def journal_client(ctx, scheduler_url, origin_metadata_task_type, | |||||
douardda: What is this new command supposed to do? Shouldn't it come with a usage/description (docstring)? | |||||
Done Inline ActionsI couldn't find a better name. Suggestions welcome. vlorentz: I couldn't find a better name. Suggestions welcome. | |||||
brokers, prefix, group_id, max_messages): | |||||
scheduler = _get_api( | |||||
get_scheduler, | |||||
ctx.obj['config'], | |||||
'scheduler', | |||||
scheduler_url | |||||
) | |||||
client = get_journal_client( | |||||
ctx, brokers, prefix, group_id, object_types=['origin_visit']) | |||||
worker_fn = functools.partial( | |||||
process_journal_objects, | |||||
scheduler=scheduler, | |||||
task_names={ | |||||
'origin_metadata': origin_metadata_task_type, | |||||
} | |||||
) | |||||
nb_messages = 0 | |||||
try: | |||||
while not max_messages or nb_messages < max_messages: | |||||
nb_messages += client.process(worker_fn) | |||||
print('Processed %d messages.' % nb_messages) | |||||
except KeyboardInterrupt: | |||||
ctx.exit(0) | |||||
else: | |||||
print('Done.') | |||||
@cli.command('rpc-serve') | @cli.command('rpc-serve') | ||||
@click.argument('config-path', required=1) | @click.argument('config-path', required=1) | ||||
@click.option('--host', default='0.0.0.0', help="Host to run the server") | @click.option('--host', default='0.0.0.0', help="Host to run the server") | ||||
@click.option('--port', default=5007, type=click.INT, | @click.option('--port', default=5007, type=click.INT, | ||||
help="Binding port of the server") | help="Binding port of the server") | ||||
@click.option('--debug/--nodebug', default=True, | @click.option('--debug/--nodebug', default=True, | ||||
help="Indicates if the server should run in debug mode") | help="Indicates if the server should run in debug mode") | ||||
def rpc_server(config_path, host, port, debug): | def rpc_server(config_path, host, port, debug): | ||||
Show All 16 Lines |
What is this new command supposed to do? Shouldn't it come with a usage/description (docstring)? Is journal-client a good name for it?