diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index e4e9e05..608277a 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,100 +1,105 @@ +# Copyright (C) 2016-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + import os import pytest import glob from datetime import timedelta import pkg_resources from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler import get_scheduler from swh.scheduler.tests import SQL_DIR +from swh.scheduler.tests.tasks import register_test_tasks + # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith('CELERY')]: os.environ.pop(var) -import swh.scheduler.celery_backend.config # noqa -# this import is needed here to enforce creation of the celery current app -# BEFORE the swh_app fixture is called, otherwise the Celery app instance from -# celery_backend.config becomes the celery.current_app - # test_cli tests depends on a en/C locale, so ensure it os.environ['LC_ALL'] = 'C.UTF-8' DUMP_FILES = os.path.join(SQL_DIR, '*.sql') # celery tasks for testing purpose; tasks themselves should be # in swh/scheduler/tests/tasks.py TASK_NAMES = ['ping', 'multiping', 'add', 'error'] @pytest.fixture(scope='session') def celery_enable_logging(): return True @pytest.fixture(scope='session') def celery_includes(): task_modules = [ 'swh.scheduler.tests.tasks', ] for entrypoint in pkg_resources.iter_entry_points('swh.workers'): task_modules.extend(entrypoint.load()().get('task_modules', [])) return task_modules @pytest.fixture(scope='session') def celery_parameters(): return { 'task_cls': 'swh.scheduler.task:SWHTask', } @pytest.fixture(scope='session') def celery_config(): return { 'accept_content': ['application/x-msgpack', 'application/json'], 'task_serializer': 'msgpack', 'result_serializer': 'json', } # override the celery_session_app fixture to monkeypatch the 'main' # swh.scheduler.celery_backend.config.app Celery application -# with the test application. +# with the test application (and also register test tasks) @pytest.fixture(scope='session') def swh_app(celery_session_app): - swh.scheduler.celery_backend.config.app = celery_session_app - yield celery_session_app + from swh.scheduler.celery_backend.config import app + register_test_tasks(celery_session_app) + app = celery_session_app # noqa + yield app @pytest.fixture def swh_scheduler(request, postgresql_proc, postgresql): scheduler_config = { 'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( host=postgresql_proc.host, port=postgresql_proc.port, user='postgres', dbname='tests') } all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) cursor = postgresql.cursor() for fname in all_dump_files: with open(fname) as fobj: cursor.execute(fobj.read()) postgresql.commit() scheduler = get_scheduler('local', scheduler_config) for taskname in TASK_NAMES: scheduler.create_task_type({ 'type': 'swh-test-{}'.format(taskname), 'description': 'The {} testing task'.format(taskname), 'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname), 'default_interval': timedelta(days=1), 'min_interval': timedelta(hours=6), 'max_interval': timedelta(days=12), }) return scheduler diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py index b18df46..89f7476 100644 --- a/swh/scheduler/tests/tasks.py +++ b/swh/scheduler/tests/tasks.py @@ -1,31 +1,43 @@ -from celery import group, current_app as app - - -@app.task(name='swh.scheduler.tests.tasks.ping', bind=True) -def ping(self, **kw): - # check this is a SWHTask - assert hasattr(self, 'log') - assert not hasattr(self, 'run_task') - assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] - self.log.debug(self.name) - if kw: - return 'OK (kw=%s)' % kw - return 'OK' - - -@app.task(name='swh.scheduler.tests.tasks.multiping', bind=True) -def multiping(self, n=10): - promise = group(ping.s(i=i) for i in range(n))() - self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) - promise.save() - return promise.id - - -@app.task(name='swh.scheduler.tests.tasks.error') -def not_implemented(): - raise NotImplementedError('Nope') - - -@app.task(name='swh.scheduler.tests.tasks.add') -def add(x, y): - return x + y +# Copyright (C) 2018-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from celery import group + + +def register_test_tasks(app): + """Register test tasks for the specific app passed as parameter. + + In the test context, app is the swh_app and not the runtime one. + + Args: + app: Celery app. Expects the tests application + (swh.scheduler.tests.conftest.swh_app) + + """ + @app.task(name='swh.scheduler.tests.tasks.ping', bind=True) + def ping(self, **kw): + # check this is a SWHTask + assert hasattr(self, 'log') + assert not hasattr(self, 'run_task') + assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] + self.log.debug(self.name) + if kw: + return 'OK (kw=%s)' % kw + return 'OK' + + @app.task(name='swh.scheduler.tests.tasks.multiping', bind=True) + def multiping(self, n=10): + promise = group(ping.s(i=i) for i in range(n))() + self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) + promise.save() + return promise.id + + @app.task(name='swh.scheduler.tests.tasks.error') + def not_implemented(): + raise NotImplementedError('Nope') + + @app.task(name='swh.scheduler.tests.tasks.add') + def add(x, y): + return x + y