diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py --- a/swh/scheduler/api/client.py +++ b/swh/scheduler/api/client.py @@ -50,6 +50,14 @@ def get_tasks(self, task_ids): return self.post('get_tasks', {'task_ids': task_ids}) + def search_tasks(self, task_id=None, task_type=None, status=None, + priority=None, policy=None, before=None, after=None, + limit=None): + return self.post('search_tasks', dict( + task_id=task_id, task_type=task_type, status=status, + priority=priority, policy=policy, before=before, after=after, + limit=limit)) + def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None): return self.post('peek_ready_tasks', { diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -85,6 +85,11 @@ return encode_data(get_sched().get_tasks(**decode_request(request))) +@app.route('/search_tasks', methods=['POST']) +def search_tasks(): + return encode_data(get_sched().search_tasks(**decode_request(request))) + + @app.route('/peek_ready_tasks', methods=['POST']) def peek_ready_tasks(): return encode_data(get_sched().peek_ready_tasks(**decode_request(request))) diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -305,6 +305,59 @@ return self.set_status_tasks(task_ids) @autocommit + def search_tasks(self, task_id=None, task_type=None, status=None, + priority=None, policy=None, before=None, after=None, + limit=None, cursor=None): + """Search tasks from selected criterions""" + where = [] + args = [] + + if task_id: + if isinstance(task_id, (str, int)): + where.append('id = %s') + else: + where.append('id in %s') + task_id = tuple(task_id) + args.append(task_id) + if task_type: + if isinstance(task_type, str): + where.append('type = %s') + else: + where.append('type in %s') + task_type = tuple(task_type) + args.append(task_type) + if status: + if isinstance(status, str): + where.append('status = %s') + else: + where.append('status in %s') + status = tuple(status) + args.append(status) + if priority: + if isinstance(priority, str): + where.append('priority = %s') + else: + priority = tuple(priority) + where.append('priority in %s') + args.append(priority) + if policy: + where.append('policy = %s') + args.append(policy) + if before: + where.append('next_run <= %s') + args.append(before) + if after: + where.append('next_run >= %s') + args.append(after) + + query = 'select * from task where ' + ' and '.join(where) + if limit: + query += ' limit %s :: bigint' + args.append(limit) + cursor.execute(query, args) + return cursor.fetchall() + + @autocommit def get_tasks(self, task_ids, cursor=None): """Retrieve the info of tasks whose ids are listed.""" query = self._format_query('select {keys} from task where id in %s',