Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/task.py
- This file was copied from swh/scheduler/cli/__init__.py.
# Copyright (C) 2016-2019 The Software Heritage developers | # Copyright (C) 2016-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import arrow | import datetime | ||||
import click | |||||
import csv | |||||
import itertools | |||||
import json | import json | ||||
import itertools | |||||
import locale | import locale | ||||
import logging | import logging | ||||
import time | |||||
import datetime | import arrow | ||||
import csv | |||||
import click | |||||
from . import cli | |||||
# Switch every locale category to the environment's default (the empty
# string asks for the user's configured locale), then remember the language
# code reported for the time category.  getlocale() returns a
# (language code, encoding) pair; only the first element is kept.
locale.setlocale(locale.LC_ALL, '')
_time_locale = locale.getlocale(locale.LC_TIME)
ARROW_LOCALE = _time_locale[0]
class DateTimeType(click.ParamType): | class DateTimeType(click.ParamType): | ||||
name = 'time and date' | name = 'time and date' | ||||
▲ Show 20 Lines • Show All 111 Lines • ▼ Show 20 Lines | lines += [ | ||||
pretty_print_list(task['arguments']['args'], indent=4), | pretty_print_list(task['arguments']['args'], indent=4), | ||||
click.style(' Keyword args:\n', bold=True), | click.style(' Keyword args:\n', bold=True), | ||||
pretty_print_dict(task['arguments']['kwargs'], indent=4), | pretty_print_dict(task['arguments']['kwargs'], indent=4), | ||||
] | ] | ||||
return ''.join(lines) | return ''.join(lines) | ||||
@click.group(context_settings=CONTEXT_SETTINGS)
@click.option('--config-file', '-C', default=None,
              type=click.Path(exists=True, dir_okay=False,),
              help="Configuration file.")
@click.option('--database', '-d', default=None,
              help="Scheduling database DSN (imply cls is 'local')")
@click.option('--url', '-u', default=None,
              help="Scheduler's url access (imply cls is 'remote')")
@click.option('--no-stdout', is_flag=True, default=False,
              help="Do NOT output logs on the console")
@click.pass_context
def cli(ctx, config_file, database, url, no_stdout):
    """Scheduler CLI interface.

    Default to use the local scheduler instance (plugged to the
    main scheduler db).

    """
    # Reads the configuration, optionally overrides the scheduler backend
    # from --database / --url, tries to instantiate the scheduler and stores
    # both in ctx.obj under the 'scheduler' and 'config' keys for the
    # subcommands.
    #
    # Raises ValueError when the loaded configuration has no 'scheduler'
    # section.
    from swh.core import config
    from swh.scheduler.celery_backend.config import setup_log_handler
    from swh.scheduler import get_scheduler, DEFAULT_CONFIG

    # ctx.obj must be a dict before anything indexes it: when this group is
    # the top-level command (see main()), no parent has populated it yet.
    ctx.ensure_object(dict)
    # Fall back to INFO when no parent command recorded a log level.
    log_level = ctx.obj.get('log_level', logging.INFO)

    setup_log_handler(
        loglevel=log_level, colorize=False,
        format='[%(levelname)s] %(name)s -- %(message)s',
        log_console=not no_stdout)

    logger = logging.getLogger(__name__)
    scheduler = None
    conf = config.read(config_file, DEFAULT_CONFIG)
    if 'scheduler' not in conf:
        raise ValueError("missing 'scheduler' configuration")

    # Command-line overrides take precedence over the configuration file;
    # --database wins over --url when both are given.
    if database:
        conf['scheduler']['cls'] = 'local'
        # Do not assume the configuration already carries an 'args' mapping.
        conf['scheduler'].setdefault('args', {})['db'] = database
    elif url:
        conf['scheduler']['cls'] = 'remote'
        conf['scheduler']['args'] = {'url': url}

    sched_conf = conf['scheduler']
    try:
        logger.debug('Instanciating scheduler with %s', sched_conf)
        scheduler = get_scheduler(**sched_conf)
    except ValueError:
        # it's the subcommand to decide whether not having a proper
        # scheduler instance is a problem.
        pass

    ctx.obj['scheduler'] = scheduler
    ctx.obj['config'] = conf
@cli.group('task')
@click.pass_context
def task(ctx):
    """Manipulate tasks."""
    # Pure grouping node: it performs no work itself and only serves as the
    # parent under which the task subcommands are registered.
    return None
