diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -254,7 +254,8 @@ ) # Instantiate the Celery app -app = Celery(broker=CONFIG['task_broker']) +app = Celery(broker=CONFIG['task_broker'], + task_cls='swh.scheduler.task:SWHTask') app.add_defaults(CELERY_DEFAULT_CONFIG) # XXX for BW compat diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/conftest.py @@ -0,0 +1,30 @@ +import pytest + + +@pytest.fixture(scope='session') +def celery_enable_logging(): + return True + + +@pytest.fixture(scope='session') +def celery_includes(): + return [ + 'swh.scheduler.tests.tasks', + ] + + +@pytest.fixture(scope='session') +def celery_parameters(): + return { + 'task_cls': 'swh.scheduler.task:SWHTask', + } + + +# override the celery_session_app fixture to monkeypatch the 'main' +# swh.scheduler.celery_backend.config.app Celery application +# with the test application. +@pytest.fixture(scope='session') +def swh_app(celery_session_app): + import swh.scheduler.celery_backend.config + swh.scheduler.celery_backend.config.app = celery_session_app + yield celery_session_app diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/tasks.py @@ -0,0 +1,27 @@ +from celery import group + +from swh.scheduler.celery_backend.config import app + + +@app.task(name='swh.scheduler.tests.tasks.ping', + bind=True) +def ping(self, **kw): + # check this is a SWHTask + assert hasattr(self, 'log') + assert not hasattr(self, 'run_task') + assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] + self.log.debug(self.name) + if kw: + return 'OK (kw=%s)' % kw + return 'OK' + + +@app.task(name='swh.scheduler.tests.tasks.multiping', + bind=True) +def multiping(self, n=10): + self.log.debug(self.name) + + promise = group(ping.s(i=i) for i in range(n))() + self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) + promise.save() + return promise.id diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -0,0 +1,35 @@ +from time import sleep +from celery.result import GroupResult + + +def test_ping(swh_app, celery_session_worker): + res = swh_app.send_task( + 'swh.scheduler.tests.tasks.ping') + assert res + res.wait() + assert res.successful() + assert res.result == 'OK' + + +def test_multiping(swh_app, celery_session_worker): + "Test that a task that spawns subtasks (group) works" + res = swh_app.send_task( + 'swh.scheduler.tests.tasks.multiping', n=5) + assert res + + res.wait() + assert res.successful() + + # retrieve the GroupResult for this task and wait for all the subtasks + # to complete + promise_id = res.result + assert promise_id + promise = GroupResult.restore(promise_id, app=swh_app) + for i in range(5): + if promise.ready(): + break + sleep(1) + + results = [x.get() for x in promise.results] + for i in range(5): + assert ("OK (kw={'i': %s})" % i) in results