diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -78,6 +78,7 @@ logging.getLogger('swh').setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task logging.getLogger('celery.task').setLevel(loglevel) + return loglevel @celeryd_after_setup.connect diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -99,7 +99,7 @@ all_backend_tasks.extend(backend_tasks) -if __name__ == '__main__': +def main(): for module in main_app.conf.CELERY_IMPORTS: __import__(module) @@ -109,3 +109,7 @@ except Exception: main_backend.rollback() raise + + +if __name__ == '__main__': + main() diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py --- a/swh/scheduler/cli.py +++ b/swh/scheduler/cli.py @@ -10,10 +10,12 @@ import json import locale import logging +import time -from swh.core import utils +from swh.core import utils, config from . import compute_nb_tasks_from from .backend_es import SWHElasticSearchClient +from . 
import get_scheduler locale.setlocale(locale.LC_ALL, '') @@ -72,42 +74,53 @@ if not value or ctx.resilient_parsing: return click.echo("Known task types:") - for tasktype in ctx.obj.get_task_types(): + for tasktype in ctx.obj['scheduler'].get_task_types(): click.echo('{type}:\n {description}'.format(**tasktype)) ctx.exit() @click.group(context_settings=CONTEXT_SETTINGS) @click.option('--cls', '-c', default='local', + type=click.Choice(['local', 'remote']), help="Scheduler's class, default to 'local'") @click.option('--database', '-d', - help='Scheduling database DSN') + help="Scheduling database DSN (if cls is 'local')") @click.option('--url', '-u', - help="(Optional) Scheduler's url access") + help="Scheduler's url access (if cls is 'remote')") +@click.option('--log-level', '-l', default='INFO', + type=click.Choice(logging._nameToLevel.keys()), + help="Log level (default to INFO)") @click.pass_context -def cli(ctx, cls, database, url): +def cli(ctx, cls, database, url, log_level): """Software Heritage Scheduler CLI interface Default to use the the local scheduler instance (plugged to the main scheduler db). """ + from swh.scheduler.celery_backend.config import setup_log_handler + log_level = setup_log_handler( + loglevel=log_level, colorize=False, + format='[%(levelname)s] %(name)s -- %(message)s') + + ctx.ensure_object(dict) + scheduler = None override_config = {} - from . import get_scheduler - if cls == 'local': - if database: + try: + if cls == 'local' and database: override_config = {'scheduling_db': database} - scheduler = get_scheduler(cls, args=override_config) - elif cls == 'remote': - if url: + elif cls == 'remote' and url: override_config = {'url': url} scheduler = get_scheduler(cls, args=override_config) + except Exception: + # it's the subcommand to decide whether not having a proper + # scheduler instance is a problem. 
+ pass - if not scheduler: - raise ValueError('Scheduler class (local/remote) must be instantiated') - - ctx.obj = scheduler + ctx.obj['scheduler'] = scheduler + ctx.obj['config'] = {'cls': cls, 'args': override_config} + ctx.obj['loglevel'] = log_level @cli.group('task') @@ -159,6 +172,9 @@ """ tasks = [] now = arrow.utcnow() + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') reader = csv.reader(file, delimiter=delimiter) for line in reader: @@ -173,7 +189,7 @@ None, None) tasks.append(task) - created = ctx.obj.create_tasks(tasks) + created = scheduler.create_tasks(tasks) output = [ 'Created %d tasks\n' % len(created), @@ -203,6 +219,10 @@ task add swh-lister-debian --policy=oneshot distribution=stretch """ + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + now = arrow.utcnow() args = [x for x in options if '=' not in x] @@ -216,7 +236,7 @@ 'next_run': DATETIME.convert(next_run or now, None, None), } - created = ctx.obj.create_tasks([task]) + created = scheduler.create_tasks([task]) output = [ 'Created %d tasks\n' % len(created), @@ -240,11 +260,15 @@ You can override the number of tasks to fetch """ + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + num_tasks, num_tasks_priority = compute_nb_tasks_from(limit) output = [] for task_type in task_types: - pending = ctx.obj.peek_ready_tasks( + pending = scheduler.peek_ready_tasks( task_type, timestamp=before, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) output.append('Found %d %s tasks\n' % ( @@ -287,6 +311,10 @@ With --dry-run flag set (default), only list those. 
""" + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + es_client = SWHElasticSearchClient() logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) log = logging.getLogger('swh.scheduler.cli.archive') @@ -321,8 +349,8 @@ date = data['scheduled'] return es_client.compute_index_name(date.year, date.month) - def index_data(before, last_id, batch_index, backend=ctx.obj): - tasks_in = backend.filter_task_to_archive( + def index_data(before, last_id, batch_index): + tasks_in = scheduler.filter_task_to_archive( after, before, last_id=last_id, limit=batch_index) for index_name, tasks_group in itertools.groupby( tasks_in, key=group_by_index_name): @@ -344,7 +372,7 @@ len(task_ids), task_ids[0])) if dry_run: # no clean up continue - ctx.obj.delete_archived_tasks(task_ids) + ctx.obj['scheduler'].delete_archived_tasks(task_ids) else: for task_ids in utils.grouper(gen, n=batch_index): task_ids = list(task_ids) @@ -352,11 +380,78 @@ len(task_ids), task_ids[0])) -@cli.group('task-run') +@cli.command('runner') +@click.option('--period', '-p', default=0, + help=('Period (in s) at witch pending tasks are checked and ' + 'executed. Set to 0 (default) for a one shot.')) @click.pass_context -def task_run(ctx): - """Manipulate task runs.""" - pass +def runner(ctx, period): + """Starts a swh-scheduler runner service. + + This process is responsible for checking for ready-to-run tasks and + schedule them.""" + from swh.scheduler.celery_backend.runner import run_ready_tasks + from swh.scheduler.celery_backend.config import app + + scheduler = ctx.obj['scheduler'] + try: + while True: + try: + run_ready_tasks(scheduler, app) + except Exception: + scheduler.rollback() + raise + if not period: + break + time.sleep(period) + except KeyboardInterrupt: + ctx.exit(0) + + +@cli.command('listener') +@click.pass_context +def listerner(ctx): + """Starts a swh-scheduler listener service. 
+ + This service is responsible for listening for task lifecycle events and + handling their workflow status in the database.""" + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + + from swh.scheduler.celery_backend.listener import ( + event_monitor, main_app) + event_monitor(main_app, backend=scheduler) + + +@cli.command('api-server') +@click.argument('config-path', required=1) +@click.option('--host', default='0.0.0.0', + help="Host to run the scheduler server api") +@click.option('--port', default=5008, type=click.INT, + help="Binding port of the server") +@click.option('--debug/--nodebug', default=None, + help=("Indicates if the server should run in debug mode. " + "Defaults to True if log-level is DEBUG, False otherwise.") + ) +@click.pass_context +def api_server(ctx, config_path, host, port, debug): + """Starts a swh-scheduler API HTTP server. + """ + if ctx.obj['config']['cls'] == 'remote': + click.echo("The API server can only be started with a 'local' " + "configuration", err=True) + ctx.exit(1) + + from swh.scheduler.api.server import app, DEFAULT_CONFIG + conf = config.read(config_path, DEFAULT_CONFIG) + if ctx.obj['config']['args']: + conf['scheduler']['args'].update(ctx.obj['config']['args']) + app.config.update(conf) + if debug is None: + debug = ctx.obj['loglevel'] <= logging.DEBUG + + app.run(host, port=port, debug=bool(debug)) if __name__ == '__main__':