Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9697265
D5535.id19766.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Subscribers
None
D5535.id19766.diff
View Options
diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py
--- a/swh/scheduler/tests/tasks.py
+++ b/swh/scheduler/tests/tasks.py
@@ -1,12 +1,18 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from celery import group, shared_task
+TASK_PING = "swh.scheduler.tests.tasks.ping"
+TASK_MULTIPING = "swh.scheduler.tests.tasks.multiping"
+TASK_ERROR = "swh.scheduler.tests.tasks.error"
+TASK_ADD = "swh.scheduler.tests.tasks.add"
+TASK_ECHO = "swh.scheduler.tests.tasks.echo"
-@shared_task(name="swh.scheduler.tests.tasks.ping", bind=True)
+
+@shared_task(name=TASK_PING, bind=True)
def ping(self, **kw):
# check this is a SWHTask
assert hasattr(self, "log")
@@ -18,7 +24,7 @@
return "OK"
-@shared_task(name="swh.scheduler.tests.tasks.multiping", bind=True)
+@shared_task(name=TASK_MULTIPING, bind=True)
def multiping(self, n=10):
promise = group(ping.s(i=i) for i in range(n))()
self.log.debug("%s OK (spawned %s subtasks)" % (self.name, n))
@@ -26,17 +32,17 @@
return promise.id
-@shared_task(name="swh.scheduler.tests.tasks.error")
+@shared_task(name=TASK_ERROR)
def not_implemented():
raise NotImplementedError("Nope")
-@shared_task(name="swh.scheduler.tests.tasks.add")
+@shared_task(name=TASK_ADD)
def add(x, y):
return x + y
-@shared_task(name="swh.scheduler.tests.tasks.echo")
+@shared_task(name=TASK_ECHO)
def echo(**kw):
"Does nothing, just return the given kwargs dict"
return kw
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -9,14 +9,48 @@
from time import sleep
from celery.result import AsyncResult, GroupResult
+from kombu import Exchange, Queue
import pytest
from swh.scheduler.celery_backend.runner import run_ready_tasks
+from swh.scheduler.tests.tasks import (
+ TASK_ADD,
+ TASK_ECHO,
+ TASK_ERROR,
+ TASK_MULTIPING,
+ TASK_PING,
+)
from swh.scheduler.utils import create_task_dict
+# Queues to subscribe to. Because high priority tasks are rerouted, this module must
+# declare all queues/task names
+TEST_QUEUES = [
+ "celery",
+ TASK_ECHO,
+ TASK_ERROR,
+ TASK_PING,
+ TASK_ADD,
+ TASK_MULTIPING,
+ # and the high priority queue
+ f"save_code_now:{TASK_ADD}",
+]
+
+
+@pytest.fixture(scope="session")
+def swh_scheduler_celery_app(swh_scheduler_celery_app):
+ swh_scheduler_celery_app.add_defaults(
+ {
+ "task_queues": [
+ Queue(queue, Exchange(queue), routing_key=queue)
+ for queue in TEST_QUEUES
+ ],
+ }
+ )
+ return swh_scheduler_celery_app
+
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
- res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.ping")
+ res = swh_scheduler_celery_app.send_task(TASK_PING)
assert res
res.wait()
assert res.successful()
@@ -24,9 +58,7 @@
def test_ping_with_kw(swh_scheduler_celery_app, swh_scheduler_celery_worker):
- res = swh_scheduler_celery_app.send_task(
- "swh.scheduler.tests.tasks.ping", kwargs={"a": 1}
- )
+ res = swh_scheduler_celery_app.send_task(TASK_PING, kwargs={"a": 1})
assert res
res.wait()
assert res.successful()
@@ -35,9 +67,7 @@
def test_multiping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
"Test that a task that spawns subtasks (group) works"
- res = swh_scheduler_celery_app.send_task(
- "swh.scheduler.tests.tasks.multiping", kwargs={"n": 5}
- )
+ res = swh_scheduler_celery_app.send_task(TASK_MULTIPING, kwargs={"n": 5})
assert res
res.wait()
@@ -66,7 +96,7 @@
task_type = swh_scheduler.get_task_type("swh-test-ping")
assert task_type
- assert task_type["backend_name"] == "swh.scheduler.tests.tasks.ping"
+ assert task_type["backend_name"] == TASK_PING
swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")])
@@ -81,7 +111,7 @@
swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
):
"""Ensure scheduler runner schedules tasks ready for scheduling"""
- task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add"
+ task_type_name, backend_name = "swh-test-add", TASK_ADD
task_type = swh_scheduler.get_task_type(task_type_name)
assert task_type
assert task_type["backend_name"] == backend_name
@@ -114,8 +144,8 @@
assert task["status"] == "next_run_scheduled"
assert task["id"] in task_ids
+ # Ensure each task is indeed scheduled to the queue backend
for i, (_, args) in enumerate(task_inputs):
- # take one of the task and read it from the queue backend
task = backend_tasks[i]
value = AsyncResult(id=task["backend_id"]).get()
assert value == sum(args)
@@ -125,7 +155,7 @@
swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
):
"""Ensure scheduler runner schedules priority tasks ready for scheduling"""
- task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add"
+ task_type_name, backend_name = "swh-test-add", TASK_ADD
task_type = swh_scheduler.get_task_type(task_type_name)
assert task_type
assert task_type["backend_name"] == backend_name
@@ -158,12 +188,11 @@
assert task["status"] == "next_run_scheduled"
assert task["id"] in task_ids
- # TODO: Make the worker consume those messages so this can go green
- # for i, (_, args, _) in enumerate(task_inputs):
- # # take one of the task and read it from the queue backend
- # task = backend_tasks[i]
- # value = AsyncResult(id=task["backend_id"]).get()
- # assert value == sum(args)
+ # Ensure each priority task is indeed scheduled to the queue backend
+ for i, (_, args, _) in enumerate(task_inputs):
+ task = backend_tasks[i]
+ value = AsyncResult(id=task["backend_id"]).get()
+ assert value == sum(args)
def test_task_exception(
@@ -171,7 +200,7 @@
):
task_type = swh_scheduler.get_task_type("swh-test-error")
assert task_type
- assert task_type["backend_name"] == "swh.scheduler.tests.tasks.error"
+ assert task_type["backend_name"] == TASK_ERROR
swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")])
@@ -188,7 +217,7 @@
m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
mocker.patch("swh.scheduler.task.ts", side_effect=count())
mocker.patch("swh.core.statsd.monotonic", side_effect=count())
- res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.echo")
+ res = swh_scheduler_celery_app.send_task(TASK_ECHO)
assert res
res.wait()
assert res.successful()
@@ -223,9 +252,7 @@
m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
mocker.patch("swh.scheduler.task.ts", side_effect=count())
mocker.patch("swh.core.statsd.monotonic", side_effect=count())
- res = swh_scheduler_celery_app.send_task(
- "swh.scheduler.tests.tasks.echo", kwargs={"status": "eventful"}
- )
+ res = swh_scheduler_celery_app.send_task(TASK_ECHO, kwargs={"status": "eventful"})
assert res
res.wait()
assert res.successful()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Aug 17, 11:15 PM (2 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216141
Attached To
D5535: tests: Complete checks on message with priority consumption
Event Timeline
Log In to Comment