diff --git a/sql/updates/27.sql b/sql/updates/27.sql new file mode 100644 --- /dev/null +++ b/sql/updates/27.sql @@ -0,0 +1,29 @@ +-- SWH DB schema upgrade +-- from_version: 26 +-- to_version: 27 +-- description: Clean up no longer used stored procedure + +insert into dbversion (version, release, description) + values (27, now(), 'Work In Progress'); + +drop function swh_scheduler_peek_ready_tasks (text, timestamptz, bigint, bigint); +drop function swh_scheduler_peek_priority_tasks (text, timestamptz, bigint); +-- drop old signature function +drop function swh_scheduler_grab_ready_tasks (text, timestamptz, bigint, bigint); + +create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(), + num_tasks bigint default NULL) + returns setof task + language sql +as $$ + update task + set status='next_run_scheduled' + from ( + select id from swh_scheduler_peek_no_priority_tasks(task_type, ts, num_tasks) + ) next_tasks + where task.id = next_tasks.id + returning task.*; +$$; + +comment on function swh_scheduler_grab_ready_tasks (text, timestamptz, bigint) +is 'Grab (no priority) tasks ready for scheduling and change their status'; diff --git a/swh/scheduler/__init__.py b/swh/scheduler/__init__.py --- a/swh/scheduler/__init__.py +++ b/swh/scheduler/__init__.py @@ -1,4 +1,4 @@ -# Copyright (C) 2018-2020 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -9,9 +9,6 @@ from typing import TYPE_CHECKING, Any, Dict import warnings -# Percentage of tasks with priority to schedule -PRIORITY_SLOT = 0.6 - DEFAULT_CONFIG = { "scheduler": ( "dict", @@ -26,23 +23,6 @@ from swh.scheduler.interface import SchedulerInterface -def compute_nb_tasks_from(num_tasks): - """Compute and returns the tuple, number of 
tasks without priority, - number of tasks with priority. - - Args: - num_tasks (int): - - Returns: - tuple number of tasks without priority (int), number of tasks with - priority (int) - - """ - if not num_tasks: - return None, None - return (int((1 - PRIORITY_SLOT) * num_tasks), int(PRIORITY_SLOT * num_tasks)) - - BACKEND_TYPES: Dict[str, str] = { "local": ".backend.SchedulerBackend", "remote": ".api.client.RemoteScheduler", diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -592,34 +592,19 @@ @db_transaction() def peek_ready_tasks( self, - task_type, - timestamp=None, - num_tasks=None, - num_tasks_priority=None, + task_type: str, + timestamp: Optional[datetime.datetime] = None, + num_tasks: Optional[int] = None, db=None, cur=None, - ): - """Fetch the list of ready tasks - - Args: - task_type (str): filtering task per their type - timestamp (datetime.datetime): peek tasks that need to be executed - before that timestamp - num_tasks (int): only peek at num_tasks tasks (with no priority) - num_tasks_priority (int): only peek at num_tasks_priority - tasks (with priority) - - Returns: - a list of tasks - - """ + ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( - """select * from swh_scheduler_peek_ready_tasks( - %s, %s, %s :: bigint, %s :: bigint)""", - (task_type, timestamp, num_tasks, num_tasks_priority), + """select * from swh_scheduler_peek_no_priority_tasks( + %s, %s, %s :: bigint)""", + (task_type, timestamp, num_tasks), ) logger.debug("PEEK %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @@ -627,33 +612,18 @@ @db_transaction() def grab_ready_tasks( self, - task_type, - timestamp=None, - num_tasks=None, - num_tasks_priority=None, + task_type: str, + timestamp: Optional[datetime.datetime] = None, + num_tasks: Optional[int] = None, db=None, cur=None, - ): - """Fetch the list of ready tasks, and mark them as scheduled - - Args: - task_type (str): 
filtering task per their type - timestamp (datetime.datetime): grab tasks that need to be executed - before that timestamp - num_tasks (int): only grab num_tasks tasks (with no priority) - num_tasks_priority (int): only grab oneshot num_tasks tasks (with - priorities) - - Returns: - a list of tasks - - """ + ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_grab_ready_tasks( - %s, %s, %s :: bigint, %s :: bigint)""", - (task_type, timestamp, num_tasks, num_tasks_priority), + %s, %s, %s :: bigint)""", + (task_type, timestamp, num_tasks), ) logger.debug("GRAB %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @@ -921,11 +891,6 @@ cur.execute(query, args) return cur.fetchall() - @db_transaction() - def get_priority_ratios(self, db=None, cur=None): - cur.execute("select id, ratio from priority_ratio") - return {row["id"]: row["ratio"] for row in cur.fetchall()} - @db_transaction() def origin_visit_stats_upsert( self, origin_visit_stats: Iterable[OriginVisitStats], db=None, cur=None diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -78,7 +78,7 @@ if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: # Only grab num_tasks tasks with no priority grabbed_tasks = backend.grab_ready_tasks( - task_type_name, num_tasks=num_tasks, num_tasks_priority=0 + task_type_name, num_tasks=num_tasks ) if grabbed_tasks: pending_tasks.extend(grabbed_tasks) diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py --- a/swh/scheduler/cli/task.py +++ b/swh/scheduler/cli/task.py @@ -382,6 +382,7 @@ @click.option( "--limit", "-l", + "num_tasks", required=False, type=click.INT, help="The maximum number of tasks to fetch", @@ -394,26 +395,20 @@ help="List all jobs supposed to run before the given date", ) @click.pass_context -def list_pending_tasks(ctx, task_types, limit, before): - 
"""List the tasks that are going to be run. +def list_pending_tasks(ctx, task_types, num_tasks, before): + """List tasks with no priority that are going to be run. - You can override the number of tasks to fetch + You can override the number of tasks to fetch with the --limit flag. """ - from swh.scheduler import compute_nb_tasks_from - scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") - num_tasks, num_tasks_priority = compute_nb_tasks_from(limit) output = [] for task_type in task_types: pending = scheduler.peek_ready_tasks( - task_type, - timestamp=before, - num_tasks=num_tasks, - num_tasks_priority=num_tasks_priority, + task_type, timestamp=before, num_tasks=num_tasks, ) output.append("Found %d %s tasks\n" % (len(pending), task_type)) diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -128,40 +128,42 @@ @remote_api_endpoint("task/peek_ready") def peek_ready_tasks( - self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, - ): - """Fetch the list of ready tasks + self, + task_type: str, + timestamp: Optional[datetime.datetime] = None, + num_tasks: Optional[int] = None, + ) -> List[Dict]: + """Fetch the list of tasks (with no priority) to be scheduled. Args: - task_type (str): filtering task per their type - timestamp (datetime.datetime): peek tasks that need to be executed + task_type: filtering task per their type + timestamp: peek tasks that need to be executed before that timestamp - num_tasks (int): only peek at num_tasks tasks (with no priority) - num_tasks_priority (int): only peek at num_tasks_priority - tasks (with priority) + num_tasks: only peek at num_tasks tasks (with no priority) Returns: - a list of tasks + the list of tasks which would be scheduled """ ... 
@remote_api_endpoint("task/grab_ready") def grab_ready_tasks( - self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, - ): - """Fetch the list of ready tasks, and mark them as scheduled + self, + task_type: str, + timestamp: Optional[datetime.datetime] = None, + num_tasks: Optional[int] = None, + ) -> List[Dict]: + """Fetch and schedule the list of tasks (with no priority) ready to be scheduled. Args: - task_type (str): filtering task per their type - timestamp (datetime.datetime): grab tasks that need to be executed + task_type: filtering task per their type + timestamp: grab tasks that need to be executed before that timestamp - num_tasks (int): only grab num_tasks tasks (with no priority) - num_tasks_priority (int): only grab oneshot num_tasks tasks (with - priorities) + num_tasks: only grab num_tasks tasks (with no priority) Returns: - a list of tasks + the list of scheduled tasks """ ... @@ -372,7 +374,7 @@ Use the `limit` and `page_token` arguments for continuation. The next page token, if any, is returned in the PaginatedListedOriginList object. - """ + """ ... @remote_api_endpoint("origins/grab_next") @@ -398,10 +400,6 @@ """ ... - @remote_api_endpoint("priority_ratios/get") - def get_priority_ratios(self): - ... 
- @remote_api_endpoint("visit_stats/upsert") def origin_visit_stats_upsert( self, origin_visit_stats: Iterable[OriginVisitStats] diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql --- a/swh/scheduler/sql/30-schema.sql +++ b/swh/scheduler/sql/30-schema.sql @@ -11,7 +11,7 @@ comment on column dbversion.description is 'Version description'; insert into dbversion (version, release, description) - values (26, now(), 'Work In Progress'); + values (27, now(), 'Work In Progress'); create table task_type ( type text primary key, diff --git a/swh/scheduler/sql/40-func.sql b/swh/scheduler/sql/40-func.sql --- a/swh/scheduler/sql/40-func.sql +++ b/swh/scheduler/sql/40-func.sql @@ -105,111 +105,23 @@ comment on function swh_scheduler_peek_tasks_with_priority(text, timestamptz, bigint, task_priority) is 'Retrieve tasks with a given priority'; -create or replace function swh_scheduler_peek_priority_tasks (task_type text, ts timestamptz default now(), - num_tasks_priority bigint default NULL) - returns setof task - language plpgsql -as $$ -declare - r record; - count_row bigint; - nb_diff bigint; - nb_high bigint; - nb_normal bigint; - nb_low bigint; -begin - -- expected values to fetch - select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'high') into nb_high; - select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'normal') into nb_normal; - select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'low') into nb_low; - nb_diff := 0; - count_row := 0; - - for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_high, 'high') - loop - count_row := count_row + 1; - return next r; - end loop; - - if count_row < nb_high then - nb_normal := nb_normal + nb_high - count_row; - end if; - - count_row := 0; - for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_normal, 'normal') - loop - count_row := count_row + 1; - return next r; - end loop; - - if count_row < nb_normal then - nb_low := nb_low + 
nb_normal - count_row; - end if; - - return query select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_low, 'low'); -end -$$; - -comment on function swh_scheduler_peek_priority_tasks(text, timestamptz, bigint) -is 'Retrieve priority tasks'; - -create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(), - num_tasks bigint default NULL, num_tasks_priority bigint default NULL) - returns setof task - language plpgsql -as $$ -declare - r record; - count_row bigint; - nb_diff bigint; - nb_tasks bigint; -begin - count_row := 0; - - for r in select * from swh_scheduler_peek_priority_tasks(task_type, ts, num_tasks_priority) - order by priority, next_run - loop - count_row := count_row + 1; - return next r; - end loop; - - if count_row < num_tasks_priority then - nb_tasks := num_tasks + num_tasks_priority - count_row; - else - nb_tasks := num_tasks; - end if; - - for r in select * from swh_scheduler_peek_no_priority_tasks(task_type, ts, nb_tasks) - order by priority, next_run - loop - return next r; - end loop; - - return; -end -$$; - -comment on function swh_scheduler_peek_ready_tasks(text, timestamptz, bigint, bigint) -is 'Retrieve tasks with/without priority in order'; create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(), - num_tasks bigint default NULL, - num_tasks_priority bigint default NULL) + num_tasks bigint default NULL) returns setof task language sql as $$ update task set status='next_run_scheduled' from ( - select id from swh_scheduler_peek_ready_tasks(task_type, ts, num_tasks, num_tasks_priority) + select id from swh_scheduler_peek_no_priority_tasks(task_type, ts, num_tasks) ) next_tasks where task.id = next_tasks.id returning task.*; $$; -comment on function swh_scheduler_grab_ready_tasks (text, timestamptz, bigint, bigint) -is 'Grab tasks ready for scheduling and change their status'; - +comment on function swh_scheduler_grab_ready_tasks (text, 
timestamptz, bigint) +is 'Grab (no priority) tasks ready for scheduling and change their status'; create or replace function swh_scheduler_peek_any_ready_priority_tasks ( task_type text, ts timestamptz default now(), diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -48,7 +48,6 @@ "origins/get", "origins/grab_next", "origins/record", - "priority_ratios/get", "scheduler_metrics/get", "scheduler_metrics/update", "task/create", diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -31,6 +31,9 @@ ONEDAY = datetime.timedelta(days=1) +# for compatibility purpose with existing test code +PRIORITY_RATIO = {"high": 0.6, "normal": 0.3, "low": 0.2} + def subdict(d, keys=None, excl=()): if keys is None: @@ -77,13 +80,6 @@ assert missing_methods == [] - def test_get_priority_ratios(self, swh_scheduler): - assert swh_scheduler.get_priority_ratios() == { - "high": 0.5, - "normal": 0.3, - "low": 0.2, - } - def test_add_task_type(self, swh_scheduler): tt = TASK_TYPES["git"] swh_scheduler.create_task_type(tt) @@ -108,7 +104,6 @@ assert tt2 in actual_task_types def test_create_tasks(self, swh_scheduler): - priority_ratio = self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) num_tasks_priority = 100 tasks_1 = tasks_from_template(TEMPLATES["git"], utcnow(), 100) @@ -117,7 +112,7 @@ utcnow(), 100, num_tasks_priority, - priorities=priority_ratio, + priorities=PRIORITY_RATIO, ) tasks = tasks_1 + tasks_2 @@ -161,7 +156,7 @@ assert dict(actual_priorities) == { priority: int(ratio * num_tasks_priority) - for priority, ratio in priority_ratio.items() + for priority, ratio in PRIORITY_RATIO.items() } def test_peek_ready_tasks_no_priority(self, swh_scheduler): @@ -205,11 +200,8 @@ assert ready_task["next_run"] <= 
max_ts assert ready_task in ready_tasks[: limit // 3] - def _priority_ratio(self, swh_scheduler): - return swh_scheduler.get_priority_ratios() - - def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler): - priority_ratio = self._priority_ratio(swh_scheduler) + def test_peek_ready_tasks_returns_only_no_priority_tasks(self, swh_scheduler): + """Peek ready tasks only returns standard tasks (no priority)""" self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] @@ -221,52 +213,28 @@ t, num=num_tasks_no_priority, num_priority=num_tasks_priority, - priorities=priority_ratio, + priorities=PRIORITY_RATIO, ) + count_priority = 0 + for task in tasks: + count_priority += 0 if task.get("priority") is None else 1 + + assert count_priority > 0, "Some created tasks should have some priority" + random.shuffle(tasks) swh_scheduler.create_tasks(tasks) - # take all available tasks + # take all available no priority tasks ready_tasks = swh_scheduler.peek_ready_tasks(task_type) - assert len(ready_tasks) == len(tasks) - assert num_tasks_priority + num_tasks_no_priority == len(ready_tasks) + assert len(ready_tasks) == len(tasks) - count_priority - count_tasks_per_priority = defaultdict(int) + # No ready task should have any priority for task in ready_tasks: - priority = task.get("priority") - if priority: - count_tasks_per_priority[priority] += 1 - - assert dict(count_tasks_per_priority) == { - priority: int(ratio * num_tasks_priority) - for priority, ratio in priority_ratio.items() - } - - # Only get some ready tasks - num_tasks = random.randrange(5, 5 + num_tasks_no_priority // 2) - num_tasks_priority = random.randrange(5, num_tasks_priority // 2) - ready_tasks_limited = swh_scheduler.peek_ready_tasks( - task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority - ) - - count_tasks_per_priority = defaultdict(int) - for task in ready_tasks_limited: - priority = task.get("priority") - count_tasks_per_priority[priority] += 1 - - import 
math - - for priority, ratio in priority_ratio.items(): - expected_count = math.ceil(ratio * num_tasks_priority) - actual_prio = count_tasks_per_priority[priority] - assert actual_prio == expected_count or actual_prio == expected_count + 1 - - assert count_tasks_per_priority[None] == num_tasks + assert task.get("priority") is None def test_grab_ready_tasks(self, swh_scheduler): - priority_ratio = self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] @@ -278,17 +246,13 @@ t, num=num_tasks_no_priority, num_priority=num_tasks_priority, - priorities=priority_ratio, + priorities=PRIORITY_RATIO, ) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) - first_ready_tasks = swh_scheduler.peek_ready_tasks( - task_type, num_tasks=10, num_tasks_priority=10 - ) - grabbed_tasks = swh_scheduler.grab_ready_tasks( - task_type, num_tasks=10, num_tasks_priority=10 - ) + first_ready_tasks = swh_scheduler.peek_ready_tasks(task_type, num_tasks=50) + grabbed_tasks = swh_scheduler.grab_ready_tasks(task_type, num_tasks=50) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): assert peeked["status"] == "next_run_not_scheduled" @@ -296,7 +260,9 @@ assert grabbed["status"] == "next_run_scheduled" del grabbed["status"] assert peeked == grabbed - assert peeked["priority"] == grabbed["priority"] + priority = grabbed["priority"] + assert priority == peeked["priority"] + assert priority is None def test_grab_ready_priority_tasks(self, swh_scheduler): """check the grab and peek priority tasks endpoint behave as expected"""