Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli.py
Show All 9 Lines | |||||
import json | import json | ||||
import locale | import locale | ||||
import logging | import logging | ||||
import time | import time | ||||
from swh.core import utils, config | from swh.core import utils, config | ||||
from . import compute_nb_tasks_from | from . import compute_nb_tasks_from | ||||
from .backend_es import SWHElasticSearchClient | from .backend_es import SWHElasticSearchClient | ||||
from . import get_scheduler | from . import get_scheduler, DEFAULT_CONFIG | ||||
locale.setlocale(locale.LC_ALL, '') | locale.setlocale(locale.LC_ALL, '') | ||||
ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0] | ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0] | ||||
class DateTimeType(click.ParamType): | class DateTimeType(click.ParamType): | ||||
name = 'time and date' | name = 'time and date' | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | lines += [ | ||||
click.style(' Keyword args:\n', bold=True), | click.style(' Keyword args:\n', bold=True), | ||||
pretty_print_dict(task['arguments']['kwargs'], indent=4), | pretty_print_dict(task['arguments']['kwargs'], indent=4), | ||||
] | ] | ||||
return ''.join(lines) | return ''.join(lines) | ||||
@click.group(context_settings=CONTEXT_SETTINGS) | @click.group(context_settings=CONTEXT_SETTINGS) | ||||
@click.option('--cls', '-c', default='local', | @click.option('--config-file', '-C', default=None, | ||||
type=click.Choice(['local', 'remote']), | type=click.Path(exists=True, dir_okay=False,), | ||||
help="Scheduler's class, default to 'local'") | help="Configuration file.") | ||||
@click.option('--database', '-d', | @click.option('--database', '-d', default=None, | ||||
help="Scheduling database DSN (if cls is 'local')") | help="Scheduling database DSN (imply cls is 'local')") | ||||
@click.option('--url', '-u', | @click.option('--url', '-u', default=None, | ||||
help="Scheduler's url access (if cls is 'remote')") | help="Scheduler's url access (imply cls is 'remote')") | ||||
@click.option('--log-level', '-l', default='INFO', | @click.option('--log-level', '-l', default='INFO', | ||||
type=click.Choice(logging._nameToLevel.keys()), | type=click.Choice(logging._nameToLevel.keys()), | ||||
help="Log level (default to INFO)") | help="Log level (default to INFO)") | ||||
@click.pass_context | @click.pass_context | ||||
def cli(ctx, cls, database, url, log_level): | def cli(ctx, config_file, cls, database, url, log_level): | ||||
"""Software Heritage Scheduler CLI interface | """Software Heritage Scheduler CLI interface | ||||
Default to use the the local scheduler instance (plugged to the | Default to use the the local scheduler instance (plugged to the | ||||
main scheduler db). | main scheduler db). | ||||
""" | """ | ||||
from swh.scheduler.celery_backend.config import setup_log_handler | from swh.scheduler.celery_backend.config import setup_log_handler | ||||
log_level = setup_log_handler( | log_level = setup_log_handler( | ||||
loglevel=log_level, colorize=False, | loglevel=log_level, colorize=False, | ||||
format='[%(levelname)s] %(name)s -- %(message)s') | format='[%(levelname)s] %(name)s -- %(message)s') | ||||
ctx.ensure_object(dict) | ctx.ensure_object(dict) | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
scheduler = None | scheduler = None | ||||
override_config = {} | conf = config.read(config_file, DEFAULT_CONFIG) | ||||
if 'scheduler' not in conf: | |||||
raise ValueError("missing 'scheduler' configuration") | |||||
if database: | |||||
conf['scheduler']['cls'] = 'local' | |||||
conf['scheduler']['args']['db'] = database | |||||
elif url: | |||||
conf['scheduler']['cls'] = 'local' | |||||
conf['scheduler']['args']['url'] = url | |||||
sched_conf = conf['scheduler'] | |||||
try: | try: | ||||
if cls == 'local' and database: | logger.debug('Instanciating scheduler with %s' % ( | ||||
override_config = {'scheduling_db': database} | sched_conf)) | ||||
elif cls == 'remote' and url: | scheduler = get_scheduler(**sched_conf) | ||||
override_config = {'url': url} | |||||
logger.debug('Instanciating scheduler %s with %s' % ( | |||||
cls, override_config)) | |||||
scheduler = get_scheduler(cls, args=override_config) | |||||
except Exception: | except Exception: | ||||
# it's the subcommand to decide whether not having a proper | # it's the subcommand to decide whether not having a proper | ||||
# scheduler instance is a problem. | # scheduler instance is a problem. | ||||
pass | pass | ||||
ctx.obj['scheduler'] = scheduler | ctx.obj['scheduler'] = scheduler | ||||
ctx.obj['config'] = {'cls': cls, 'args': override_config} | ctx.obj['config'] = conf | ||||
ctx.obj['loglevel'] = log_level | ctx.obj['loglevel'] = log_level | ||||
@cli.group('task') | @cli.group('task') | ||||
@click.pass_context | @click.pass_context | ||||
def task(ctx): | def task(ctx): | ||||
"""Manipulate tasks.""" | """Manipulate tasks.""" | ||||
pass | pass | ||||
▲ Show 20 Lines • Show All 388 Lines • ▼ Show 20 Lines | if not scheduler: | ||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | raise ValueError('Scheduler class (local/remote) must be instantiated') | ||||
from swh.scheduler.celery_backend.listener import ( | from swh.scheduler.celery_backend.listener import ( | ||||
event_monitor, main_app) | event_monitor, main_app) | ||||
event_monitor(main_app, backend=scheduler) | event_monitor(main_app, backend=scheduler) | ||||
@cli.command('api-server') | @cli.command('api-server') | ||||
@click.argument('config-path', required=1) | |||||
@click.option('--host', default='0.0.0.0', | @click.option('--host', default='0.0.0.0', | ||||
help="Host to run the scheduler server api") | help="Host to run the scheduler server api") | ||||
@click.option('--port', default=5008, type=click.INT, | @click.option('--port', default=5008, type=click.INT, | ||||
help="Binding port of the server") | help="Binding port of the server") | ||||
@click.option('--debug/--nodebug', default=None, | @click.option('--debug/--nodebug', default=None, | ||||
help=("Indicates if the server should run in debug mode. " | help=("Indicates if the server should run in debug mode. " | ||||
"Defaults to True if log-level is DEBUG, False otherwise.") | "Defaults to True if log-level is DEBUG, False otherwise.") | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def api_server(ctx, config_path, host, port, debug): | def api_server(ctx, host, port, debug): | ||||
"""Starts a swh-scheduler API HTTP server. | """Starts a swh-scheduler API HTTP server. | ||||
""" | """ | ||||
if ctx.obj['config']['cls'] == 'remote': | if ctx.obj['config']['scheduler']['cls'] == 'remote': | ||||
click.echo("The API server can only be started with a 'local' " | click.echo("The API server can only be started with a 'local' " | ||||
"configuration", err=True) | "configuration", err=True) | ||||
ctx.exit(1) | ctx.exit(1) | ||||
from swh.scheduler.api.server import app, DEFAULT_CONFIG | from swh.scheduler.api import server | ||||
conf = config.read(config_path, DEFAULT_CONFIG) | server.app.scheduler = ctx.obj['scheduler'] | ||||
if ctx.obj['config']['args']: | server.app.config.update(ctx.obj['config']) | ||||
conf['scheduler']['args'].update(ctx.obj['config']['args']) | |||||
app.config.update(conf) | |||||
if debug is None: | if debug is None: | ||||
debug = ctx.obj['loglevel'] <= logging.DEBUG | debug = ctx.obj['loglevel'] <= logging.DEBUG | ||||
server.app.run(host, port=port, debug=bool(debug)) | |||||
app.run(host, port=port, debug=bool(debug)) | |||||
@cli.group('task-type') | @cli.group('task-type') | ||||
@click.pass_context | @click.pass_context | ||||
def task_type(ctx): | def task_type(ctx): | ||||
"""Manipulate task types.""" | """Manipulate task types.""" | ||||
pass | pass | ||||
▲ Show 20 Lines • Show All 68 Lines • Show Last 20 Lines |