diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py --- a/swh/scheduler/tests/tasks.py +++ b/swh/scheduler/tests/tasks.py @@ -1,12 +1,18 @@ -# Copyright (C) 2018-2019 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import group, shared_task +TASK_PING = "swh.scheduler.tests.tasks.ping" +TASK_MULTIPING = "swh.scheduler.tests.tasks.multiping" +TASK_ERROR = "swh.scheduler.tests.tasks.error" +TASK_ADD = "swh.scheduler.tests.tasks.add" +TASK_ECHO = "swh.scheduler.tests.tasks.echo" -@shared_task(name="swh.scheduler.tests.tasks.ping", bind=True) + +@shared_task(name=TASK_PING, bind=True) def ping(self, **kw): # check this is a SWHTask assert hasattr(self, "log") @@ -18,7 +24,7 @@ return "OK" -@shared_task(name="swh.scheduler.tests.tasks.multiping", bind=True) +@shared_task(name=TASK_MULTIPING, bind=True) def multiping(self, n=10): promise = group(ping.s(i=i) for i in range(n))() self.log.debug("%s OK (spawned %s subtasks)" % (self.name, n)) @@ -26,17 +32,17 @@ return promise.id -@shared_task(name="swh.scheduler.tests.tasks.error") +@shared_task(name=TASK_ERROR) def not_implemented(): raise NotImplementedError("Nope") -@shared_task(name="swh.scheduler.tests.tasks.add") +@shared_task(name=TASK_ADD) def add(x, y): return x + y -@shared_task(name="swh.scheduler.tests.tasks.echo") +@shared_task(name=TASK_ECHO) def echo(**kw): "Does nothing, just return the given kwargs dict" return kw diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py --- a/swh/scheduler/tests/test_celery_tasks.py +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -9,14 +9,48 @@ from time import sleep from celery.result import AsyncResult, GroupResult +from kombu import Exchange, Queue import pytest 
from swh.scheduler.celery_backend.runner import run_ready_tasks +from swh.scheduler.tests.tasks import ( + TASK_ADD, + TASK_ECHO, + TASK_ERROR, + TASK_MULTIPING, + TASK_PING, +) from swh.scheduler.utils import create_task_dict +# Queues to subscribe to. Due to the rerouting of high priority tasks, this module needs +# to declare all queues/task names +TEST_QUEUES = [ + "celery", + TASK_ECHO, + TASK_ERROR, + TASK_PING, + TASK_ADD, + TASK_MULTIPING, + # and the high priority queue + f"save_code_now:{TASK_ADD}", +] + + +@pytest.fixture(scope="session") +def swh_scheduler_celery_app(swh_scheduler_celery_app): + swh_scheduler_celery_app.add_defaults( + { + "task_queues": [ + Queue(queue, Exchange(queue), routing_key=queue) + for queue in TEST_QUEUES + ], + } + ) + return swh_scheduler_celery_app + def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker): - res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.ping") + res = swh_scheduler_celery_app.send_task(TASK_PING) assert res res.wait() assert res.successful() @@ -24,9 +58,7 @@ def test_ping_with_kw(swh_scheduler_celery_app, swh_scheduler_celery_worker): - res = swh_scheduler_celery_app.send_task( - "swh.scheduler.tests.tasks.ping", kwargs={"a": 1} - ) + res = swh_scheduler_celery_app.send_task(TASK_PING, kwargs={"a": 1}) assert res res.wait() assert res.successful() @@ -35,9 +67,7 @@ def test_multiping(swh_scheduler_celery_app, swh_scheduler_celery_worker): "Test that a task that spawns subtasks (group) works" - res = swh_scheduler_celery_app.send_task( - "swh.scheduler.tests.tasks.multiping", kwargs={"n": 5} - ) + res = swh_scheduler_celery_app.send_task(TASK_MULTIPING, kwargs={"n": 5}) assert res res.wait() @@ -66,7 +96,7 @@ task_type = swh_scheduler.get_task_type("swh-test-ping") assert task_type - assert task_type["backend_name"] == "swh.scheduler.tests.tasks.ping" + assert task_type["backend_name"] == TASK_PING swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")]) 
@@ -81,7 +111,7 @@ swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler ): """Ensure scheduler runner schedules tasks ready for scheduling""" - task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add" + task_type_name, backend_name = "swh-test-add", TASK_ADD task_type = swh_scheduler.get_task_type(task_type_name) assert task_type assert task_type["backend_name"] == backend_name @@ -114,8 +144,8 @@ assert task["status"] == "next_run_scheduled" assert task["id"] in task_ids + # Ensure each task is indeed scheduled to the queue backend for i, (_, args) in enumerate(task_inputs): - # take one of the task and read it from the queue backend task = backend_tasks[i] value = AsyncResult(id=task["backend_id"]).get() assert value == sum(args) @@ -125,7 +155,7 @@ swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler ): """Ensure scheduler runner schedules priority tasks ready for scheduling""" - task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add" + task_type_name, backend_name = "swh-test-add", TASK_ADD task_type = swh_scheduler.get_task_type(task_type_name) assert task_type assert task_type["backend_name"] == backend_name @@ -158,12 +188,11 @@ assert task["status"] == "next_run_scheduled" assert task["id"] in task_ids - # TODO: Make the worker consume those messages so this can go green - # for i, (_, args, _) in enumerate(task_inputs): - # # take one of the task and read it from the queue backend - # task = backend_tasks[i] - # value = AsyncResult(id=task["backend_id"]).get() - # assert value == sum(args) + # Ensure each priority task is indeed scheduled to the queue backend + for i, (_, args, _) in enumerate(task_inputs): + task = backend_tasks[i] + value = AsyncResult(id=task["backend_id"]).get() + assert value == sum(args) def test_task_exception( @@ -171,7 +200,7 @@ ): task_type = swh_scheduler.get_task_type("swh-test-error") assert task_type - assert task_type["backend_name"] == 
"swh.scheduler.tests.tasks.error" + assert task_type["backend_name"] == TASK_ERROR swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")]) @@ -188,7 +217,7 @@ m = mocker.patch("swh.scheduler.task.Statsd._send_to_server") mocker.patch("swh.scheduler.task.ts", side_effect=count()) mocker.patch("swh.core.statsd.monotonic", side_effect=count()) - res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.echo") + res = swh_scheduler_celery_app.send_task(TASK_ECHO) assert res res.wait() assert res.successful() @@ -223,9 +252,7 @@ m = mocker.patch("swh.scheduler.task.Statsd._send_to_server") mocker.patch("swh.scheduler.task.ts", side_effect=count()) mocker.patch("swh.core.statsd.monotonic", side_effect=count()) - res = swh_scheduler_celery_app.send_task( - "swh.scheduler.tests.tasks.echo", kwargs={"status": "eventful"} - ) + res = swh_scheduler_celery_app.send_task(TASK_ECHO, kwargs={"status": "eventful"}) assert res res.wait() assert res.successful()