diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py
--- a/swh/scheduler/api/client.py
+++ b/swh/scheduler/api/client.py
@@ -50,6 +50,9 @@
     def get_tasks(self, task_ids):
         return self.post('get_tasks', {'task_ids': task_ids})
 
+    def get_task_runs(self, task_ids):
+        return self.post('get_task_runs', {'task_ids': task_ids})
+
     def search_tasks(self, task_id=None, task_type=None, status=None,
                      priority=None, policy=None, before=None, after=None,
                      limit=None):
diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py
--- a/swh/scheduler/api/server.py
+++ b/swh/scheduler/api/server.py
@@ -100,6 +100,13 @@
     return get_sched().get_tasks(**decode_request(request))
 
 
+@app.route('/get_task_runs', methods=['POST'])
+@negotiate(MsgpackFormatter)
+@negotiate(JSONFormatter)
+def get_task_runs():
+    return get_sched().get_task_runs(**decode_request(request))
+
+
 @app.route('/search_tasks', methods=['POST'])
 @negotiate(MsgpackFormatter)
 @negotiate(JSONFormatter)
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -454,3 +454,69 @@
             cur.execute(
                 "select * from swh_scheduler_delete_archived_tasks(%s, %s)",
                 (_task_ids, _task_run_ids))
+
+    task_run_keys = ['id', 'task', 'backend_id', 'scheduled',
+                     'started', 'ended', 'metadata', 'status', ]
+
+    @db_transaction()
+    def get_task_runs(self, task_ids=None, limit=None, db=None, cur=None):
+        """Search the runs (executions) of the given task(s).
+
+        Args:
+            task_ids: a single task id, or an iterable of task ids
+            limit: if set, maximum number of task runs to return
+
+        Returns:
+            the matching rows of the task_run table, or an empty list if
+            no task id is given
+
+        """
+        if not task_ids:
+            # same type as cur.fetchall(), so callers can always iterate
+            return []
+
+        if isinstance(task_ids, (str, int)):
+            where = 'task = %s'
+        else:
+            where = 'task in %s'
+            task_ids = tuple(task_ids)
+        args = [task_ids]
+
+        query = 'select * from task_run where ' + where
+        if limit:
+            query += ' limit %s :: bigint'
+            args.append(limit)
+        cur.execute(query, args)
+        return cur.fetchall()
+
+    @db_transaction()
+    def get_task_run_from_backend(self, backend_ids=None, limit=None,
+                                  db=None, cur=None):
+        """Search the task runs matching the given backend id(s).
+
+        Args:
+            backend_ids: a single backend id, or an iterable of backend ids
+            limit: if set, maximum number of task runs to return
+
+        Returns:
+            the matching rows of the task_run table, or an empty list if
+            no backend id is given
+
+        """
+        if not backend_ids:
+            # same type as cur.fetchall(), so callers can always iterate
+            return []
+
+        if isinstance(backend_ids, (str, int)):
+            where = 'backend_id = %s'
+        else:
+            where = 'backend_id in %s'
+            backend_ids = tuple(backend_ids)
+        args = [backend_ids]
+
+        query = 'select * from task_run where ' + where
+        if limit:
+            query += ' limit %s :: bigint'
+            args.append(limit)
+        cur.execute(query, args)
+        return cur.fetchall()
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -11,6 +11,7 @@
 import locale
 import logging
 import time
+import datetime
 
 from swh.core import utils, config
 from . import compute_nb_tasks_from
@@ -36,18 +37,50 @@
 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
 
 
-def pretty_print_list(list, indent):
+def format_dict(d):
+    """Recursively convert the date values of a dict to strings.
+
+    Values that are arrow.Arrow, datetime.date or datetime.datetime
+    instances are replaced by arrow.get(value).format(); nested dicts
+    are converted the same way. A new dict is returned.
+
+    """
+    ret = {}
+    for k, v in d.items():
+        if isinstance(v, (arrow.Arrow, datetime.date, datetime.datetime)):
+            v = arrow.get(v).format()
+        elif isinstance(v, dict):
+            v = format_dict(v)
+        ret[k] = v
+    return ret
+
+
+def pretty_print_list(list, indent=0):
     """Pretty-print a list"""
     return ''.join('%s%s\n' % (' ' * indent, item) for item in list)
 
 
-def pretty_print_dict(dict, indent):
-    """Pretty-print a list"""
+def pretty_print_dict(dict, indent=0):
+    """Pretty-print a dict"""
     return ''.join('%s%s: %s\n' %
                    (' ' * indent, click.style(key, bold=True), value)
                    for key, value in dict.items())
 
 
+def pretty_print_run(run, indent=4):
+    """Pretty-print a task run
+
+    Args:
+        run (dict): the task run to print; must provide the backend_id,
+            status, scheduled, started and ended keys
+        indent (int): number of spaces prepended to each output line
+
+    """
+    fmt = ('{indent}{backend_id} [{status}]\n'
+           '{indent} scheduled: {scheduled} [{started}:{ended}]')
+    return fmt.format(indent=' '*indent, **format_dict(run))
+
+
 def pretty_print_task(task, full=False):
     """Pretty-print a task
 
@@ -320,9 +353,11 @@
 @click.option('--after', '-a', required=False, type=DATETIME,
               metavar='DATETIME',
               help='Limit to tasks supposed to run after the given date.')
+@click.option('--runs', '-r', is_flag=True, default=False,
+              help='List past executions of the task.')
 @click.pass_context
 def list_tasks(ctx, task_id, task_type, limit, status, policy, priority,
-               before, after):
+               before, after, runs):
     """List tasks.
     """
     scheduler = ctx.obj['scheduler']
@@ -350,11 +385,22 @@
         status=status, priority=priority, policy=policy,
         before=before, after=after,
         limit=limit)
+    runs_by_task = {}
+    if runs and tasks:
+        # group the runs by task id, so each task is listed with its own
+        runs_by_task = {task['id']: [] for task in tasks}
+        for run in scheduler.get_task_runs(list(runs_by_task)):
+            runs_by_task[run['task']].append(run)
 
     output.append('Found %d tasks\n' % (
         len(tasks)))
     for task in tasks:
         output.append(pretty_print_task(task, full=True))
+        task_runs = runs_by_task.get(task['id'])
+        if task_runs:
+            output.append(click.style(' Executions:', bold=True))
+            for run in task_runs:
+                output.append(pretty_print_run(run, indent=4))
 
     click.echo('\n'.join(output))
 