@task.command('schedule') | @task.command('schedule') | ||||
▲ Show 20 Lines • Show All 405 Lines • ▼ Show 20 Lines | if cleanup: | ||||
if dry_run: # no clean up | if dry_run: # no clean up | ||||
continue | continue | ||||
ctx.obj['scheduler'].delete_archived_tasks(task_ids) | ctx.obj['scheduler'].delete_archived_tasks(task_ids) | ||||
else: | else: | ||||
for task_ids in grouper(gen, n=batch_index): | for task_ids in grouper(gen, n=batch_index): | ||||
task_ids = list(task_ids) | task_ids = list(task_ids) | ||||
log.info('Indexed %s tasks: [%s, ...]' % ( | log.info('Indexed %s tasks: [%s, ...]' % ( | ||||
len(task_ids), task_ids[0])) | len(task_ids), task_ids[0])) | ||||
@cli.command('runner')
@click.option('--period', '-p', default=0,
              help=('Period (in s) at which pending tasks are checked and '
                    'executed. Set to 0 (default) for a one shot.'))
@click.pass_context
def runner(ctx, period):
    """Starts a swh-scheduler runner service.

    This process is responsible for checking for ready-to-run tasks and
    schedule them."""
    # Raises ValueError when the parent command could not instantiate a
    # scheduler.  Without this guard every iteration would fail inside
    # run_ready_tasks() and, in periodic mode, the loop would log the same
    # error forever.
    scheduler = ctx.obj['scheduler']
    if not scheduler:
        raise ValueError('Scheduler class (local/remote) must be instantiated')

    from swh.scheduler.celery_backend.runner import run_ready_tasks
    from swh.scheduler.celery_backend.config import build_app
    app = build_app(ctx.obj['config'].get('celery'))
    app.set_current()

    logger = logging.getLogger(__name__ + '.runner')
    logger.debug('Scheduler %s', scheduler)
    try:
        while True:
            logger.debug('Run ready tasks')
            try:
                ntasks = len(run_ready_tasks(scheduler, app))
                if ntasks:
                    logger.info('Scheduled %s tasks', ntasks)
            except Exception:
                # Keep the service alive: a failed round is logged and the
                # next period gets another chance.
                logger.exception('Unexpected error in run_ready_tasks()')
            if not period:
                # A period of 0 means a single one-shot round.
                break
            time.sleep(period)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the service.
        ctx.exit(0)
@cli.command('listener')
@click.pass_context
def listener(ctx):
    """Starts a swh-scheduler listener service.

    This service is responsible for listening at task lifecycle events and
    handle their workflow status in the database."""
    # The parent command may have left the scheduler unset; this command
    # cannot work without one.
    backend = ctx.obj['scheduler']
    if not backend:
        raise ValueError('Scheduler class (local/remote) must be instantiated')

    # Build the celery application from the 'celery' section of the
    # configuration (None when absent) and make it the current one.
    from swh.scheduler.celery_backend.config import build_app
    celery_conf = ctx.obj['config'].get('celery')
    celery_app = build_app(celery_conf)
    celery_app.set_current()

    # Hand control over to the event monitor.
    from swh.scheduler.celery_backend.listener import event_monitor
    event_monitor(celery_app, backend=backend)
@cli.command('api-server')
@click.option('--host', default='0.0.0.0',
              help="Host to run the scheduler server api")
@click.option('--port', default=5008, type=click.INT,
              help="Binding port of the server")
@click.option('--debug/--nodebug', default=None,
              help=("Indicates if the server should run in debug mode. "
                    "Defaults to True if log-level is DEBUG, False otherwise.")
              )
@click.pass_context
def api_server(ctx, host, port, debug):
    """Starts a swh-scheduler API HTTP server.
    """
    # The server is refused for a 'remote' scheduler configuration: an error
    # is printed on stderr and the command exits with status 1.
    if ctx.obj['config']['scheduler']['cls'] == 'remote':
        click.echo("The API server can only be started with a 'local' "
                   "configuration", err=True)
        ctx.exit(1)

    from swh.scheduler.api import server
    server.app.config.update(ctx.obj['config'])
    if debug is None:
        # Neither --debug nor --nodebug was given: derive the mode from the
        # log level, treating a missing level as INFO (i.e. no debug) rather
        # than failing with a KeyError.
        debug = ctx.obj.get('log_level', logging.INFO) <= logging.DEBUG
    server.app.run(host, port=port, debug=bool(debug))
@cli.group('task-type')
@click.pass_context
def task_type(ctx):
    """Manipulate task types."""
    # Pure grouping node: it performs no work itself and only serves as the
    # parent under which the task-type subcommands are registered.
    return None
@task_type.command('list')
@click.option('--verbose', '-v', is_flag=True, default=False,
              help='Verbose mode')
@click.option('--task_type', '-t', multiple=True, default=None,
              help='List task types of given type')
