Page MenuHomeSoftware Heritage

D5520.id19753.diff
No OneTemporary

D5520.id19753.diff

diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -1,14 +1,16 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
+# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
+from typing import Dict, List, Tuple
from kombu.utils.uuid import uuid
from swh.core.statsd import statsd
-from swh.scheduler import compute_nb_tasks_from, get_scheduler
+from swh.scheduler import get_scheduler
+from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.utils import utcnow
logger = logging.getLogger(__name__)
@@ -17,11 +19,18 @@
MAX_NUM_TASKS = 10000
-def run_ready_tasks(backend, app):
- """Run tasks that are ready
+def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]:
+ """Schedule tasks ready to be scheduled.
+
+ This lookups any tasks per task type and mass schedules those accordingly (send
+ messages to rabbitmq and mark as scheduled equivalent tasks in the scheduler
+ backend).
+
+ If tasks (per task type) with priority exist, they will get redirected to dedicated
+ high priority queue (standard queue name prefixed with `save_code_now:`).
Args:
- backend (Scheduler): backend to read tasks to schedule
+ backend: scheduler backend to interact with (read/update tasks)
app (App): Celery application to send tasks to
Returns:
@@ -40,7 +49,7 @@
AsyncResult(id=task['backend_id']).get()
"""
- all_backend_tasks = []
+ all_backend_tasks: List[Dict] = []
while True:
task_types = {}
pending_tasks = []
@@ -67,12 +76,9 @@
# only pull tasks if the buffer is at least 1/5th empty (= 80%
# full), to help postgresql use properly indexed queries.
if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
- num_tasks, num_tasks_priority = compute_nb_tasks_from(num_tasks)
-
+ # Only grab num_tasks tasks with no priority
grabbed_tasks = backend.grab_ready_tasks(
- task_type_name,
- num_tasks=num_tasks,
- num_tasks_priority=num_tasks_priority,
+ task_type_name, num_tasks=num_tasks, num_tasks_priority=0
)
if grabbed_tasks:
pending_tasks.extend(grabbed_tasks)
@@ -84,18 +90,44 @@
len(grabbed_tasks),
tags={"task_type": task_type_name},
)
+ # grab max_queue_length (or 10) potential tasks with any priority for the
+ # same type (limit the result to avoid too long running queries)
+ grabbed_priority_tasks = backend.grab_ready_priority_tasks(
+ task_type_name, num_tasks=max_queue_length or 10
+ )
+ if grabbed_priority_tasks:
+ pending_tasks.extend(grabbed_priority_tasks)
+ logger.info(
+ "Grabbed %s tasks %s (priority)",
+ len(grabbed_priority_tasks),
+ task_type_name,
+ )
+ statsd.increment(
+ "swh_scheduler_runner_scheduled_task_total",
+ len(grabbed_priority_tasks),
+ tags={"task_type": task_type_name},
+ )
+
if not pending_tasks:
return all_backend_tasks
backend_tasks = []
- celery_tasks = []
+ celery_tasks: List[Tuple[bool, str, str, List, Dict]] = []
for task in pending_tasks:
args = task["arguments"]["args"]
kwargs = task["arguments"]["kwargs"]
backend_name = task_types[task["type"]]["backend_name"]
backend_id = uuid()
- celery_tasks.append((backend_name, backend_id, args, kwargs))
+ celery_tasks.append(
+ (
+ task.get("priority") is not None,
+ backend_name,
+ backend_id,
+ args,
+ kwargs,
+ )
+ )
data = {
"task": task["id"],
"backend_id": backend_id,
@@ -106,10 +138,11 @@
logger.debug("Sent %s celery tasks", len(backend_tasks))
backend.mass_schedule_task_runs(backend_tasks)
- for backend_name, backend_id, args, kwargs in celery_tasks:
- app.send_task(
- backend_name, task_id=backend_id, args=args, kwargs=kwargs,
- )
+ for with_priority, backend_name, backend_id, args, kwargs in celery_tasks:
+ kw = dict(task_id=backend_id, args=args, kwargs=kwargs,)
+ if with_priority:
+ kw["queue"] = f"save_code_now:{backend_name}"
+ app.send_task(backend_name, **kw)
all_backend_tasks.extend(backend_tasks)
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -1,3 +1,10 @@
+# Copyright (C) 2019-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Module in charge of testing the scheduler runner module"""
+
from itertools import count
from time import sleep
@@ -70,20 +77,93 @@
AsyncResult(id=task["backend_id"]).get()
-def test_task_return_value(
+def test_run_ready_task_standard(
swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
):
- task_type = swh_scheduler.get_task_type("swh-test-add")
+ """Ensure scheduler runner schedules tasks ready for scheduling"""
+ task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add"
+ task_type = swh_scheduler.get_task_type(task_type_name)
assert task_type
- assert task_type["backend_name"] == "swh.scheduler.tests.tasks.add"
+ assert task_type["backend_name"] == backend_name
+
+ task_inputs = [
+ ("oneshot", (12, 30)),
+ ("oneshot", (20, 10)),
+ ("recurring", (30, 10)),
+ ]
+
+ tasks = swh_scheduler.create_tasks(
+ create_task_dict(task_type_name, policy, *args)
+ for (policy, args) in task_inputs
+ )
+
+ assert len(tasks) == len(task_inputs)
- swh_scheduler.create_tasks([create_task_dict("swh-test-add", "oneshot", 12, 30)])
+ task_ids = set()
+ for task in tasks:
+ assert task["status"] == "next_run_not_scheduled"
+ assert task["priority"] is None
+ task_ids.add(task["id"])
backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
- assert len(backend_tasks) == 1
- task = backend_tasks[0]
- value = AsyncResult(id=task["backend_id"]).get()
- assert value == 42
+ assert len(backend_tasks) == len(tasks)
+
+ scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name)
+ assert len(scheduled_tasks) == len(tasks)
+ for task in scheduled_tasks:
+ assert task["status"] == "next_run_scheduled"
+ assert task["id"] in task_ids
+
+ for i, (_, args) in enumerate(task_inputs):
+ # take one of the task and read it from the queue backend
+ task = backend_tasks[i]
+ value = AsyncResult(id=task["backend_id"]).get()
+ assert value == sum(args)
+
+
+def test_run_ready_task_with_priority(
+ swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
+):
+ """Ensure scheduler runner schedules priority tasks ready for scheduling"""
+ task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add"
+ task_type = swh_scheduler.get_task_type(task_type_name)
+ assert task_type
+ assert task_type["backend_name"] == backend_name
+
+ task_inputs = [
+ ("oneshot", (10, 22), "low"),
+ ("oneshot", (20, 10), "normal"),
+ ("recurring", (30, 10), "high"),
+ ]
+
+ tasks = swh_scheduler.create_tasks(
+ create_task_dict(task_type_name, policy, *args, priority=priority)
+ for (policy, args, priority) in task_inputs
+ )
+
+ assert len(tasks) == len(task_inputs)
+
+ task_ids = set()
+ for task in tasks:
+ assert task["status"] == "next_run_not_scheduled"
+ assert task["priority"] is not None
+ task_ids.add(task["id"])
+
+ backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
+ assert len(backend_tasks) == len(tasks)
+
+ scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name)
+ assert len(scheduled_tasks) == len(tasks)
+ for task in scheduled_tasks:
+ assert task["status"] == "next_run_scheduled"
+ assert task["id"] in task_ids
+
+ # TODO: Make the worker consume those messages so this can go green
+ # for i, (_, args, _) in enumerate(task_inputs):
+ # # take one of the task and read it from the queue backend
+ # task = backend_tasks[i]
+ # value = AsyncResult(id=task["backend_id"]).get()
+ # assert value == sum(args)
def test_task_exception(
diff --git a/swh/scheduler/tests/test_config.py b/swh/scheduler/tests/test_config.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_config.py
@@ -0,0 +1,18 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.scheduler.celery_backend.config import route_for_task
+
+
+@pytest.mark.parametrize("name", ["swh.something", "swh.anything"])
+def test_route_for_task_routing(name):
+ assert route_for_task(name, [], {}, {}) == {"queue": name}
+
+
+@pytest.mark.parametrize("name", [None, "foobar"])
+def test_route_for_task_no_routing(name):
+ assert route_for_task(name, [], {}, {}) is None

File Metadata

Mime Type
text/plain
Expires
Dec 21 2024, 7:36 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227451

Event Timeline