Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli.py
# Copyright (C) 2016-2018 The Software Heritage developers | # Copyright (C) 2016-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import arrow | import arrow | ||||
import click | import click | ||||
import csv | import csv | ||||
import itertools | import itertools | ||||
import json | import json | ||||
import locale | import locale | ||||
import logging | import logging | ||||
import time | |||||
from swh.core import utils | from swh.core import utils, config | ||||
from . import compute_nb_tasks_from | from . import compute_nb_tasks_from | ||||
from .backend_es import SWHElasticSearchClient | from .backend_es import SWHElasticSearchClient | ||||
from . import get_scheduler | |||||
locale.setlocale(locale.LC_ALL, '') | locale.setlocale(locale.LC_ALL, '') | ||||
ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0] | ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0] | ||||
class DateTimeType(click.ParamType): | class DateTimeType(click.ParamType): | ||||
name = 'time and date' | name = 'time and date' | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | def pretty_print_task(task): | ||||
return ''.join(lines) | return ''.join(lines) | ||||
def list_task_types(ctx, param, value): | def list_task_types(ctx, param, value): | ||||
if not value or ctx.resilient_parsing: | if not value or ctx.resilient_parsing: | ||||
return | return | ||||
click.echo("Known task types:") | click.echo("Known task types:") | ||||
for tasktype in ctx.obj.get_task_types(): | for tasktype in ctx.obj['scheduler'].get_task_types(): | ||||
click.echo('{type}:\n {description}'.format(**tasktype)) | click.echo('{type}:\n {description}'.format(**tasktype)) | ||||
ctx.exit() | ctx.exit() | ||||
@click.group(context_settings=CONTEXT_SETTINGS) | @click.group(context_settings=CONTEXT_SETTINGS) | ||||
@click.option('--cls', '-c', default='local', | @click.option('--cls', '-c', default='local', | ||||
type=click.Choice(['local', 'remote']), | |||||
help="Scheduler's class, default to 'local'") | help="Scheduler's class, default to 'local'") | ||||
@click.option('--database', '-d', | @click.option('--database', '-d', | ||||
help='Scheduling database DSN') | help="Scheduling database DSN (if cls is 'local')") | ||||
@click.option('--url', '-u', | @click.option('--url', '-u', | ||||
help="(Optional) Scheduler's url access") | help="Scheduler's url access (if cls is 'remote')") | ||||
@click.option('--log-level', '-l', default='INFO', | |||||
type=click.Choice(logging._nameToLevel.keys()), | |||||
help="Log level (default to INFO)") | |||||
@click.pass_context | @click.pass_context | ||||
def cli(ctx, cls, database, url): | def cli(ctx, cls, database, url, log_level): | ||||
"""Software Heritage Scheduler CLI interface | """Software Heritage Scheduler CLI interface | ||||
Default to use the the local scheduler instance (plugged to the | Default to use the the local scheduler instance (plugged to the | ||||
main scheduler db). | main scheduler db). | ||||
""" | """ | ||||
from swh.scheduler.celery_backend.config import setup_log_handler | |||||
log_level = setup_log_handler( | |||||
loglevel=log_level, colorize=False, | |||||
format='[%(levelname)s] %(name)s -- %(message)s') | |||||
ctx.ensure_object(dict) | |||||
scheduler = None | scheduler = None | ||||
override_config = {} | override_config = {} | ||||
from . import get_scheduler | try: | ||||
if cls == 'local': | if cls == 'local' and database: | ||||
if database: | |||||
override_config = {'scheduling_db': database} | override_config = {'scheduling_db': database} | ||||
scheduler = get_scheduler(cls, args=override_config) | elif cls == 'remote' and url: | ||||
elif cls == 'remote': | |||||
if url: | |||||
override_config = {'url': url} | override_config = {'url': url} | ||||
scheduler = get_scheduler(cls, args=override_config) | scheduler = get_scheduler(cls, args=override_config) | ||||
except Exception: | |||||
# it's the subcommand to decide whether not having a proper | |||||
# scheduler instance is a problem. | |||||
pass | |||||
if not scheduler: | ctx.obj['scheduler'] = scheduler | ||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | ctx.obj['config'] = {'cls': cls, 'args': override_config} | ||||
ctx.obj['loglevel'] = log_level | |||||
ctx.obj = scheduler | |||||
@cli.group('task') | @cli.group('task') | ||||
@click.option('--list-types', '-l', is_flag=True, default=False, is_eager=True, | @click.option('--list-types', '-l', is_flag=True, default=False, is_eager=True, | ||||
expose_value=False, callback=list_task_types) | expose_value=False, callback=list_task_types) | ||||
@click.pass_context | @click.pass_context | ||||
def task(ctx): | def task(ctx): | ||||
"""Manipulate tasks.""" | """Manipulate tasks.""" | ||||
Show All 35 Lines | cat scheduling-task.txt | \ | ||||
--database 'service=swh-scheduler-dev' \ | --database 'service=swh-scheduler-dev' \ | ||||
task schedule \ | task schedule \ | ||||
--columns type --columns kwargs --columns policy \ | --columns type --columns kwargs --columns policy \ | ||||
--delimiter ';' - | --delimiter ';' - | ||||
""" | """ | ||||
tasks = [] | tasks = [] | ||||
now = arrow.utcnow() | now = arrow.utcnow() | ||||
scheduler = ctx.obj['scheduler'] | |||||
if not scheduler: | |||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | |||||
reader = csv.reader(file, delimiter=delimiter) | reader = csv.reader(file, delimiter=delimiter) | ||||
for line in reader: | for line in reader: | ||||
task = dict(zip(columns, line)) | task = dict(zip(columns, line)) | ||||
args = json.loads(task.pop('args', '[]')) | args = json.loads(task.pop('args', '[]')) | ||||
kwargs = json.loads(task.pop('kwargs', '{}')) | kwargs = json.loads(task.pop('kwargs', '{}')) | ||||
task['arguments'] = { | task['arguments'] = { | ||||
'args': args, | 'args': args, | ||||
'kwargs': kwargs, | 'kwargs': kwargs, | ||||
} | } | ||||
task['next_run'] = DATETIME.convert(task.get('next_run', now), | task['next_run'] = DATETIME.convert(task.get('next_run', now), | ||||
None, None) | None, None) | ||||
tasks.append(task) | tasks.append(task) | ||||
created = ctx.obj.create_tasks(tasks) | created = scheduler.create_tasks(tasks) | ||||
output = [ | output = [ | ||||
'Created %d tasks\n' % len(created), | 'Created %d tasks\n' % len(created), | ||||
] | ] | ||||
for task in created: | for task in created: | ||||
output.append(pretty_print_task(task)) | output.append(pretty_print_task(task)) | ||||
click.echo_via_pager('\n'.join(output)) | click.echo_via_pager('\n'.join(output)) | ||||
Show All 13 Lines | def schedule_task(ctx, type, options, policy, next_run): | ||||
swh-scheduler --database 'service=swh-scheduler' \ | swh-scheduler --database 'service=swh-scheduler' \ | ||||
task add swh-lister-pypi | task add swh-lister-pypi | ||||
swh-scheduler --database 'service=swh-scheduler' \ | swh-scheduler --database 'service=swh-scheduler' \ | ||||
task add swh-lister-debian --policy=oneshot distribution=stretch | task add swh-lister-debian --policy=oneshot distribution=stretch | ||||
""" | """ | ||||
scheduler = ctx.obj['scheduler'] | |||||
if not scheduler: | |||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | |||||
now = arrow.utcnow() | now = arrow.utcnow() | ||||
args = [x for x in options if '=' not in x] | args = [x for x in options if '=' not in x] | ||||
kw = dict(x.split('=', 1) for x in options if '=' in x) | kw = dict(x.split('=', 1) for x in options if '=' in x) | ||||
task = {'type': type, | task = {'type': type, | ||||
'policy': policy, | 'policy': policy, | ||||
'arguments': { | 'arguments': { | ||||
'args': args, | 'args': args, | ||||
'kwargs': kw, | 'kwargs': kw, | ||||
}, | }, | ||||
'next_run': DATETIME.convert(next_run or now, | 'next_run': DATETIME.convert(next_run or now, | ||||
None, None), | None, None), | ||||
} | } | ||||
created = ctx.obj.create_tasks([task]) | created = scheduler.create_tasks([task]) | ||||
output = [ | output = [ | ||||
'Created %d tasks\n' % len(created), | 'Created %d tasks\n' % len(created), | ||||
] | ] | ||||
for task in created: | for task in created: | ||||
output.append(pretty_print_task(task)) | output.append(pretty_print_task(task)) | ||||
click.echo('\n'.join(output)) | click.echo('\n'.join(output)) | ||||
@task.command('list-pending') | @task.command('list-pending') | ||||
@click.argument('task-types', required=True, nargs=-1) | @click.argument('task-types', required=True, nargs=-1) | ||||
@click.option('--limit', '-l', required=False, type=click.INT, | @click.option('--limit', '-l', required=False, type=click.INT, | ||||
help='The maximum number of tasks to fetch') | help='The maximum number of tasks to fetch') | ||||
@click.option('--before', '-b', required=False, type=DATETIME, | @click.option('--before', '-b', required=False, type=DATETIME, | ||||
help='List all jobs supposed to run before the given date') | help='List all jobs supposed to run before the given date') | ||||
@click.pass_context | @click.pass_context | ||||
def list_pending_tasks(ctx, task_types, limit, before): | def list_pending_tasks(ctx, task_types, limit, before): | ||||
"""List the tasks that are going to be run. | """List the tasks that are going to be run. | ||||
You can override the number of tasks to fetch | You can override the number of tasks to fetch | ||||
""" | """ | ||||
scheduler = ctx.obj['scheduler'] | |||||
if not scheduler: | |||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | |||||
num_tasks, num_tasks_priority = compute_nb_tasks_from(limit) | num_tasks, num_tasks_priority = compute_nb_tasks_from(limit) | ||||
output = [] | output = [] | ||||
for task_type in task_types: | for task_type in task_types: | ||||
pending = ctx.obj.peek_ready_tasks( | pending = scheduler.peek_ready_tasks( | ||||
task_type, timestamp=before, | task_type, timestamp=before, | ||||
num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) | num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) | ||||
output.append('Found %d %s tasks\n' % ( | output.append('Found %d %s tasks\n' % ( | ||||
len(pending), task_type)) | len(pending), task_type)) | ||||
for task in pending: | for task in pending: | ||||
output.append(pretty_print_task(task)) | output.append(pretty_print_task(task)) | ||||
Show All 26 Lines | def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean, | ||||
dry_run, verbose, cleanup, start_from): | dry_run, verbose, cleanup, start_from): | ||||
"""Archive task/task_run whose (task_type is 'oneshot' and task_status | """Archive task/task_run whose (task_type is 'oneshot' and task_status | ||||
is 'completed') or (task_type is 'recurring' and task_status is | is 'completed') or (task_type is 'recurring' and task_status is | ||||
'disabled'). | 'disabled'). | ||||
With --dry-run flag set (default), only list those. | With --dry-run flag set (default), only list those. | ||||
""" | """ | ||||
scheduler = ctx.obj['scheduler'] | |||||
if not scheduler: | |||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | |||||
es_client = SWHElasticSearchClient() | es_client = SWHElasticSearchClient() | ||||
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) | logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) | ||||
log = logging.getLogger('swh.scheduler.cli.archive') | log = logging.getLogger('swh.scheduler.cli.archive') | ||||
logging.getLogger('urllib3').setLevel(logging.WARN) | logging.getLogger('urllib3').setLevel(logging.WARN) | ||||
logging.getLogger('elasticsearch').setLevel(logging.WARN) | logging.getLogger('elasticsearch').setLevel(logging.WARN) | ||||
if dry_run: | if dry_run: | ||||
log.info('**DRY-RUN** (only reading db)') | log.info('**DRY-RUN** (only reading db)') | ||||
if not cleanup: | if not cleanup: | ||||
Show All 18 Lines | def group_by_index_name(data, es_client=es_client): | ||||
status. | status. | ||||
""" | """ | ||||
date = data.get('started') | date = data.get('started') | ||||
if not date: | if not date: | ||||
date = data['scheduled'] | date = data['scheduled'] | ||||
return es_client.compute_index_name(date.year, date.month) | return es_client.compute_index_name(date.year, date.month) | ||||
def index_data(before, last_id, batch_index, backend=ctx.obj): | def index_data(before, last_id, batch_index): | ||||
tasks_in = backend.filter_task_to_archive( | tasks_in = scheduler.filter_task_to_archive( | ||||
after, before, last_id=last_id, limit=batch_index) | after, before, last_id=last_id, limit=batch_index) | ||||
for index_name, tasks_group in itertools.groupby( | for index_name, tasks_group in itertools.groupby( | ||||
tasks_in, key=group_by_index_name): | tasks_in, key=group_by_index_name): | ||||
log.debug('Index tasks to %s' % index_name) | log.debug('Index tasks to %s' % index_name) | ||||
if dry_run: | if dry_run: | ||||
for task in tasks_group: | for task in tasks_group: | ||||
yield task | yield task | ||||
continue | continue | ||||
yield from es_client.streaming_bulk( | yield from es_client.streaming_bulk( | ||||
index_name, tasks_group, source=['task_id', 'task_run_id'], | index_name, tasks_group, source=['task_id', 'task_run_id'], | ||||
chunk_size=bulk_index, log=log) | chunk_size=bulk_index, log=log) | ||||
gen = index_data(before, last_id=start_from, batch_index=batch_index) | gen = index_data(before, last_id=start_from, batch_index=batch_index) | ||||
if cleanup: | if cleanup: | ||||
for task_ids in utils.grouper(gen, n=batch_clean): | for task_ids in utils.grouper(gen, n=batch_clean): | ||||
task_ids = list(task_ids) | task_ids = list(task_ids) | ||||
log.info('Clean up %s tasks: [%s, ...]' % ( | log.info('Clean up %s tasks: [%s, ...]' % ( | ||||
len(task_ids), task_ids[0])) | len(task_ids), task_ids[0])) | ||||
if dry_run: # no clean up | if dry_run: # no clean up | ||||
continue | continue | ||||
ctx.obj.delete_archived_tasks(task_ids) | ctx.obj['scheduler'].delete_archived_tasks(task_ids) | ||||
else: | else: | ||||
for task_ids in utils.grouper(gen, n=batch_index): | for task_ids in utils.grouper(gen, n=batch_index): | ||||
task_ids = list(task_ids) | task_ids = list(task_ids) | ||||
log.info('Indexed %s tasks: [%s, ...]' % ( | log.info('Indexed %s tasks: [%s, ...]' % ( | ||||
len(task_ids), task_ids[0])) | len(task_ids), task_ids[0])) | ||||
@cli.group('task-run') | @cli.command('runner') | ||||
@click.option('--period', '-p', default=0, | |||||
Inline comment — ardumont: which (typo: "witch" in the help text on the next row should read "which") | |||||
help=('Period (in s) at witch pending tasks are checked and ' | |||||
'executed. Set to 0 (default) for a one shot.')) | |||||
@click.pass_context | @click.pass_context | ||||
def task_run(ctx): | def runner(ctx, period): | ||||
"""Manipulate task runs.""" | """Starts a swh-scheduler runner service. | ||||
pass | |||||
This process is responsible for checking for ready-to-run tasks and | |||||
schedule them.""" | |||||
from swh.scheduler.celery_backend.runner import run_ready_tasks | |||||
from swh.scheduler.celery_backend.config import app | |||||
scheduler = ctx.obj['scheduler'] | |||||
try: | |||||
while True: | |||||
try: | |||||
run_ready_tasks(scheduler, app) | |||||
except Exception: | |||||
scheduler.rollback() | |||||
raise | |||||
if not period: | |||||
break | |||||
time.sleep(period) | |||||
except KeyboardInterrupt: | |||||
ctx.exit(0) | |||||
@cli.command('listener') | |||||
@click.pass_context | |||||
Inline comment (not done) — ardumont: listener | |||||
Inline comment (not done) — olasd: typo | |||||
def listener(ctx): | |||||
"""Starts a swh-scheduler listener service. | |||||
This service is responsible for listening at task lifecycle events and | |||||
handle their workflow status in the database.""" | |||||
scheduler = ctx.obj['scheduler'] | |||||
if not scheduler: | |||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | |||||
from swh.scheduler.celery_backend.listener import ( | |||||
event_monitor, main_app) | |||||
event_monitor(main_app, backend=scheduler) | |||||
@cli.command('api-server') | |||||
@click.argument('config-path', required=1) | |||||
@click.option('--host', default='0.0.0.0', | |||||
help="Host to run the scheduler server api") | |||||
@click.option('--port', default=5008, type=click.INT, | |||||
help="Binding port of the server") | |||||
@click.option('--debug/--nodebug', default=None, | |||||
help=("Indicates if the server should run in debug mode. " | |||||
"Defaults to True if log-level is DEBUG, False otherwise.") | |||||
) | |||||
@click.pass_context | |||||
def api_server(ctx, config_path, host, port, debug): | |||||
"""Starts a swh-scheduler API HTTP server. | |||||
""" | |||||
if ctx.obj['config']['cls'] == 'remote': | |||||
click.echo("The API server can only be started with a 'local' " | |||||
"configuration", err=True) | |||||
ctx.exit(1) | |||||
from swh.scheduler.api.server import app, DEFAULT_CONFIG | |||||
conf = config.read(config_path, DEFAULT_CONFIG) | |||||
if ctx.obj['config']['args']: | |||||
conf['scheduler']['args'].update(ctx.obj['config']['args']) | |||||
app.config.update(conf) | |||||
if debug is None: | |||||
debug = ctx.obj['loglevel'] <= logging.DEBUG | |||||
Inline comment (not done) — ardumont: Same question as before, do we still need the functions starting the server defined in server.py? [1] | |||||
Inline comment (done) — douardda: probably not, let's remove it | |||||
Inline comment (done) — douardda: In fact, we can delete the launch function but not the run_from_webserver() one, since it's the entry point for prod WSGI servers (gunicorn here) | |||||
app.run(host, port=port, debug=bool(debug)) | |||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
cli() | cli() |
which