diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -4,7 +4,6 @@ # See top-level LICENSE file for more information import logging -import click from flask import request @@ -140,18 +139,5 @@ return app(environ, start_response) -@click.command() -@click.argument('config-path', required=1) -@click.option('--host', default='0.0.0.0', - help="Host to run the scheduler server api") -@click.option('--port', default=5008, type=click.INT, - help="Binding port of the server") -@click.option('--debug/--nodebug', default=True, - help="Indicates if the server should run in debug mode") -def launch(config_path, host, port, debug): - app.config.update(config.read(config_path, DEFAULT_CONFIG)) - app.run(host, port=port, debug=bool(debug)) - - if __name__ == '__main__': - launch() + print('Please use the "swh-scheduler api-server" command') diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -254,7 +254,8 @@ ) # Instantiate the Celery app -app = Celery(broker=CONFIG['task_broker']) +app = Celery(broker=CONFIG['task_broker'], + task_cls='swh.scheduler.task:SWHTask') app.add_defaults(CELERY_DEFAULT_CONFIG) # XXX for BW compat diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/conftest.py @@ -0,0 +1,30 @@ +import pytest + + +@pytest.fixture(scope='session') +def celery_enable_logging(): + return True + + +@pytest.fixture(scope='session') +def celery_includes(): + return [ + 'swh.scheduler.tests.tasks', + ] + + +@pytest.fixture(scope='session') +def celery_parameters(): + return { + 'task_cls': 'swh.scheduler.task:SWHTask', + } + + +# override the celery_session_app fixture to monkeypatch the 'main' +# swh.scheduler.celery_backend.config.app Celery application +# with the test application. +@pytest.fixture(scope='session') +def swh_app(celery_session_app): + import swh.scheduler.celery_backend.config + swh.scheduler.celery_backend.config.app = celery_session_app + yield celery_session_app diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/tasks.py @@ -0,0 +1,27 @@ +from celery import group + +from swh.scheduler.celery_backend.config import app + + +@app.task(name='swh.scheduler.tests.tasks.ping', + bind=True) +def ping(self, **kw): + # check this is a SWHTask + assert hasattr(self, 'log') + assert not hasattr(self, 'run_task') + assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] + self.log.debug(self.name) + if kw: + return 'OK (kw=%s)' % kw + return 'OK' + + +@app.task(name='swh.scheduler.tests.tasks.multiping', + bind=True) +def multiping(self, n=10): + self.log.debug(self.name) + + promise = group(ping.s(i=i) for i in range(n))() + self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) + promise.save() + return promise.id diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -0,0 +1,35 @@ +from time import sleep +from celery.result import GroupResult + + +def test_ping(swh_app, celery_session_worker): + res = swh_app.send_task( + 'swh.scheduler.tests.tasks.ping') + assert res + res.wait() + assert res.successful() + assert res.result == 'OK' + + +def test_multiping(swh_app, celery_session_worker): + "Test that a task that spawns subtasks (group) works" + res = swh_app.send_task( + 'swh.scheduler.tests.tasks.multiping', n=5) + assert res + + res.wait() + assert res.successful() + + # retrieve the GroupResult for this task and wait for all the subtasks + # to complete + promise_id = res.result + assert promise_id + promise = GroupResult.restore(promise_id, app=swh_app) + for i in range(5): + if promise.ready(): + break + sleep(1) + + results = [x.get() for x in promise.results] + for i in range(5): + assert ("OK (kw={'i': %s})" % i) in results