diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py
--- a/swh/scheduler/api/client.py
+++ b/swh/scheduler/api/client.py
@@ -50,6 +50,10 @@
     def get_tasks(self, task_ids):
         return self.post('get_tasks', {'task_ids': task_ids})
 
+    def get_task_runs(self, task_ids, limit=None):
+        return self.post(
+            'get_task_runs', {'task_ids': task_ids, 'limit': limit})
+
     def search_tasks(self, task_id=None, task_type=None, status=None,
                      priority=None, policy=None, before=None, after=None,
                      limit=None):
diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py
--- a/swh/scheduler/api/server.py
+++ b/swh/scheduler/api/server.py
@@ -100,6 +100,13 @@
     return get_sched().get_tasks(**decode_request(request))
 
 
+@app.route('/get_task_runs', methods=['POST'])
+@negotiate(MsgpackFormatter)
+@negotiate(JSONFormatter)
+def get_task_runs():
+    return get_sched().get_task_runs(**decode_request(request))
+
+
 @app.route('/search_tasks', methods=['POST'])
 @negotiate(MsgpackFormatter)
 @negotiate(JSONFormatter)
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -454,3 +454,39 @@
         cur.execute(
             "select * from swh_scheduler_delete_archived_tasks(%s, %s)",
             (_task_ids, _task_run_ids))
+
+    task_run_keys = ['id', 'task', 'backend_id', 'scheduled',
+                     'started', 'ended', 'metadata', 'status', ]
+
+    @db_transaction()
+    def get_task_runs(self, task_ids, limit=None, db=None, cur=None):
+        """Search the task runs of the given task(s).
+
+        Args:
+            task_ids: a single task id (str or int), or an iterable of
+                task ids; when falsy (e.g. empty), no query is run
+            limit (int): if set (and non-zero), maximum number of task
+                runs to return
+
+        Returns:
+            the matching rows of the task_run table, as fetched by the
+            cursor; an empty tuple when ``task_ids`` is falsy
+
+        """
+        if not task_ids:
+            return ()
+
+        if isinstance(task_ids, (str, int)):
+            query = 'select * from task_run where task = %s'
+        else:
+            query = 'select * from task_run where task in %s'
+            task_ids = tuple(task_ids)
+        args = [task_ids]
+
+        # NOTE: there is no ORDER BY, so which runs are kept by ``limit``
+        # is up to the database
+        if limit:
+            query += ' limit %s :: bigint'
+            args.append(limit)
+        cur.execute(query, args)
+        return cur.fetchall()
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -11,6 +11,7 @@
 import locale
 import logging
 import time
+import datetime
 
 from swh.core import utils, config
 from . import compute_nb_tasks_from
@@ -36,18 +37,40 @@
 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
 
 
-def pretty_print_list(list, indent):
+def format_dict(d):
+    """Return a copy of ``d`` with date/time values rendered as strings.
+
+    Nested dicts are processed recursively; other values are kept as is.
+    """
+    ret = {}
+    for k, v in d.items():
+        if isinstance(v, (arrow.Arrow, datetime.date, datetime.datetime)):
+            v = arrow.get(v).format()
+        elif isinstance(v, dict):
+            v = format_dict(v)
+        ret[k] = v
+    return ret
+
+
+def pretty_print_list(list, indent=0):
     """Pretty-print a list"""
     return ''.join('%s%s\n' % (' ' * indent, item) for item in list)
 
 
-def pretty_print_dict(dict, indent):
-    """Pretty-print a list"""
+def pretty_print_dict(dict, indent=0):
+    """Pretty-print a dict"""
     return ''.join('%s%s: %s\n' %
                    (' ' * indent, click.style(key, bold=True), value)
                    for key, value in dict.items())
 
 