@click.option('--task_name', '-n', multiple=True, default=None,
              help='List task types of given backend task name')
@click.pass_context
def list_task_types(ctx, verbose, task_type, task_name):
    """List the task types known to the scheduler.
    """
    # Raises ValueError when the parent command could not instantiate a
    # scheduler, matching the other commands that require one, instead of
    # failing with an AttributeError on None after the header was printed.
    scheduler = ctx.obj['scheduler']
    if not scheduler:
        raise ValueError('Scheduler class (local/remote) must be instantiated')

    click.echo("Known task types:")
    # NOTE(review): the whitespace inside the templates below is reproduced
    # from a whitespace-collapsed view of the file; confirm the intended
    # indentation of the output lines.
    if verbose:
        tmpl = click.style('{type}: ', bold=True) + '''{backend_name}
{description}
interval: {default_interval} [{min_interval}, {max_interval}]
backoff_factor: {backoff_factor}
max_queue_length: {max_queue_length}
num_retries: {num_retries}
retry_delay: {retry_delay}
'''
    else:
        tmpl = '{type}:\n {description}'

    for tasktype in sorted(scheduler.get_task_types(),
                           key=lambda x: x['type']):
        # Both filters are optional; an empty filter lets everything through.
        if task_type and tasktype['type'] not in task_type:
            continue
        if task_name and tasktype['backend_name'] not in task_name:
            continue
        click.echo(tmpl.format(**tasktype))
@task_type.command('add')
@click.argument('type', required=True)
@click.argument('task-name', required=True)
@click.argument('description', required=True)
@click.option('--default-interval', '-i', default='90 days',
              help='Default interval ("90 days" by default)')
@click.option('--min-interval', default=None,
              help='Minimum interval (default interval if not set)')
# '-i' used to be declared here as well as on --default-interval, which made
# the short flag ambiguous; it is now reserved for --default-interval only.
@click.option('--max-interval', default=None,
              help='Maximal interval (default interval if not set)')
@click.option('--backoff-factor', '-f', type=float, default=1,
              help='Backoff factor')
@click.pass_context
def add_task_type(ctx, type, task_name, description,
                  default_interval, min_interval, max_interval,
                  backoff_factor):
    """Create a new task type
    """
    # Raises ValueError when the parent command could not instantiate a
    # scheduler.
    scheduler = ctx.obj['scheduler']
    if not scheduler:
        raise ValueError('Scheduler class (local/remote) must be instantiated')

    # min_interval / max_interval are forwarded as given (None when unset).
    # NOTE(review): the option help says they fall back to the default
    # interval; presumably the scheduler backend applies that -- confirm.
    task_type = dict(
        type=type,
        backend_name=task_name,
        description=description,
        default_interval=default_interval,
        min_interval=min_interval,
        max_interval=max_interval,
        backoff_factor=backoff_factor,
        max_queue_length=None,
        num_retries=None,
        retry_delay=None,
        )
    scheduler.create_task_type(task_type)
    click.echo('OK')
@cli.command('updater')
@click.option('--verbose/--no-verbose', '-v', default=False,
              help='Verbose mode')
@click.pass_context
def updater(ctx, verbose):
    """Insert tasks in the scheduler from the scheduler-updater's events
    """
    # `verbose` is accepted on the command line but not used below.
    from swh.scheduler.updater.writer import UpdaterWriter

    # The whole configuration mapping is expanded into keyword arguments.
    writer = UpdaterWriter(**ctx.obj['config'])
    writer.run()
@cli.command('ghtorrent')
@click.option('--verbose/--no-verbose', '-v', default=False,
              help='Verbose mode')
@click.pass_context
def ghtorrent(ctx, verbose):
    """Consume events from ghtorrent and write them to cache.
    """
    # `verbose` is accepted on the command line but not used below.
    from swh.scheduler.updater.ghtorrent import GHTorrentConsumer
    from swh.scheduler.updater.backend import SchedulerUpdaterBackend

    # Each component reads its own configuration section; a missing section
    # is treated as an empty one.
    settings = ctx.obj['config']
    backend_settings = settings.get('scheduler_updater', {})
    consumer_settings = settings.get('ghtorrent', {})

    updater_backend = SchedulerUpdaterBackend(**backend_settings)
    consumer = GHTorrentConsumer(updater_backend, **consumer_settings)
    consumer.run()
def main():
    """Run the scheduler command group and return whatever it returns.

    The group is invoked with 'SWH_SCHEDULER' as click's automatic
    environment-variable prefix.
    """
    outcome = cli(auto_envvar_prefix='SWH_SCHEDULER')
    return outcome


# Only run the command line when executed as a script, not on import.
if __name__ == '__main__':
    main()