diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py --- a/swh/scheduler/api/client.py +++ b/swh/scheduler/api/client.py @@ -50,6 +50,14 @@ def get_tasks(self, task_ids): return self.post('get_tasks', {'task_ids': task_ids}) + def search_tasks(self, task_id=None, task_type=None, status=None, + priority=None, policy=None, before=None, after=None, + limit=None): + return self.post('search_tasks', dict( + task_id=task_id, task_type=task_type, status=status, + priority=priority, policy=policy, before=before, after=after, + limit=limit)) + def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None): return self.post('peek_ready_tasks', { diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -85,6 +85,11 @@ return encode_data(get_sched().get_tasks(**decode_request(request))) +@app.route('/search_tasks', methods=['POST']) +def search_tasks(): + return encode_data(get_sched().search_tasks(**decode_request(request))) + + @app.route('/peek_ready_tasks', methods=['POST']) def peek_ready_tasks(): return encode_data(get_sched().peek_ready_tasks(**decode_request(request))) diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -305,6 +305,59 @@ return self.set_status_tasks(task_ids) @autocommit + def search_tasks(self, task_id=None, task_type=None, status=None, + priority=None, policy=None, before=None, after=None, + limit=None, cursor=None): + """Search tasks from selected criterions""" + where = [] + args = [] + + if task_id: + if isinstance(task_id, (str, int)): + where.append('id = %s') + else: + where.append('id in %s') + task_id = tuple(task_id) + args.append(task_id) + if task_type: + if isinstance(task_type, str): + where.append('type = %s') + else: + where.append('type in %s') + task_type = tuple(task_type) + args.append(task_type) + if status: + if isinstance(status, str): + where.append('status = %s') + else: + where.append('status in %s') + status = tuple(status) + args.append(status) + if priority: + if isinstance(priority, str): + where.append('priority = %s') + else: + priority = tuple(priority) + where.append('priority in %s') + args.append(priority) + if policy: + where.append('policy = %s') + args.append(policy) + if before: + where.append('next_run <= %s') + args.append(before) + if after: + where.append('next_run >= %s') + args.append(after) + + query = 'select * from task where ' + ' and '.join(where) + if limit: + query += ' limit %s :: bigint' + args.append(limit) + cursor.execute(query, args) + return cursor.fetchall() + + @autocommit def get_tasks(self, task_ids, cursor=None): """Retrieve the info of tasks whose ids are listed.""" query = self._format_query('select {keys} from task where id in %s', diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py --- a/swh/scheduler/cli.py +++ b/swh/scheduler/cli.py @@ -48,7 +48,7 @@ for key, value in dict.items()) -def pretty_print_task(task): +def pretty_print_task(task, full=False): """Pretty-print a task""" next_run = arrow.get(task['next_run']) lines = [ @@ -59,8 +59,17 @@ '\n', click.style(' Interval: ', bold=True), str(task['current_interval']), '\n', - click.style(' Type: ', bold=True), task['type'], '\n', - click.style(' Policy: ', bold=True), task['policy'], '\n', + click.style(' Type: ', bold=True), task['type'] or '', '\n', + click.style(' Policy: ', bold=True), task['policy'] or '', '\n', + ] + if full: + lines += [ + click.style(' Status: ', bold=True), + task['status'] or '', '\n', + click.style(' Priority: ', bold=True), + task['priority'] or '', '\n', + ] + lines += [ click.style(' Args:\n', bold=True), pretty_print_list(task['arguments']['args'], indent=4), click.style(' Keyword args:\n', bold=True), @@ -274,6 +283,65 @@ click.echo('\n'.join(output)) +@task.command('list') +@click.option('--task-id', '-i', default=None, multiple=True, + help='List only tasks which id is given') +@click.option('--task-type', '-t', default=None, multiple=True, + help='List only tasks of type task-type') +@click.option('--limit', '-l', required=False, type=click.INT, + help='The maximum number of tasks to fetch') +@click.option('--status', '-s', multiple=True, + default=None, + help='List tasks which status is STATUS') +@click.option('--policy', '-p', default=None, + type=click.Choice(['recurring', 'oneshot']), + help='List tasks which policy is POLICY') +@click.option('--priority', '-P', default=None, multiple=True, + type=click.Choice(['all', 'low', 'normal', 'high']), + help='List tasks which priority is PRIORITY') +@click.option('--before', '-b', required=False, type=DATETIME, + help='List all tasks supposed to run before the given date') +@click.option('--after', '-a', required=False, type=DATETIME, + help='List all tasks supposed to run after the given date') +@click.pass_context +def list_tasks(ctx, task_id, task_type, limit, status, policy, priority, + before, after): + """List tasks. + """ + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + + if not task_type: + task_type = [x['type'] for x in scheduler.get_task_types()] + + # if task_id is not given, default value for status is + # 'next_run_not_scheduled' + # if task_id is given, default status is 'all' + if task_id is None and status is None: + status = ['next_run_not_scheduled'] + if status and 'all' in status: + status = None + + if priority and 'all' in priority: + priority = None + + output = [] + tasks = scheduler.search_tasks( + task_id=task_id, + task_type=task_type, + status=status, priority=priority, policy=policy, + before=before, after=after, + limit=limit) + + output.append('Found %d tasks\n' % ( + len(tasks))) + for task in tasks: + output.append(pretty_print_task(task, full=True)) + + click.echo('\n'.join(output)) + + @task.command('archive') @click.option('--before', '-b', default=None, help='''Task whose ended date is anterior will be archived.