Changeset View
Changeset View
Standalone View
Standalone View
swh/search/cli.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | import functools | ||||
import click | import click | ||||
Show All 31 Lines | |||||
@cli.group('journal-client') | @cli.group('journal-client') | ||||
@click.pass_context | @click.pass_context | ||||
def journal_client(ctx): | def journal_client(ctx): | ||||
"""""" | """""" | ||||
pass | pass | ||||
@journal_client.command('objects') | @journal_client.command('objects') | ||||
@click.option('--max-messages', '-m', default=None, type=int, | @click.option('--stop-after-objects', '-s', default=None, type=int, | ||||
help='Maximum number of objects to replay. Default is to ' | help='Maximum number of objects to replay. Default is to ' | ||||
'run forever.') | 'run forever.') | ||||
@click.pass_context | @click.pass_context | ||||
def journal_client_objects(ctx, max_messages): | def journal_client_objects(ctx, stop_after_objects): | ||||
"""Listens for new objects from the SWH Journal, and schedules tasks | """Listens for new objects from the SWH Journal, and schedules tasks | ||||
to run relevant indexers (currently, only origin) | to run relevant indexers (currently, only origin) | ||||
on these new objects.""" | on these new objects.""" | ||||
client = get_journal_client( | client = get_journal_client( | ||||
ctx, object_types=['origin', 'origin_visit'], | ctx, object_types=['origin', 'origin_visit'], | ||||
max_messages=max_messages) | stop_after_objects=stop_after_objects) | ||||
search = get_search(**ctx.obj['config']['search']) | search = get_search(**ctx.obj['config']['search']) | ||||
worker_fn = functools.partial( | worker_fn = functools.partial( | ||||
process_journal_objects, | process_journal_objects, | ||||
search=search, | search=search, | ||||
) | ) | ||||
nb_messages = 0 | nb_messages = 0 | ||||
try: | try: | ||||
while not max_messages or nb_messages < max_messages: | nb_messages = client.process(worker_fn) | ||||
nb_messages += client.process(worker_fn) | |||||
print('Processed %d messages.' % nb_messages) | print('Processed %d messages.' % nb_messages) | ||||
olasd: `client.process` does not need an outer loop, it handles that itself. | |||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
else: | else: | ||||
print('Done.') | print('Done.') | ||||
finally: | |||||
client.close() | |||||
Not Done Inline ActionsPlease add a finally: client.close() clause so that the client properly shuts down (ref. swh.journal.cli) olasd: Please add a `finally: client.close()` clause so that the client properly shuts down (ref. `swh. | |||||
@cli.command('rpc-serve') | @cli.command('rpc-serve') | ||||
@click.argument('config-path', required=True) | @click.argument('config-path', required=True) | ||||
@click.option('--host', default='0.0.0.0', help="Host to run the server") | @click.option('--host', default='0.0.0.0', help="Host to run the server") | ||||
@click.option('--port', default=5010, type=click.INT, | @click.option('--port', default=5010, type=click.INT, | ||||
help="Binding port of the server") | help="Binding port of the server") | ||||
@click.option('--debug/--nodebug', default=True, | @click.option('--debug/--nodebug', default=True, | ||||
help="Indicates if the server should run in debug mode") | help="Indicates if the server should run in debug mode") | ||||
def rpc_server(config_path, host, port, debug): | def rpc_server(config_path, host, port, debug): | ||||
"""Starts a Software Heritage Indexer RPC HTTP server.""" | """Starts a Software Heritage Indexer RPC HTTP server.""" | ||||
api_cfg = load_and_check_config(config_path, type='any') | api_cfg = load_and_check_config(config_path, type='any') | ||||
app.config.update(api_cfg) | app.config.update(api_cfg) | ||||
app.run(host, port=int(port), debug=bool(debug)) | app.run(host, port=int(port), debug=bool(debug)) |
client.process does not need an outer loop, it handles that itself.