+def pretty_print_run(run, indent=4):
+    """Pretty-print a task run on two lines, indented by ``indent`` spaces"""
+    fmt = ('{indent}{backend_id} [{status}]\n'
+           '{indent} scheduled: {scheduled} [{started}:{ended}]')
+    return fmt.format(indent=' '*indent, **format_dict(run))
+
+
 def pretty_print_task(task, full=False):
     """Pretty-print a task
 
@@ -320,9 +343,11 @@
 @click.option('--after', '-a', required=False,
               type=DATETIME, metavar='DATETIME',
               help='Limit to tasks supposed to run after the given date.')
+@click.option('--list-runs', '-r', is_flag=True, default=False,
+              help='Also list past executions of each task.')
 @click.pass_context
 def list_tasks(ctx, task_id, task_type, limit, status, policy, priority,
-               before, after):
+               before, after, list_runs):
     """List tasks.
     """
     scheduler = ctx.obj['scheduler']
@@ -350,11 +375,21 @@
         status=status, priority=priority, policy=policy,
         before=before, after=after, limit=limit)
 
+    if list_runs:
+        runs = {t['id']: [] for t in tasks}
+        for r in scheduler.get_task_runs([task['id'] for task in tasks]):
+            runs[r['task']].append(r)
+    else:
+        runs = {}
     output.append('Found %d tasks\n' % (
         len(tasks)))
 
     for task in tasks:
         output.append(pretty_print_task(task, full=True))
+        if runs.get(task['id']):
+            output.append(click.style(' Executions:', bold=True))
+            for run in runs[task['id']]:
+                output.append(pretty_print_run(run, indent=4))
 
     click.echo('\n'.join(output))
 
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -69,6 +69,14 @@
 }
 
 
+def subdict(d, keys=None, excl=()):
+    """Return the items of ``d`` whose key is in ``keys`` (all the keys
+    of ``d`` by default) and not in ``excl``"""
+    if keys is None:
+        keys = list(d)
+    return {k: d[k] for k in keys if k not in excl}
+
+
 @pytest.mark.db
 class CommonSchedulerTest(SingleDbTestFixture):
     TEST_DB_NAME = 'softwareheritage-scheduler-test'
@@ -476,6 +484,141 @@
         self.assertEqual(tasks_count[0], total_tasks - len(_tasks))
         self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks))
 
+    def test_get_task_runs_no_task(self):
+        '''No task exists in the scheduler's db: get_task_runs() should
+        always return an empty list.
+
+        '''
+        self.assertFalse(self.backend.get_task_runs(task_ids=()))
+        self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3)))
+        self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3),
+                                                    limit=10))
+
+    def test_get_task_runs_no_task_executed(self):
+        '''No task has been executed yet: get_task_runs() should always
+        return an empty list.
+
+        '''
+        self._create_task_types()
+        _time = utcnow()
+        recurring = self._tasks_from_template(
+            TEMPLATES['git'], _time, 12)
+        oneshots = self._tasks_from_template(
+            TEMPLATES['hg'], _time, 12)
+        self.backend.create_tasks(recurring + oneshots)
+
+        self.assertFalse(self.backend.get_task_runs(
+            task_ids=()))
+        self.assertFalse(self.backend.get_task_runs(
+            task_ids=(1, 2, 3)))
+        self.assertFalse(self.backend.get_task_runs(
+            task_ids=(1, 2, 3), limit=10))
+
+    def test_get_task_runs_with_scheduled(self):
+        '''Some tasks have been scheduled but not executed yet, so
+        get_task_runs() should return their runs; limit should be honored.
+
+        '''
+        self._create_task_types()
+        _time = utcnow()
+        recurring = self._tasks_from_template(
+            TEMPLATES['git'], _time, 12)
+        oneshots = self._tasks_from_template(
+            TEMPLATES['hg'], _time, 12)
+        total_tasks = len(recurring) + len(oneshots)
+        pending_tasks = self.backend.create_tasks(recurring + oneshots)
+        backend_tasks = [{
+            'task': task['id'],
+            'backend_id': str(uuid.uuid4()),
+            'scheduled': utcnow(),
+        } for task in pending_tasks]
+        self.backend.mass_schedule_task_runs(backend_tasks)
+
+        self.assertFalse(self.backend.get_task_runs(
+            task_ids=[total_tasks + 1]))
+
+        btask = backend_tasks[0]
+        runs = self.backend.get_task_runs(
+            task_ids=[btask['task']])
+        self.assertEqual(len(runs), 1)
+        run = runs[0]
+
+        self.assertEqual(subdict(run, excl=('id',)),
+                         {'task': btask['task'],
+                          'backend_id': btask['backend_id'],
+                          'scheduled': btask['scheduled'],
+                          'started': None,
+                          'ended': None,
+                          'metadata': None,
+                          'status': 'scheduled',
+                          })
+
+        runs = self.backend.get_task_runs(
+            task_ids=[bt['task'] for bt in backend_tasks], limit=2)
+        self.assertEqual(len(runs), 2)
+
+        runs = self.backend.get_task_runs(
+            task_ids=[bt['task'] for bt in backend_tasks])
+        self.assertEqual(len(runs), total_tasks)
+
+        keys = ('task', 'backend_id', 'scheduled')
+        self.assertEqual(sorted([subdict(x, keys) for x in runs],
+                                key=lambda x: x['task']),
+                         backend_tasks)
+
+    def test_get_task_runs_with_executed(self):
+        '''A task run has been started then ended; get_task_runs() should
+        return it with up-to-date timestamps, metadata and status.
+
+        '''
+        self._create_task_types()
+        _time = utcnow()
+        recurring = self._tasks_from_template(
+            TEMPLATES['git'], _time, 12)
+        oneshots = self._tasks_from_template(
+            TEMPLATES['hg'], _time, 12)
+        pending_tasks = self.backend.create_tasks(recurring + oneshots)
+        backend_tasks = [{
+            'task': task['id'],
+            'backend_id': str(uuid.uuid4()),
+            'scheduled': utcnow(),
+        } for task in pending_tasks]
+        self.backend.mass_schedule_task_runs(backend_tasks)
+
+        btask = backend_tasks[0]
+        ts = utcnow()
+        self.backend.start_task_run(btask['backend_id'],
+                                    metadata={'something': 'stupid'},
+                                    timestamp=ts)
+        runs = self.backend.get_task_runs(task_ids=[btask['task']])
+        self.assertEqual(len(runs), 1)
+        self.assertEqual(subdict(runs[0], excl=('id',)), {
+            'task': btask['task'],
+            'backend_id': btask['backend_id'],
+            'scheduled': btask['scheduled'],
+            'started': ts,
+            'ended': None,
+            'metadata': {'something': 'stupid'},
+            'status': 'started',
+        })
+
+        ts2 = utcnow()
+        self.backend.end_task_run(btask['backend_id'],
+                                  metadata={'other': 'stuff'},
+                                  timestamp=ts2,
+                                  status='eventful')
+        runs = self.backend.get_task_runs(task_ids=[btask['task']])
+        self.assertEqual(len(runs), 1)
+        self.assertEqual(subdict(runs[0], excl=('id',)), {
+            'task': btask['task'],
+            'backend_id': btask['backend_id'],
+            'scheduled': btask['scheduled'],
+            'started': ts,
+            'ended': ts2,
+            'metadata': {'something': 'stupid', 'other': 'stuff'},
+            'status': 'eventful',
+        })
+
 
 
 class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase):
     def setUp(self):