Page MenuHomeSoftware Heritage

D5535.id19766.diff
No OneTemporary

D5535.id19766.diff

diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py
--- a/swh/scheduler/tests/tasks.py
+++ b/swh/scheduler/tests/tasks.py
@@ -1,12 +1,18 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from celery import group, shared_task
+TASK_PING = "swh.scheduler.tests.tasks.ping"
+TASK_MULTIPING = "swh.scheduler.tests.tasks.multiping"
+TASK_ERROR = "swh.scheduler.tests.tasks.error"
+TASK_ADD = "swh.scheduler.tests.tasks.add"
+TASK_ECHO = "swh.scheduler.tests.tasks.echo"
-@shared_task(name="swh.scheduler.tests.tasks.ping", bind=True)
+
+@shared_task(name=TASK_PING, bind=True)
def ping(self, **kw):
# check this is a SWHTask
assert hasattr(self, "log")
@@ -18,7 +24,7 @@
return "OK"
-@shared_task(name="swh.scheduler.tests.tasks.multiping", bind=True)
+@shared_task(name=TASK_MULTIPING, bind=True)
def multiping(self, n=10):
promise = group(ping.s(i=i) for i in range(n))()
self.log.debug("%s OK (spawned %s subtasks)" % (self.name, n))
@@ -26,17 +32,17 @@
return promise.id
-@shared_task(name="swh.scheduler.tests.tasks.error")
+@shared_task(name=TASK_ERROR)
def not_implemented():
raise NotImplementedError("Nope")
-@shared_task(name="swh.scheduler.tests.tasks.add")
+@shared_task(name=TASK_ADD)
def add(x, y):
return x + y
-@shared_task(name="swh.scheduler.tests.tasks.echo")
+@shared_task(name=TASK_ECHO)
def echo(**kw):
"Does nothing, just return the given kwargs dict"
return kw
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -9,14 +9,48 @@
from time import sleep
from celery.result import AsyncResult, GroupResult
+from kombu import Exchange, Queue
import pytest
from swh.scheduler.celery_backend.runner import run_ready_tasks
+from swh.scheduler.tests.tasks import (
+ TASK_ADD,
+ TASK_ECHO,
+ TASK_ERROR,
+ TASK_MULTIPING,
+ TASK_PING,
+)
from swh.scheduler.utils import create_task_dict
+# Queues to subscribe to. Because high priority tasks are rerouted, this module must
+# explicitly declare all queues/task names
+TEST_QUEUES = [
+ "celery",
+ TASK_ECHO,
+ TASK_ERROR,
+ TASK_PING,
+ TASK_ADD,
+ TASK_MULTIPING,
+ # and the high priority queue
+ f"save_code_now:{TASK_ADD}",
+]
+
+
+@pytest.fixture(scope="session")
+def swh_scheduler_celery_app(swh_scheduler_celery_app):
+ swh_scheduler_celery_app.add_defaults(
+ {
+ "task_queues": [
+ Queue(queue, Exchange(queue), routing_key=queue)
+ for queue in TEST_QUEUES
+ ],
+ }
+ )
+ return swh_scheduler_celery_app
+
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
- res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.ping")
+ res = swh_scheduler_celery_app.send_task(TASK_PING)
assert res
res.wait()
assert res.successful()
@@ -24,9 +58,7 @@
def test_ping_with_kw(swh_scheduler_celery_app, swh_scheduler_celery_worker):
- res = swh_scheduler_celery_app.send_task(
- "swh.scheduler.tests.tasks.ping", kwargs={"a": 1}
- )
+ res = swh_scheduler_celery_app.send_task(TASK_PING, kwargs={"a": 1})
assert res
res.wait()
assert res.successful()
@@ -35,9 +67,7 @@
def test_multiping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
"Test that a task that spawns subtasks (group) works"
- res = swh_scheduler_celery_app.send_task(
- "swh.scheduler.tests.tasks.multiping", kwargs={"n": 5}
- )
+ res = swh_scheduler_celery_app.send_task(TASK_MULTIPING, kwargs={"n": 5})
assert res
res.wait()
@@ -66,7 +96,7 @@
task_type = swh_scheduler.get_task_type("swh-test-ping")
assert task_type
- assert task_type["backend_name"] == "swh.scheduler.tests.tasks.ping"
+ assert task_type["backend_name"] == TASK_PING
swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")])
@@ -81,7 +111,7 @@
swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
):
"""Ensure scheduler runner schedules tasks ready for scheduling"""
- task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add"
+ task_type_name, backend_name = "swh-test-add", TASK_ADD
task_type = swh_scheduler.get_task_type(task_type_name)
assert task_type
assert task_type["backend_name"] == backend_name
@@ -114,8 +144,8 @@
assert task["status"] == "next_run_scheduled"
assert task["id"] in task_ids
+ # Ensure each task is indeed scheduled to the queue backend
for i, (_, args) in enumerate(task_inputs):
- # take one of the task and read it from the queue backend
task = backend_tasks[i]
value = AsyncResult(id=task["backend_id"]).get()
assert value == sum(args)
@@ -125,7 +155,7 @@
swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
):
"""Ensure scheduler runner schedules priority tasks ready for scheduling"""
- task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add"
+ task_type_name, backend_name = "swh-test-add", TASK_ADD
task_type = swh_scheduler.get_task_type(task_type_name)
assert task_type
assert task_type["backend_name"] == backend_name
@@ -158,12 +188,11 @@
assert task["status"] == "next_run_scheduled"
assert task["id"] in task_ids
- # TODO: Make the worker consume those messages so this can go green
- # for i, (_, args, _) in enumerate(task_inputs):
- # # take one of the task and read it from the queue backend
- # task = backend_tasks[i]
- # value = AsyncResult(id=task["backend_id"]).get()
- # assert value == sum(args)
+ # Ensure each priority task is indeed scheduled to the queue backend
+ for i, (_, args, _) in enumerate(task_inputs):
+ task = backend_tasks[i]
+ value = AsyncResult(id=task["backend_id"]).get()
+ assert value == sum(args)
def test_task_exception(
@@ -171,7 +200,7 @@
):
task_type = swh_scheduler.get_task_type("swh-test-error")
assert task_type
- assert task_type["backend_name"] == "swh.scheduler.tests.tasks.error"
+ assert task_type["backend_name"] == TASK_ERROR
swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")])
@@ -188,7 +217,7 @@
m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
mocker.patch("swh.scheduler.task.ts", side_effect=count())
mocker.patch("swh.core.statsd.monotonic", side_effect=count())
- res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.echo")
+ res = swh_scheduler_celery_app.send_task(TASK_ECHO)
assert res
res.wait()
assert res.successful()
@@ -223,9 +252,7 @@
m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
mocker.patch("swh.scheduler.task.ts", side_effect=count())
mocker.patch("swh.core.statsd.monotonic", side_effect=count())
- res = swh_scheduler_celery_app.send_task(
- "swh.scheduler.tests.tasks.echo", kwargs={"status": "eventful"}
- )
+ res = swh_scheduler_celery_app.send_task(TASK_ECHO, kwargs={"status": "eventful"})
assert res
res.wait()
assert res.successful()

File Metadata

Mime Type
text/plain
Expires
Sun, Aug 17, 11:15 PM (2 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216141

Event Timeline