D5520.id19753.diff
diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -1,14 +1,16 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
+# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
+from typing import Dict, List, Tuple
from kombu.utils.uuid import uuid
from swh.core.statsd import statsd
-from swh.scheduler import compute_nb_tasks_from, get_scheduler
+from swh.scheduler import get_scheduler
+from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.utils import utcnow
logger = logging.getLogger(__name__)
@@ -17,11 +19,18 @@
MAX_NUM_TASKS = 10000
-def run_ready_tasks(backend, app):
- """Run tasks that are ready
+def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]:
+ """Schedule tasks ready to be scheduled.
+
+ This looks up ready tasks for each task type and mass-schedules them accordingly
+ (sends messages to RabbitMQ and marks the corresponding tasks as scheduled in the
+ scheduler backend).
+
+ If tasks with a priority exist for a task type, they are redirected to a dedicated
+ high-priority queue (the standard queue name prefixed with `save_code_now:`).
Args:
- backend (Scheduler): backend to read tasks to schedule
+ backend: scheduler backend to interact with (read/update tasks)
app (App): Celery application to send tasks to
Returns:
@@ -40,7 +49,7 @@
AsyncResult(id=task['backend_id']).get()
"""
- all_backend_tasks = []
+ all_backend_tasks: List[Dict] = []
while True:
task_types = {}
pending_tasks = []
@@ -67,12 +76,9 @@
# only pull tasks if the buffer is at least 1/5th empty (= 80%
# full), to help postgresql use properly indexed queries.
if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
- num_tasks, num_tasks_priority = compute_nb_tasks_from(num_tasks)
-
+ # Only grab num_tasks tasks with no priority
grabbed_tasks = backend.grab_ready_tasks(
- task_type_name,
- num_tasks=num_tasks,
- num_tasks_priority=num_tasks_priority,
+ task_type_name, num_tasks=num_tasks, num_tasks_priority=0
)
if grabbed_tasks:
pending_tasks.extend(grabbed_tasks)
@@ -84,18 +90,44 @@
len(grabbed_tasks),
tags={"task_type": task_type_name},
)
+ # grab up to max_queue_length (or 10) tasks with any priority for the same
+ # type (limit the result to avoid overly long-running queries)
+ grabbed_priority_tasks = backend.grab_ready_priority_tasks(
+ task_type_name, num_tasks=max_queue_length or 10
+ )
+ if grabbed_priority_tasks:
+ pending_tasks.extend(grabbed_priority_tasks)
+ logger.info(
+ "Grabbed %s tasks %s (priority)",
+ len(grabbed_priority_tasks),
+ task_type_name,
+ )
+ statsd.increment(
+ "swh_scheduler_runner_scheduled_task_total",
+ len(grabbed_priority_tasks),
+ tags={"task_type": task_type_name},
+ )
+
if not pending_tasks:
return all_backend_tasks
backend_tasks = []
- celery_tasks = []
+ celery_tasks: List[Tuple[bool, str, str, List, Dict]] = []
for task in pending_tasks:
args = task["arguments"]["args"]
kwargs = task["arguments"]["kwargs"]
backend_name = task_types[task["type"]]["backend_name"]
backend_id = uuid()
- celery_tasks.append((backend_name, backend_id, args, kwargs))
+ celery_tasks.append(
+ (
+ task.get("priority") is not None,
+ backend_name,
+ backend_id,
+ args,
+ kwargs,
+ )
+ )
data = {
"task": task["id"],
"backend_id": backend_id,
@@ -106,10 +138,11 @@
logger.debug("Sent %s celery tasks", len(backend_tasks))
backend.mass_schedule_task_runs(backend_tasks)
- for backend_name, backend_id, args, kwargs in celery_tasks:
- app.send_task(
- backend_name, task_id=backend_id, args=args, kwargs=kwargs,
- )
+ for with_priority, backend_name, backend_id, args, kwargs in celery_tasks:
+ kw = dict(task_id=backend_id, args=args, kwargs=kwargs,)
+ if with_priority:
+ kw["queue"] = f"save_code_now:{backend_name}"
+ app.send_task(backend_name, **kw)
all_backend_tasks.extend(backend_tasks)
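
Note (not part of the patch): the queue-routing change above boils down to the following standalone sketch. The broker URL is an assumption and the task name is taken from the tests in this diff; only the `save_code_now:` prefix comes from the change itself.

from celery import Celery

# assumed broker URL for the sketch; in production it comes from the swh configuration
app = Celery(broker="amqp://guest:guest@localhost:5672//")

# task name borrowed from the tests in this diff, for illustration only
backend_name = "swh.scheduler.tests.tasks.add"
kw = dict(task_id="some-backend-id", args=[12, 30], kwargs={})

with_priority = True  # i.e. the scheduler task was created with a non-None priority
if with_priority:
    # priority tasks are published to the dedicated "save code now" queue
    kw["queue"] = f"save_code_now:{backend_name}"

app.send_task(backend_name, **kw)
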
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -1,3 +1,10 @@
+# Copyright (C) 2019-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Module in charge of testing the scheduler runner module"""
+
from itertools import count
from time import sleep
@@ -70,20 +77,93 @@
AsyncResult(id=task["backend_id"]).get()
-def test_task_return_value(
+def test_run_ready_task_standard(
swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
):
- task_type = swh_scheduler.get_task_type("swh-test-add")
+ """Ensure scheduler runner schedules tasks ready for scheduling"""
+ task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add"
+ task_type = swh_scheduler.get_task_type(task_type_name)
assert task_type
- assert task_type["backend_name"] == "swh.scheduler.tests.tasks.add"
+ assert task_type["backend_name"] == backend_name
+
+ task_inputs = [
+ ("oneshot", (12, 30)),
+ ("oneshot", (20, 10)),
+ ("recurring", (30, 10)),
+ ]
+
+ tasks = swh_scheduler.create_tasks(
+ create_task_dict(task_type_name, policy, *args)
+ for (policy, args) in task_inputs
+ )
+
+ assert len(tasks) == len(task_inputs)
- swh_scheduler.create_tasks([create_task_dict("swh-test-add", "oneshot", 12, 30)])
+ task_ids = set()
+ for task in tasks:
+ assert task["status"] == "next_run_not_scheduled"
+ assert task["priority"] is None
+ task_ids.add(task["id"])
backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
- assert len(backend_tasks) == 1
- task = backend_tasks[0]
- value = AsyncResult(id=task["backend_id"]).get()
- assert value == 42
+ assert len(backend_tasks) == len(tasks)
+
+ scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name)
+ assert len(scheduled_tasks) == len(tasks)
+ for task in scheduled_tasks:
+ assert task["status"] == "next_run_scheduled"
+ assert task["id"] in task_ids
+
+ for i, (_, args) in enumerate(task_inputs):
+ # take one of the tasks and read its result from the queue backend
+ task = backend_tasks[i]
+ value = AsyncResult(id=task["backend_id"]).get()
+ assert value == sum(args)
+
+
+def test_run_ready_task_with_priority(
+ swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
+):
+ """Ensure scheduler runner schedules priority tasks ready for scheduling"""
+ task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add"
+ task_type = swh_scheduler.get_task_type(task_type_name)
+ assert task_type
+ assert task_type["backend_name"] == backend_name
+
+ task_inputs = [
+ ("oneshot", (10, 22), "low"),
+ ("oneshot", (20, 10), "normal"),
+ ("recurring", (30, 10), "high"),
+ ]
+
+ tasks = swh_scheduler.create_tasks(
+ create_task_dict(task_type_name, policy, *args, priority=priority)
+ for (policy, args, priority) in task_inputs
+ )
+
+ assert len(tasks) == len(task_inputs)
+
+ task_ids = set()
+ for task in tasks:
+ assert task["status"] == "next_run_not_scheduled"
+ assert task["priority"] is not None
+ task_ids.add(task["id"])
+
+ backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
+ assert len(backend_tasks) == len(tasks)
+
+ scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name)
+ assert len(scheduled_tasks) == len(tasks)
+ for task in scheduled_tasks:
+ assert task["status"] == "next_run_scheduled"
+ assert task["id"] in task_ids
+
+ # TODO: Make the worker consume those messages so this can go green
+ # for i, (_, args, _) in enumerate(task_inputs):
+ # # take one of the tasks and read its result from the queue backend
+ # task = backend_tasks[i]
+ # value = AsyncResult(id=task["backend_id"]).get()
+ # assert value == sum(args)
def test_task_exception(
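
Regarding the TODO in the priority test above: the commented-out assertions would need the embedded test worker to also consume the dedicated priority queue. One hedged way to do that with plain Celery (whether it fits the existing test fixtures is an assumption) is a broadcast add_consumer control command:

# sketch only: ask running workers to also consume the priority queue of the test task
queue = "save_code_now:swh.scheduler.tests.tasks.add"
swh_scheduler_celery_app.control.add_consumer(queue, reply=True)
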
diff --git a/swh/scheduler/tests/test_config.py b/swh/scheduler/tests/test_config.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_config.py
@@ -0,0 +1,18 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.scheduler.celery_backend.config import route_for_task
+
+
+@pytest.mark.parametrize("name", ["swh.something", "swh.anything"])
+def test_route_for_task_routing(name):
+ assert route_for_task(name, [], {}, {}) == {"queue": name}
+
+
+@pytest.mark.parametrize("name", [None, "foobar"])
+def test_route_for_task_no_routing(name):
+ assert route_for_task(name, [], {}, {}) is None
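
The new test_config.py pins down the contract of route_for_task: task names under the swh namespace are routed to a queue named after the task, anything else falls through to Celery's default routing. A minimal router consistent with those assertions could look like the sketch below (an illustration, not the actual swh.scheduler.celery_backend.config implementation):

from typing import Dict, Optional


def route_for_task(name, args, kwargs, options, task=None, **kw) -> Optional[Dict]:
    # route "swh."-prefixed task names to a queue of the same name
    if name is not None and name.startswith("swh."):
        return {"queue": name}
    # returning None lets Celery fall back to its default routing
    return None
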
Attached To: D5520: Route priority tasks to dedicated save code now queues