diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py
--- a/swh/scheduler/celery_backend/config.py
+++ b/swh/scheduler/celery_backend/config.py
@@ -175,7 +175,7 @@

 def route_for_task(name, args, kwargs, options, task=None, **kw):
     """Route tasks according to the task_queue attribute in the task class"""
-    if name is not None and name.startswith("swh."):
+    if name is not None and name.startswith(("swh.", "save_code_now:swh.")):
         return {"queue": name}


diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -1,14 +1,16 @@
-# Copyright (C) 2015-2018  The Software Heritage developers
+# Copyright (C) 2015-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
+from typing import Dict, List, Tuple

 from kombu.utils.uuid import uuid

 from swh.core.statsd import statsd
-from swh.scheduler import compute_nb_tasks_from, get_scheduler
+from swh.scheduler import get_scheduler
+from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.utils import utcnow

 logger = logging.getLogger(__name__)
@@ -17,8 +19,15 @@
 MAX_NUM_TASKS = 10000


-def run_ready_tasks(backend, app):
-    """Run tasks that are ready
+def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]:
+    """Schedule tasks ready to be scheduled.
+
+    This will look up any tasks per task type and mass schedule them (send to
+    rabbitmq and mark as scheduled in the scheduler backend) accordingly.
+
+    Tasks (per task type) with a priority are sent, under their usual task name,
+    to a dedicated high priority queue (the standard queue name prefixed with
+    ``save_code_now:``).

     Args:
         backend (Scheduler): backend to read tasks to schedule
@@ -40,7 +49,7 @@
               AsyncResult(id=task['backend_id']).get()

     """
-    all_backend_tasks = []
+    all_backend_tasks: List[Dict] = []
     while True:
         task_types = {}
         pending_tasks = []
@@ -67,12 +76,8 @@
             # only pull tasks if the buffer is at least 1/5th empty (= 80%
             # full), to help postgresql use properly indexed queries.
             if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
-                num_tasks, num_tasks_priority = compute_nb_tasks_from(num_tasks)
-
                 grabbed_tasks = backend.grab_ready_tasks(
-                    task_type_name,
-                    num_tasks=num_tasks,
-                    num_tasks_priority=num_tasks_priority,
+                    task_type_name, num_tasks=num_tasks,
                 )
                 if grabbed_tasks:
                     pending_tasks.extend(grabbed_tasks)
@@ -84,18 +89,25 @@
                         len(grabbed_tasks),
                         tags={"task_type": task_type_name},
                     )
+
+            # grab potential tasks with any priority
+            grabbed_priority_tasks = backend.grab_ready_priority_tasks(task_type_name)
+            if grabbed_priority_tasks:
+                pending_tasks.extend(grabbed_priority_tasks)
+
         if not pending_tasks:
             return all_backend_tasks

         backend_tasks = []
-        celery_tasks = []
+        celery_tasks: List[Tuple[bool, str, str, List, Dict]] = []
         for task in pending_tasks:
             args = task["arguments"]["args"]
             kwargs = task["arguments"]["kwargs"]

             backend_name = task_types[task["type"]]["backend_name"]
             backend_id = uuid()
-            celery_tasks.append((backend_name, backend_id, args, kwargs))
+            with_priority = task.get("priority") is not None
+            celery_tasks.append((with_priority, backend_name, backend_id, args, kwargs))
             data = {
                 "task": task["id"],
                 "backend_id": backend_id,
@@ -106,9 +118,12 @@
         logger.debug("Sent %s celery tasks", len(backend_tasks))

         backend.mass_schedule_task_runs(backend_tasks)
-        for backend_name, backend_id, args, kwargs in celery_tasks:
-            app.send_task(
-                backend_name, task_id=backend_id, args=args, kwargs=kwargs,
-            )
+        for with_priority, backend_name, backend_id, args, kwargs in celery_tasks:
+            send_options = dict(task_id=backend_id, args=args, kwargs=kwargs)
+            if with_priority:
+                # Keep the registered task name (workers resolve tasks by name)
+                # and only redirect the message to the high priority queue.
+                send_options["queue"] = f"save_code_now:{backend_name}"
+            app.send_task(backend_name, **send_options)

         all_backend_tasks.extend(backend_tasks)
diff --git a/swh/scheduler/tests/test_config.py b/swh/scheduler/tests/test_config.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_config.py
@@ -0,0 +1,18 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.scheduler.celery_backend.config import route_for_task
+
+
+@pytest.mark.parametrize("name", ["swh.something", "save_code_now:swh.something"])
+def test_route_for_task_routing(name):
+    assert route_for_task(name, [], {}, {}) == {"queue": name}
+
+
+@pytest.mark.parametrize("name", [None, "foobar", "save_code_now:foobar"])
+def test_route_for_task_no_routing(name):
+    assert route_for_task(name, [], {}, {}) is None