diff --git a/sql/updates/26.sql b/sql/updates/26.sql
new file mode 100644
--- /dev/null
+++ b/sql/updates/26.sql
@@ -0,0 +1,50 @@
+-- SWH DB schema upgrade
+-- from_version: 25
+-- to_version: 26
+-- description: Add new functions to peek/grab tasks (with any priority) ready to be
+-- scheduled.
+
+insert into dbversion (version, release, description)
+      values (26, now(), 'Work In Progress');
+
+create or replace function swh_scheduler_peek_any_ready_priority_tasks (
+    task_type text, ts timestamptz default now(),
+    num_tasks bigint default NULL
+  )
+  returns setof task
+  language sql stable
+as $$
+  select *
+  from task t
+  where t.next_run <= ts
+    and t.type = task_type
+    and t.status = 'next_run_not_scheduled'
+    and t.priority is not null
+  order by t.next_run
+  limit num_tasks
+  for update skip locked;
+$$;
+
+comment on function swh_scheduler_peek_any_ready_priority_tasks(text, timestamptz, bigint)
+is 'List tasks with any priority ready for scheduling';
+
+create or replace function swh_scheduler_grab_any_ready_priority_tasks (
+    task_type text, ts timestamptz default now(),
+    num_tasks bigint default NULL
+  )
+  returns setof task
+  language sql
+as $$
+  update task
+    set status='next_run_scheduled'
+    from (
+        select id from swh_scheduler_peek_any_ready_priority_tasks(
+          task_type, ts, num_tasks
+        )
+    ) next_tasks
+    where task.id = next_tasks.id
+    returning task.*;
+$$;
+
+comment on function swh_scheduler_grab_any_ready_priority_tasks (text, timestamptz, bigint)
+is 'Grab any priority tasks ready for scheduling and change their status';
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -658,6 +658,45 @@
         logger.debug("GRAB %s => %s" % (task_type, cur.rowcount))
         return cur.fetchall()
 
+    @db_transaction()
+    def peek_ready_priority_tasks(
+        self,
+        task_type: str,
+        timestamp: Optional[datetime.datetime] = None,
+        num_tasks: Optional[int] = None,
+        db=None,
+        cur=None,
+    ) -> List[Dict]:
+        if timestamp is None:
+            timestamp = utcnow()
+
+        cur.execute(
+            """select * from swh_scheduler_peek_any_ready_priority_tasks(
+                %s, %s, %s :: bigint)""",
+            (task_type, timestamp, num_tasks),
+        )
+        logger.debug("PEEK %s => %s", task_type, cur.rowcount)
+        return cur.fetchall()
+
+    @db_transaction()
+    def grab_ready_priority_tasks(
+        self,
+        task_type: str,
+        timestamp: Optional[datetime.datetime] = None,
+        num_tasks: Optional[int] = None,
+        db=None,
+        cur=None,
+    ) -> List[Dict]:
+        if timestamp is None:
+            timestamp = utcnow()
+        cur.execute(
+            """select * from swh_scheduler_grab_any_ready_priority_tasks(
+                %s, %s, %s :: bigint)""",
+            (task_type, timestamp, num_tasks),
+        )
+        logger.debug("GRAB %s => %s", task_type, cur.rowcount)
+        return cur.fetchall()
+
     task_run_create_keys = ["task", "backend_id", "scheduled", "metadata"]
 
     @db_transaction()
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -166,6 +166,47 @@
         """
         ...
 
+    @remote_api_endpoint("task/peek_ready_with_priority")
+    def peek_ready_priority_tasks(
+        self,
+        task_type: str,
+        timestamp: Optional[datetime.datetime] = None,
+        num_tasks: Optional[int] = None,
+    ) -> List[Dict]:
+        """Fetch list of tasks (with any priority) ready to be scheduled.
+
+        Args:
+            task_type: filtering task per their type
+            timestamp: peek tasks that need to be executed before that timestamp
+            num_tasks: only peek at num_tasks tasks (with any priority)
+
+        Returns:
+            a list of tasks
+
+        """
+        ...
+
+    @remote_api_endpoint("task/grab_ready_with_priority")
+    def grab_ready_priority_tasks(
+        self,
+        task_type: str,
+        timestamp: Optional[datetime.datetime] = None,
+        num_tasks: Optional[int] = None,
+    ) -> List[Dict]:
+        """Fetch and schedule the list of tasks (with any priority) ready to be scheduled.
+
+        Args:
+            task_type: filtering task per their type
+            timestamp: grab tasks that need to be executed
+                before that timestamp
+            num_tasks: only grab num_tasks tasks (with any priority)
+
+        Returns:
+            a list of tasks
+
+        """
+        ...
+
     @remote_api_endpoint("task_run/schedule_one")
     def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None):
         """Mark a given task as scheduled, adding a task_run entry in the database.
diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql
--- a/swh/scheduler/sql/30-schema.sql
+++ b/swh/scheduler/sql/30-schema.sql
@@ -11,7 +11,7 @@
 comment on column dbversion.description is 'Version description';
 
 insert into dbversion (version, release, description)
-       values (25, now(), 'Work In Progress');
+       values (26, now(), 'Work In Progress');
 
 create table task_type (
   type text primary key,
diff --git a/swh/scheduler/sql/40-func.sql b/swh/scheduler/sql/40-func.sql
--- a/swh/scheduler/sql/40-func.sql
+++ b/swh/scheduler/sql/40-func.sql
@@ -210,6 +210,50 @@
 comment on function swh_scheduler_grab_ready_tasks (text, timestamptz, bigint, bigint)
 is 'Grab tasks ready for scheduling and change their status';
 
+
+create or replace function swh_scheduler_peek_any_ready_priority_tasks (
+    task_type text, ts timestamptz default now(),
+    num_tasks bigint default NULL
+  )
+  returns setof task
+  language sql stable
+as $$
+  select *
+  from task t
+  where t.next_run <= ts
+    and t.type = task_type
+    and t.status = 'next_run_not_scheduled'
+    and t.priority is not null
+  order by t.next_run
+  limit num_tasks
+  for update skip locked;
+$$;
+
+comment on function swh_scheduler_peek_any_ready_priority_tasks(text, timestamptz, bigint)
+is 'List tasks with any priority ready for scheduling';
+
+create or replace function swh_scheduler_grab_any_ready_priority_tasks (
+    task_type text, ts timestamptz default now(),
+    num_tasks bigint default NULL
+  )
+  returns setof task
+  language sql
+as $$
+  update task
+    set status='next_run_scheduled'
+    from (
+        select id from swh_scheduler_peek_any_ready_priority_tasks(
+          task_type, ts, num_tasks
+        )
+    ) next_tasks
+    where task.id = next_tasks.id
+    returning task.*;
+$$;
+
+comment on function swh_scheduler_grab_any_ready_priority_tasks (text, timestamptz, bigint)
+is 'Grab any priority tasks ready for scheduling and change their status';
+
+
 create or replace function swh_scheduler_schedule_task_run (task_id bigint,
                                                             backend_id text,
                                                             metadata jsonb default '{}'::jsonb,
diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py
--- a/swh/scheduler/tests/common.py
+++ b/swh/scheduler/tests/common.py
@@ -5,6 +5,7 @@
 
 import copy
 import datetime
+from typing import Dict, List, Optional
 
 TEMPLATES = {
     "git": {
@@ -49,22 +50,31 @@
 }
 
 
-def tasks_from_template(template, max_timestamp, num, num_priority=0, priorities=None):
+def _task_from_template(
+    template: Dict, next_run: datetime.datetime, priority: Optional[str], *args, **kwargs
+) -> Dict:
+    ret = copy.deepcopy(template)
+    ret["next_run"] = next_run
+    if priority:
+        ret["priority"] = priority
+    if args:
+        ret["arguments"]["args"] = list(args)
+    if kwargs:
+        ret["arguments"]["kwargs"] = kwargs
+    return ret
+
+
+def tasks_from_template(
+    template: Dict,
+    max_timestamp: datetime.datetime,
+    num: int,
+    num_priority: int = 0,
+    priorities: Optional[Dict] = None,
+) -> List[Dict]:
     """Build tasks from template
 
     """
 
-    def _task_from_template(template, next_run, priority, *args, **kwargs):
-        ret = copy.deepcopy(template)
-        ret["next_run"] = next_run
-        if priority:
-            ret["priority"] = priority
-        if args:
-            ret["arguments"]["args"] = list(args)
-        if kwargs:
-            ret["arguments"]["kwargs"] = kwargs
-        return ret
-
     def _pop_priority(priorities):
         if not priorities:
             return None
@@ -94,6 +104,24 @@
     return tasks
 
 
+def tasks_with_priority_from_template(
+    template: Dict, max_timestamp: datetime.datetime, num: int, priority: str
+) -> List[Dict]:
+    """Build tasks with priority from template
+
+    """
+    return [
+        _task_from_template(
+            template,
+            max_timestamp - datetime.timedelta(microseconds=i),
+            priority,
+            "argument-%03d" % i,
+            **{"kwarg%03d" % i: "bogus-kwarg"},
+        )
+        for i in range(num)
+    ]
+
+
 LISTERS = (
     {"name": "github"},
     {"name": "gitlab", "instance_name": "gitlab"},
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -58,6 +58,8 @@
         "task/get",
         "task/grab_ready",
         "task/peek_ready",
+        "task/grab_ready_with_priority",
+        "task/peek_ready_with_priority",
         "task/search",
         "task/set_status",
         "task_run/end",
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -21,7 +21,13 @@
 from swh.scheduler.model import ListedOrigin, OriginVisitStats, SchedulerMetrics
 from swh.scheduler.utils import utcnow
 
-from .common import LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template
+from .common import (
+    LISTERS,
+    TASK_TYPES,
+    TEMPLATES,
+    tasks_from_template,
+    tasks_with_priority_from_template,
+)
 
 ONEDAY = datetime.timedelta(days=1)
 
@@ -292,6 +298,42 @@
             assert peeked == grabbed
             assert peeked["priority"] == grabbed["priority"]
 
+    def test_grab_ready_priority_tasks(self, swh_scheduler):
+        """check the grab and peek priority tasks endpoint behave as expected"""
+        self._create_task_types(swh_scheduler)
+        t = utcnow()
+        task_type = TEMPLATES["git"]["type"]
+        num_tasks = 100
+        # Create tasks of two types, all with a priority; only git ones are queried
+        tasks0 = tasks_with_priority_from_template(
+            TEMPLATES["git"], t, num_tasks, "high",
+        )
+        tasks1 = tasks_with_priority_from_template(
+            TEMPLATES["hg"], t, num_tasks, "low",
+        )
+        tasks2 = tasks_with_priority_from_template(
+            TEMPLATES["hg"], t, num_tasks, "normal",
+        )
+        tasks = tasks0 + tasks1 + tasks2
+
+        random.shuffle(tasks)
+        swh_scheduler.create_tasks(tasks)
+
+        ready_tasks = swh_scheduler.peek_ready_priority_tasks(task_type, num_tasks=50)
+        grabbed_tasks = swh_scheduler.grab_ready_priority_tasks(task_type, num_tasks=50)
+
+        # zip() stops at the shortest input; make sure the loop below is not vacuous
+        assert len(ready_tasks) == len(grabbed_tasks) == 50
+
+        for peeked, grabbed in zip(ready_tasks, grabbed_tasks):
+            assert peeked["status"] == "next_run_not_scheduled"
+            del peeked["status"]
+            assert grabbed["status"] == "next_run_scheduled"
+            del grabbed["status"]
+            assert peeked == grabbed
+            assert peeked["priority"] == grabbed["priority"]
+            assert peeked["priority"] is not None
+
     def test_get_tasks(self, swh_scheduler):
         self._create_task_types(swh_scheduler)
         t = utcnow()