diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -1,14 +1,16 @@ -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging +from typing import Dict, List, Tuple from kombu.utils.uuid import uuid from swh.core.statsd import statsd -from swh.scheduler import compute_nb_tasks_from, get_scheduler +from swh.scheduler import get_scheduler +from swh.scheduler.interface import SchedulerInterface from swh.scheduler.utils import utcnow logger = logging.getLogger(__name__) @@ -17,11 +19,18 @@ MAX_NUM_TASKS = 10000 -def run_ready_tasks(backend, app): - """Run tasks that are ready +def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]: + """Schedule tasks ready to be scheduled. + + This looks up any tasks per task type and mass schedules those accordingly (send + messages to rabbitmq and mark as scheduled equivalent tasks in the scheduler + backend). + + If tasks (per task type) with priority exist, they will get redirected to a + dedicated high priority queue (standard queue name prefixed with `save_code_now:`). Args: - backend (Scheduler): backend to read tasks to schedule + backend: scheduler backend to interact with (read/update tasks) app (App): Celery application to send tasks to Returns: @@ -40,7 +49,7 @@ AsyncResult(id=task['backend_id']).get() """ - all_backend_tasks = [] + all_backend_tasks: List[Dict] = [] while True: task_types = {} pending_tasks = [] @@ -67,12 +76,9 @@ # only pull tasks if the buffer is at least 1/5th empty (= 80% # full), to help postgresql use properly indexed queries. 
if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: - num_tasks, num_tasks_priority = compute_nb_tasks_from(num_tasks) - + # Only grab num_tasks tasks with no priority grabbed_tasks = backend.grab_ready_tasks( - task_type_name, - num_tasks=num_tasks, - num_tasks_priority=num_tasks_priority, + task_type_name, num_tasks=num_tasks, num_tasks_priority=0 ) if grabbed_tasks: pending_tasks.extend(grabbed_tasks) @@ -84,18 +90,44 @@ len(grabbed_tasks), tags={"task_type": task_type_name}, ) + # grab max_queue_length potential tasks with any priority for the same type + # (limit the result to avoid too long running queries) + grabbed_priority_tasks = backend.grab_ready_priority_tasks( + task_type_name, num_tasks=max_queue_length + ) + if grabbed_priority_tasks: + pending_tasks.extend(grabbed_priority_tasks) + logger.info( + "Grabbed %s tasks %s (priority)", + len(grabbed_priority_tasks), + task_type_name, + ) + statsd.increment( + "swh_scheduler_runner_scheduled_task_total", + len(grabbed_priority_tasks), + tags={"task_type": task_type_name}, + ) + if not pending_tasks: return all_backend_tasks backend_tasks = [] - celery_tasks = [] + celery_tasks: List[Tuple[bool, str, str, List, Dict]] = [] for task in pending_tasks: args = task["arguments"]["args"] kwargs = task["arguments"]["kwargs"] backend_name = task_types[task["type"]]["backend_name"] backend_id = uuid() - celery_tasks.append((backend_name, backend_id, args, kwargs)) + celery_tasks.append( + ( + task.get("priority") is not None, + backend_name, + backend_id, + args, + kwargs, + ) + ) data = { "task": task["id"], "backend_id": backend_id, @@ -106,10 +138,20 @@ logger.debug("Sent %s celery tasks", len(backend_tasks)) backend.mass_schedule_task_runs(backend_tasks) - for backend_name, backend_id, args, kwargs in celery_tasks: - app.send_task( - backend_name, task_id=backend_id, args=args, kwargs=kwargs, - ) + for with_priority, backend_name, backend_id, args, kwargs in celery_tasks: + if with_priority: + queue = 
f"save_code_now:{backend_name}" + app.send_task( + backend_name, + queue=queue, + task_id=backend_id, + args=args, + kwargs=kwargs, + ) + else: + app.send_task( + backend_name, task_id=backend_id, args=args, kwargs=kwargs, + ) all_backend_tasks.extend(backend_tasks) diff --git a/swh/scheduler/tests/test_config.py b/swh/scheduler/tests/test_config.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/test_config.py @@ -0,0 +1,18 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.scheduler.celery_backend.config import route_for_task + + +@pytest.mark.parametrize("name", ["swh.something", "swh.anything"]) +def test_route_for_task_routing(name): + assert route_for_task(name, [], {}, {}) == {"queue": name} + + +@pytest.mark.parametrize("name", [None, "foobar"]) +def test_route_for_task_no_routing(name): + assert route_for_task(name, [], {}, {}) is None