diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index fbfb5d2..118c543 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,105 +1,103 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import glob from datetime import timedelta import pkg_resources from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler import get_scheduler from swh.scheduler.tests import SQL_DIR -from swh.scheduler.tests.tasks import register_test_tasks # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith('CELERY')]: os.environ.pop(var) # test_cli tests depends on a en/C locale, so ensure it os.environ['LC_ALL'] = 'C.UTF-8' DUMP_FILES = os.path.join(SQL_DIR, '*.sql') # celery tasks for testing purpose; tasks themselves should be # in swh/scheduler/tests/tasks.py TASK_NAMES = ['ping', 'multiping', 'add', 'error'] @pytest.fixture(scope='session') def celery_enable_logging(): return True @pytest.fixture(scope='session') def celery_includes(): task_modules = [ 'swh.scheduler.tests.tasks', ] for entrypoint in pkg_resources.iter_entry_points('swh.workers'): task_modules.extend(entrypoint.load()().get('task_modules', [])) return task_modules @pytest.fixture(scope='session') def celery_parameters(): return { 'task_cls': 'swh.scheduler.task:SWHTask', } @pytest.fixture(scope='session') def celery_config(): return { 'accept_content': ['application/x-msgpack', 'application/json'], 'task_serializer': 'msgpack', 'result_serializer': 'json', } -# override the celery_session_app fixture to monkeypatch the 'main' +# use the celery_session_app fixture to monkeypatch the 'main' # swh.scheduler.celery_backend.config.app Celery application -# with the test application (and also register test tasks) +# with the test application @pytest.fixture(scope='session') def swh_app(celery_session_app): - from swh.scheduler.celery_backend.config import app - register_test_tasks(celery_session_app) - app = celery_session_app # noqa - yield app + from swh.scheduler.celery_backend import config + config.app = celery_session_app + yield celery_session_app @pytest.fixture def swh_scheduler(postgresql): scheduler_config = { 'db': postgresql.dsn, } all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) cursor = postgresql.cursor() for fname in all_dump_files: with open(fname) as fobj: cursor.execute(fobj.read()) postgresql.commit() scheduler = get_scheduler('local', scheduler_config) for taskname in TASK_NAMES: scheduler.create_task_type({ 'type': 'swh-test-{}'.format(taskname), 'description': 'The {} testing task'.format(taskname), 'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname), 'default_interval': timedelta(days=1), 'min_interval': timedelta(hours=6), 'max_interval': timedelta(days=12), }) return scheduler # this alias is used to be able to easily instantiate a db-backed Scheduler # eg. for the RPC client/server test suite. swh_db_scheduler = swh_scheduler diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py index 89f7476..93ee0f1 100644 --- a/swh/scheduler/tests/tasks.py +++ b/swh/scheduler/tests/tasks.py @@ -1,43 +1,38 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import group +from swh.scheduler.celery_backend.config import app -def register_test_tasks(app): - """Register test tasks for the specific app passed as parameter. - - In the test context, app is the swh_app and not the runtime one. - - Args: - app: Celery app. Expects the tests application - (swh.scheduler.tests.conftest.swh_app) - - """ - @app.task(name='swh.scheduler.tests.tasks.ping', bind=True) - def ping(self, **kw): - # check this is a SWHTask - assert hasattr(self, 'log') - assert not hasattr(self, 'run_task') - assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] - self.log.debug(self.name) - if kw: - return 'OK (kw=%s)' % kw - return 'OK' - - @app.task(name='swh.scheduler.tests.tasks.multiping', bind=True) - def multiping(self, n=10): - promise = group(ping.s(i=i) for i in range(n))() - self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) - promise.save() - return promise.id - - @app.task(name='swh.scheduler.tests.tasks.error') - def not_implemented(): - raise NotImplementedError('Nope') - - @app.task(name='swh.scheduler.tests.tasks.add') - def add(x, y): - return x + y + +@app.task(name='swh.scheduler.tests.tasks.ping', bind=True) +def ping(self, **kw): + # check this is a SWHTask + assert hasattr(self, 'log') + assert not hasattr(self, 'run_task') + assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] + self.log.debug(self.name) + if kw: + return 'OK (kw=%s)' % kw + return 'OK' + + +@app.task(name='swh.scheduler.tests.tasks.multiping', bind=True) +def multiping(self, n=10): + promise = group(ping.s(i=i) for i in range(n))() + self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) + promise.save() + return promise.id + + +@app.task(name='swh.scheduler.tests.tasks.error') +def not_implemented(): + raise NotImplementedError('Nope') + + +@app.task(name='swh.scheduler.tests.tasks.add') +def add(x, y): + return x + y