Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli.py
Show First 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | def pretty_print_task(task): | ||||
return ''.join(lines) | return ''.join(lines) | ||||
def list_task_types(ctx, param, value): | def list_task_types(ctx, param, value): | ||||
if not value or ctx.resilient_parsing: | if not value or ctx.resilient_parsing: | ||||
return | return | ||||
click.echo("Known task types:") | click.echo("Known task types:") | ||||
for tasktype in ctx.obj.get_task_types(): | for tasktype in ctx.obj['scheduler'].get_task_types(): | ||||
click.echo('{type}:\n {description}'.format(**tasktype)) | click.echo('{type}:\n {description}'.format(**tasktype)) | ||||
ctx.exit() | ctx.exit() | ||||
@click.group(context_settings=CONTEXT_SETTINGS) | @click.group(context_settings=CONTEXT_SETTINGS) | ||||
@click.option('--cls', '-c', default='local', | @click.option('--cls', '-c', default='local', | ||||
help="Scheduler's class, default to 'local'") | help="Scheduler's class, default to 'local'") | ||||
@click.option('--database', '-d', | @click.option('--database', '-d', | ||||
help='Scheduling database DSN') | help='Scheduling database DSN') | ||||
@click.option('--url', '-u', | @click.option('--url', '-u', | ||||
help="(Optional) Scheduler's url access") | help="(Optional) Scheduler's url access") | ||||
@click.pass_context | @click.pass_context | ||||
def cli(ctx, cls, database, url): | def cli(ctx, cls, database, url): | ||||
"""Software Heritage Scheduler CLI interface | """Software Heritage Scheduler CLI interface | ||||
Default to use the local scheduler instance (plugged to the | Default to use the local scheduler instance (plugged to the | ||||
main scheduler db). | main scheduler db). | ||||
""" | """ | ||||
ctx.ensure_object(dict) | |||||
scheduler = None | scheduler = None | ||||
override_config = {} | override_config = {} | ||||
from . import get_scheduler | from . import get_scheduler | ||||
if cls == 'local': | if cls == 'local': | ||||
if database: | if database: | ||||
override_config = {'scheduling_db': database} | override_config = {'scheduling_db': database} | ||||
scheduler = get_scheduler(cls, args=override_config) | scheduler = get_scheduler(cls, args=override_config) | ||||
elif cls == 'remote': | elif cls == 'remote': | ||||
if url: | if url: | ||||
override_config = {'url': url} | override_config = {'url': url} | ||||
scheduler = get_scheduler(cls, args=override_config) | scheduler = get_scheduler(cls, args=override_config) | ||||
if not scheduler: | if not scheduler: | ||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | raise ValueError('Scheduler class (local/remote) must be instantiated') | ||||
ctx.obj = scheduler | ctx.obj['scheduler'] = scheduler | ||||
@cli.group('task') | @cli.group('task') | ||||
@click.option('--list-types', '-l', is_flag=True, default=False, is_eager=True, | @click.option('--list-types', '-l', is_flag=True, default=False, is_eager=True, | ||||
expose_value=False, callback=list_task_types) | expose_value=False, callback=list_task_types) | ||||
@click.pass_context | @click.pass_context | ||||
def task(ctx): | def task(ctx): | ||||
"""Manipulate tasks.""" | """Manipulate tasks.""" | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | for line in reader: | ||||
task['arguments'] = { | task['arguments'] = { | ||||
'args': args, | 'args': args, | ||||
'kwargs': kwargs, | 'kwargs': kwargs, | ||||
} | } | ||||
task['next_run'] = DATETIME.convert(task.get('next_run', now), | task['next_run'] = DATETIME.convert(task.get('next_run', now), | ||||
None, None) | None, None) | ||||
tasks.append(task) | tasks.append(task) | ||||
created = ctx.obj.create_tasks(tasks) | created = ctx.obj['scheduler'].create_tasks(tasks) | ||||
output = [ | output = [ | ||||
'Created %d tasks\n' % len(created), | 'Created %d tasks\n' % len(created), | ||||
] | ] | ||||
for task in created: | for task in created: | ||||
output.append(pretty_print_task(task)) | output.append(pretty_print_task(task)) | ||||
click.echo_via_pager('\n'.join(output)) | click.echo_via_pager('\n'.join(output)) | ||||
Show All 26 Lines | task = {'type': type, | ||||
'policy': policy, | 'policy': policy, | ||||
'arguments': { | 'arguments': { | ||||
'args': args, | 'args': args, | ||||
'kwargs': kw, | 'kwargs': kw, | ||||
}, | }, | ||||
'next_run': DATETIME.convert(next_run or now, | 'next_run': DATETIME.convert(next_run or now, | ||||
None, None), | None, None), | ||||
} | } | ||||
created = ctx.obj.create_tasks([task]) | created = ctx.obj['scheduler'].create_tasks([task]) | ||||
output = [ | output = [ | ||||
'Created %d tasks\n' % len(created), | 'Created %d tasks\n' % len(created), | ||||
] | ] | ||||
for task in created: | for task in created: | ||||
output.append(pretty_print_task(task)) | output.append(pretty_print_task(task)) | ||||
click.echo('\n'.join(output)) | click.echo('\n'.join(output)) | ||||
Show All 11 Lines | def list_pending_tasks(ctx, task_types, limit, before): | ||||
You can override the number of tasks to fetch | You can override the number of tasks to fetch | ||||
""" | """ | ||||
num_tasks, num_tasks_priority = compute_nb_tasks_from(limit) | num_tasks, num_tasks_priority = compute_nb_tasks_from(limit) | ||||
output = [] | output = [] | ||||
for task_type in task_types: | for task_type in task_types: | ||||
pending = ctx.obj.peek_ready_tasks( | pending = ctx.obj['scheduler'].peek_ready_tasks( | ||||
task_type, timestamp=before, | task_type, timestamp=before, | ||||
num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) | num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) | ||||
output.append('Found %d %s tasks\n' % ( | output.append('Found %d %s tasks\n' % ( | ||||
len(pending), task_type)) | len(pending), task_type)) | ||||
for task in pending: | for task in pending: | ||||
output.append(pretty_print_task(task)) | output.append(pretty_print_task(task)) | ||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | def group_by_index_name(data, es_client=es_client): | ||||
status. | status. | ||||
""" | """ | ||||
date = data.get('started') | date = data.get('started') | ||||
if not date: | if not date: | ||||
date = data['scheduled'] | date = data['scheduled'] | ||||
return es_client.compute_index_name(date.year, date.month) | return es_client.compute_index_name(date.year, date.month) | ||||
def index_data(before, last_id, batch_index, backend=ctx.obj): | def index_data(before, last_id, batch_index): | ||||
backend = ctx.obj['scheduler'] | |||||
tasks_in = backend.filter_task_to_archive( | tasks_in = backend.filter_task_to_archive( | ||||
after, before, last_id=last_id, limit=batch_index) | after, before, last_id=last_id, limit=batch_index) | ||||
for index_name, tasks_group in itertools.groupby( | for index_name, tasks_group in itertools.groupby( | ||||
tasks_in, key=group_by_index_name): | tasks_in, key=group_by_index_name): | ||||
log.debug('Index tasks to %s' % index_name) | log.debug('Index tasks to %s' % index_name) | ||||
if dry_run: | if dry_run: | ||||
for task in tasks_group: | for task in tasks_group: | ||||
yield task | yield task | ||||
continue | continue | ||||
yield from es_client.streaming_bulk( | yield from es_client.streaming_bulk( | ||||
index_name, tasks_group, source=['task_id', 'task_run_id'], | index_name, tasks_group, source=['task_id', 'task_run_id'], | ||||
chunk_size=bulk_index, log=log) | chunk_size=bulk_index, log=log) | ||||
gen = index_data(before, last_id=start_from, batch_index=batch_index) | gen = index_data(before, last_id=start_from, batch_index=batch_index) | ||||
if cleanup: | if cleanup: | ||||
for task_ids in utils.grouper(gen, n=batch_clean): | for task_ids in utils.grouper(gen, n=batch_clean): | ||||
task_ids = list(task_ids) | task_ids = list(task_ids) | ||||
log.info('Clean up %s tasks: [%s, ...]' % ( | log.info('Clean up %s tasks: [%s, ...]' % ( | ||||
len(task_ids), task_ids[0])) | len(task_ids), task_ids[0])) | ||||
if dry_run: # no clean up | if dry_run: # no clean up | ||||
continue | continue | ||||
ctx.obj.delete_archived_tasks(task_ids) | ctx.obj['scheduler'].delete_archived_tasks(task_ids) | ||||
else: | else: | ||||
for task_ids in utils.grouper(gen, n=batch_index): | for task_ids in utils.grouper(gen, n=batch_index): | ||||
task_ids = list(task_ids) | task_ids = list(task_ids) | ||||
log.info('Indexed %s tasks: [%s, ...]' % ( | log.info('Indexed %s tasks: [%s, ...]' % ( | ||||
len(task_ids), task_ids[0])) | len(task_ids), task_ids[0])) | ||||
@cli.group('task-run') | @cli.group('task-run') | ||||
@click.pass_context | @click.pass_context | ||||
ardumont: which | |||||
def task_run(ctx): | def task_run(ctx): | ||||
"""Manipulate task runs.""" | """Manipulate task runs.""" | ||||
pass | pass | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
cli() | cli() | ||||
Not Done Inline Actions: listener ardumont: listener | |||||
Not Done Inline Actions: typo olasd: typo | |||||
Not Done Inline Actions: Same question as before, do we still need the functions starting the server defined in server.py? [1] ardumont: Same question as before, do we still need the functions starting the server defined in server.… | |||||
Done Inline Actions: probably not, let's remove it douardda: probably not, let's remove it | |||||
Done Inline Actions: In fact, we can delete the launch function but not the run_from_webserver() one, since it's the entry point for prod WSGI servers (gunicorn here) douardda: In fact, we can delete the launch function but not the run_from_webserver() one since it's the… |
which