Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/admin.py
# Copyright (C) 2016-2019 The Software Heritage developers | # Copyright (C) 2016-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
import time | import time | ||||
import click | import click | ||||
from . import cli | from . import cli | ||||
@cli.command('runner') | @cli.command('start-runner') | ||||
@click.option('--period', '-p', default=0, | @click.option('--period', '-p', default=0, | ||||
help=('Period (in s) at witch pending tasks are checked and ' | help=('Period (in s) at witch pending tasks are checked and ' | ||||
'executed. Set to 0 (default) for a one shot.')) | 'executed. Set to 0 (default) for a one shot.')) | ||||
@click.pass_context | @click.pass_context | ||||
def runner(ctx, period): | def runner(ctx, period): | ||||
"""Starts a swh-scheduler runner service. | """Starts a swh-scheduler runner service. | ||||
This process is responsible for checking for ready-to-run tasks and | This process is responsible for checking for ready-to-run tasks and | ||||
Show All 18 Lines | try: | ||||
logger.exception('Unexpected error in run_ready_tasks()') | logger.exception('Unexpected error in run_ready_tasks()') | ||||
if not period: | if not period: | ||||
break | break | ||||
time.sleep(period) | time.sleep(period) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
@cli.command('listener') | @cli.command('start-listener') | ||||
@click.pass_context | @click.pass_context | ||||
def listener(ctx): | def listener(ctx): | ||||
"""Starts a swh-scheduler listener service. | """Starts a swh-scheduler listener service. | ||||
This service is responsible for listening at task lifecycle events and | This service is responsible for listening at task lifecycle events and | ||||
handle their workflow status in the database.""" | handle their workflow status in the database.""" | ||||
scheduler = ctx.obj['scheduler'] | scheduler = ctx.obj['scheduler'] | ||||
if not scheduler: | if not scheduler: | ||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | raise ValueError('Scheduler class (local/remote) must be instantiated') | ||||
from swh.scheduler.celery_backend.config import build_app | from swh.scheduler.celery_backend.config import build_app | ||||
app = build_app(ctx.obj['config'].get('celery')) | app = build_app(ctx.obj['config'].get('celery')) | ||||
app.set_current() | app.set_current() | ||||
from swh.scheduler.celery_backend.listener import event_monitor | from swh.scheduler.celery_backend.listener import event_monitor | ||||
event_monitor(app, backend=scheduler) | event_monitor(app, backend=scheduler) | ||||
@cli.command('api-server') | @cli.command('rpc-serve') | ||||
@click.option('--host', default='0.0.0.0', | @click.option('--host', default='0.0.0.0', | ||||
help="Host to run the scheduler server api") | help="Host to run the scheduler server api") | ||||
@click.option('--port', default=5008, type=click.INT, | @click.option('--port', default=5008, type=click.INT, | ||||
help="Binding port of the server") | help="Binding port of the server") | ||||
@click.option('--debug/--nodebug', default=None, | @click.option('--debug/--nodebug', default=None, | ||||
help=("Indicates if the server should run in debug mode. " | help=("Indicates if the server should run in debug mode. " | ||||
"Defaults to True if log-level is DEBUG, False otherwise.") | "Defaults to True if log-level is DEBUG, False otherwise.") | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def api_server(ctx, host, port, debug): | def rpc_server(ctx, host, port, debug): | ||||
"""Starts a swh-scheduler API HTTP server. | """Starts a swh-scheduler API HTTP server. | ||||
""" | """ | ||||
if ctx.obj['config']['scheduler']['cls'] == 'remote': | if ctx.obj['config']['scheduler']['cls'] == 'remote': | ||||
click.echo("The API server can only be started with a 'local' " | click.echo("The API server can only be started with a 'local' " | ||||
"configuration", err=True) | "configuration", err=True) | ||||
ctx.exit(1) | ctx.exit(1) | ||||
from swh.scheduler.api import server | from swh.scheduler.api import server | ||||
server.app.config.update(ctx.obj['config']) | server.app.config.update(ctx.obj['config']) | ||||
if debug is None: | if debug is None: | ||||
debug = ctx.obj['log_level'] <= logging.DEBUG | debug = ctx.obj['log_level'] <= logging.DEBUG | ||||
server.app.run(host, port=port, debug=bool(debug)) | server.app.run(host, port=port, debug=bool(debug)) | ||||
@cli.command('updater') | @cli.command('start-updater') | ||||
@click.option('--verbose/--no-verbose', '-v', default=False, | @click.option('--verbose/--no-verbose', '-v', default=False, | ||||
help='Verbose mode') | help='Verbose mode') | ||||
@click.pass_context | @click.pass_context | ||||
def updater(ctx, verbose): | def updater(ctx, verbose): | ||||
"""Insert tasks in the scheduler from the scheduler-updater's events | """Starts a scheduler-updater service. | ||||
Insert tasks in the scheduler from the scheduler-updater's events read from | |||||
the db cache (filled e.g. by the ghtorrent consumer service) . | |||||
""" | """ | ||||
from swh.scheduler.updater.writer import UpdaterWriter | from swh.scheduler.updater.writer import UpdaterWriter | ||||
UpdaterWriter(**ctx.obj['config']).run() | UpdaterWriter(**ctx.obj['config']).run() | ||||
@cli.command('ghtorrent') | @cli.command('start-ghtorrent') | ||||
@click.option('--verbose/--no-verbose', '-v', default=False, | @click.option('--verbose/--no-verbose', '-v', default=False, | ||||
help='Verbose mode') | help='Verbose mode') | ||||
@click.pass_context | @click.pass_context | ||||
def ghtorrent(ctx, verbose): | def ghtorrent(ctx, verbose): | ||||
"""Consume events from ghtorrent and write them to cache. | """Starts a ghtorrent consumer service. | ||||
Consumes events from ghtorrent and write them to a cache. | |||||
""" | """ | ||||
from swh.scheduler.updater.ghtorrent import GHTorrentConsumer | from swh.scheduler.updater.ghtorrent import GHTorrentConsumer | ||||
from swh.scheduler.updater.backend import SchedulerUpdaterBackend | from swh.scheduler.updater.backend import SchedulerUpdaterBackend | ||||
ght_config = ctx.obj['config'].get('ghtorrent', {}) | ght_config = ctx.obj['config'].get('ghtorrent', {}) | ||||
back_config = ctx.obj['config'].get('scheduler_updater', {}) | back_config = ctx.obj['config'].get('scheduler_updater', {}) | ||||
backend = SchedulerUpdaterBackend(**back_config) | backend = SchedulerUpdaterBackend(**back_config) | ||||
GHTorrentConsumer(backend, **ght_config).run() | GHTorrentConsumer(backend, **ght_config).run() | ||||
# for bw compat | |||||
cli.add_alias(ghtorrent, 'ghtorrent') | |||||
cli.add_alias(listener, 'listerner') | |||||
ardumont: listener | |||||
cli.add_alias(runner, 'runner') | |||||
cli.add_alias(updater, 'updater') | |||||
cli.add_alias(rpc_server, 'serve') | |||||
cli.add_alias(rpc_server, 'api-server') |
listener