diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py index 8751e2c..7ee5cc6 100644 --- a/swh/scheduler/tests/tasks.py +++ b/swh/scheduler/tests/tasks.py @@ -1,42 +1,48 @@ -# Copyright (C) 2018-2019 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import group, shared_task +TASK_PING = "swh.scheduler.tests.tasks.ping" +TASK_MULTIPING = "swh.scheduler.tests.tasks.multiping" +TASK_ERROR = "swh.scheduler.tests.tasks.error" +TASK_ADD = "swh.scheduler.tests.tasks.add" +TASK_ECHO = "swh.scheduler.tests.tasks.echo" -@shared_task(name="swh.scheduler.tests.tasks.ping", bind=True) + +@shared_task(name=TASK_PING, bind=True) def ping(self, **kw): # check this is a SWHTask assert hasattr(self, "log") assert not hasattr(self, "run_task") assert "SWHTask" in [x.__name__ for x in self.__class__.__mro__] self.log.debug(self.name) if kw: return "OK (kw=%s)" % kw return "OK" -@shared_task(name="swh.scheduler.tests.tasks.multiping", bind=True) +@shared_task(name=TASK_MULTIPING, bind=True) def multiping(self, n=10): promise = group(ping.s(i=i) for i in range(n))() self.log.debug("%s OK (spawned %s subtasks)" % (self.name, n)) promise.save() return promise.id -@shared_task(name="swh.scheduler.tests.tasks.error") +@shared_task(name=TASK_ERROR) def not_implemented(): raise NotImplementedError("Nope") -@shared_task(name="swh.scheduler.tests.tasks.add") +@shared_task(name=TASK_ADD) def add(x, y): return x + y -@shared_task(name="swh.scheduler.tests.tasks.echo") +@shared_task(name=TASK_ECHO) def echo(**kw): "Does nothing, just return the given kwargs dict" return kw diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py index 1a20f31..1458e62 100644 --- a/swh/scheduler/tests/test_celery_tasks.py +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -1,254 +1,281 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module in charge of testing the scheduler runner module""" from itertools import count from time import sleep from celery.result import AsyncResult, GroupResult +from kombu import Exchange, Queue import pytest from swh.scheduler.celery_backend.runner import run_ready_tasks +from swh.scheduler.tests.tasks import ( + TASK_ADD, + TASK_ECHO, + TASK_ERROR, + TASK_MULTIPING, + TASK_PING, +) from swh.scheduler.utils import create_task_dict +# Queues to subscribe. Due to the rerouting of high priority tasks, this module requires +# to declare all queues/task names +TEST_QUEUES = [ + "celery", + TASK_ECHO, + TASK_ERROR, + TASK_PING, + TASK_ADD, + TASK_MULTIPING, + # and the high priority queue + f"save_code_now:{TASK_ADD}", +] + + +@pytest.fixture(scope="session") +def swh_scheduler_celery_app(swh_scheduler_celery_app): + swh_scheduler_celery_app.add_defaults( + { + "task_queues": [ + Queue(queue, Exchange(queue), routing_key=queue) + for queue in TEST_QUEUES + ], + } + ) + return swh_scheduler_celery_app + def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker): - res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.ping") + res = swh_scheduler_celery_app.send_task(TASK_PING) assert res res.wait() assert res.successful() assert res.result == "OK" def test_ping_with_kw(swh_scheduler_celery_app, swh_scheduler_celery_worker): - res = swh_scheduler_celery_app.send_task( - "swh.scheduler.tests.tasks.ping", kwargs={"a": 1} - ) + res = swh_scheduler_celery_app.send_task(TASK_PING, kwargs={"a": 1}) assert res res.wait() assert res.successful() assert res.result == "OK (kw={'a': 1})" def test_multiping(swh_scheduler_celery_app, swh_scheduler_celery_worker): "Test that a task that spawns subtasks (group) works" - res = swh_scheduler_celery_app.send_task( - "swh.scheduler.tests.tasks.multiping", kwargs={"n": 5} - ) + res = swh_scheduler_celery_app.send_task(TASK_MULTIPING, kwargs={"n": 5}) assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_scheduler_celery_app) for i in range(5): if promise.ready(): break sleep(1) results = [x.get() for x in promise.results] assert len(results) == 5 for i in range(5): assert ("OK (kw={'i': %s})" % i) in results def test_scheduler_fixture( swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler ): "Test that the scheduler fixture works properly" task_type = swh_scheduler.get_task_type("swh-test-ping") assert task_type - assert task_type["backend_name"] == "swh.scheduler.tests.tasks.ping" + assert task_type["backend_name"] == TASK_PING swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")]) backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert backend_tasks for task in backend_tasks: # Make sure the task completed AsyncResult(id=task["backend_id"]).get() def test_run_ready_task_standard( swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler ): """Ensure scheduler runner schedules tasks ready for scheduling""" - task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add" + task_type_name, backend_name = "swh-test-add", TASK_ADD task_type = swh_scheduler.get_task_type(task_type_name) assert task_type assert task_type["backend_name"] == backend_name task_inputs = [ ("oneshot", (12, 30)), ("oneshot", (20, 10)), ("recurring", (30, 10)), ] tasks = swh_scheduler.create_tasks( create_task_dict(task_type_name, policy, *args) for (policy, args) in task_inputs ) assert len(tasks) == len(task_inputs) task_ids = set() for task in tasks: assert task["status"] == "next_run_not_scheduled" assert task["priority"] is None task_ids.add(task["id"]) backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert len(backend_tasks) == len(tasks) scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name) assert len(scheduled_tasks) == len(tasks) for task in scheduled_tasks: assert task["status"] == "next_run_scheduled" assert task["id"] in task_ids + # Ensure each task is indeed scheduled to the queue backend for i, (_, args) in enumerate(task_inputs): - # take one of the task and read it from the queue backend task = backend_tasks[i] value = AsyncResult(id=task["backend_id"]).get() assert value == sum(args) def test_run_ready_task_with_priority( swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler ): """Ensure scheduler runner schedules priority tasks ready for scheduling""" - task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add" + task_type_name, backend_name = "swh-test-add", TASK_ADD task_type = swh_scheduler.get_task_type(task_type_name) assert task_type assert task_type["backend_name"] == backend_name task_inputs = [ ("oneshot", (10, 22), "low"), ("oneshot", (20, 10), "normal"), ("recurring", (30, 10), "high"), ] tasks = swh_scheduler.create_tasks( create_task_dict(task_type_name, policy, *args, priority=priority) for (policy, args, priority) in task_inputs ) assert len(tasks) == len(task_inputs) task_ids = set() for task in tasks: assert task["status"] == "next_run_not_scheduled" assert task["priority"] is not None task_ids.add(task["id"]) backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert len(backend_tasks) == len(tasks) scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name) assert len(scheduled_tasks) == len(tasks) for task in scheduled_tasks: assert task["status"] == "next_run_scheduled" assert task["id"] in task_ids - # TODO: Make the worker consume those messages so this can go green - # for i, (_, args, _) in enumerate(task_inputs): - # # take one of the task and read it from the queue backend - # task = backend_tasks[i] - # value = AsyncResult(id=task["backend_id"]).get() - # assert value == sum(args) + # Ensure each priority task is indeed scheduled to the queue backend + for i, (_, args, _) in enumerate(task_inputs): + task = backend_tasks[i] + value = AsyncResult(id=task["backend_id"]).get() + assert value == sum(args) def test_task_exception( swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler ): task_type = swh_scheduler.get_task_type("swh-test-error") assert task_type - assert task_type["backend_name"] == "swh.scheduler.tests.tasks.error" + assert task_type["backend_name"] == TASK_ERROR swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")]) backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert len(backend_tasks) == 1 task = backend_tasks[0] result = AsyncResult(id=task["backend_id"]) with pytest.raises(NotImplementedError): result.get() def test_statsd(swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker): m = mocker.patch("swh.scheduler.task.Statsd._send_to_server") mocker.patch("swh.scheduler.task.ts", side_effect=count()) mocker.patch("swh.core.statsd.monotonic", side_effect=count()) - res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.echo") + res = swh_scheduler_celery_app.send_task(TASK_ECHO) assert res res.wait() assert res.successful() assert res.result == {} m.assert_any_call( "swh_task_called_count:1|c|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_start_ts:0|g|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_end_ts:1|g|" "#status:uneventful,task:swh.scheduler.tests.tasks.echo," "worker:unknown worker" ) m.assert_any_call( "swh_task_duration_seconds:1000|ms|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_success_count:1|c|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) def test_statsd_with_status( swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker ): m = mocker.patch("swh.scheduler.task.Statsd._send_to_server") mocker.patch("swh.scheduler.task.ts", side_effect=count()) mocker.patch("swh.core.statsd.monotonic", side_effect=count()) - res = swh_scheduler_celery_app.send_task( - "swh.scheduler.tests.tasks.echo", kwargs={"status": "eventful"} - ) + res = swh_scheduler_celery_app.send_task(TASK_ECHO, kwargs={"status": "eventful"}) assert res res.wait() assert res.successful() assert res.result == {"status": "eventful"} m.assert_any_call( "swh_task_called_count:1|c|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_start_ts:0|g|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_end_ts:1|g|" "#status:eventful,task:swh.scheduler.tests.tasks.echo," "worker:unknown worker" ) m.assert_any_call( "swh_task_duration_seconds:1000|ms|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" ) m.assert_any_call( "swh_task_success_count:1|c|" "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker" )