Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
index e4e9e05..608277a 100644
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -1,100 +1,105 @@
+# Copyright (C) 2016-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
import os
import pytest
import glob
from datetime import timedelta
import pkg_resources
from swh.core.utils import numfile_sortkey as sortkey
from swh.scheduler import get_scheduler
from swh.scheduler.tests import SQL_DIR
+from swh.scheduler.tests.tasks import register_test_tasks
+
# make sure we are not fooled by CELERY_ config environment vars
for var in [x for x in os.environ.keys() if x.startswith('CELERY')]:
os.environ.pop(var)
-import swh.scheduler.celery_backend.config # noqa
-# this import is needed here to enforce creation of the celery current app
-# BEFORE the swh_app fixture is called, otherwise the Celery app instance from
-# celery_backend.config becomes the celery.current_app
-
# test_cli tests depends on a en/C locale, so ensure it
os.environ['LC_ALL'] = 'C.UTF-8'
DUMP_FILES = os.path.join(SQL_DIR, '*.sql')
# celery tasks for testing purpose; tasks themselves should be
# in swh/scheduler/tests/tasks.py
TASK_NAMES = ['ping', 'multiping', 'add', 'error']
@pytest.fixture(scope='session')
def celery_enable_logging():
return True
@pytest.fixture(scope='session')
def celery_includes():
task_modules = [
'swh.scheduler.tests.tasks',
]
for entrypoint in pkg_resources.iter_entry_points('swh.workers'):
task_modules.extend(entrypoint.load()().get('task_modules', []))
return task_modules
@pytest.fixture(scope='session')
def celery_parameters():
return {
'task_cls': 'swh.scheduler.task:SWHTask',
}
@pytest.fixture(scope='session')
def celery_config():
return {
'accept_content': ['application/x-msgpack', 'application/json'],
'task_serializer': 'msgpack',
'result_serializer': 'json',
}
# override the celery_session_app fixture to monkeypatch the 'main'
# swh.scheduler.celery_backend.config.app Celery application
-# with the test application.
+# with the test application (and also register test tasks)
@pytest.fixture(scope='session')
def swh_app(celery_session_app):
- swh.scheduler.celery_backend.config.app = celery_session_app
- yield celery_session_app
+ from swh.scheduler.celery_backend.config import app
+ register_test_tasks(celery_session_app)
+ app = celery_session_app # noqa
+ yield app
@pytest.fixture
def swh_scheduler(request, postgresql_proc, postgresql):
scheduler_config = {
'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
host=postgresql_proc.host,
port=postgresql_proc.port,
user='postgres',
dbname='tests')
}
all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
cursor = postgresql.cursor()
for fname in all_dump_files:
with open(fname) as fobj:
cursor.execute(fobj.read())
postgresql.commit()
scheduler = get_scheduler('local', scheduler_config)
for taskname in TASK_NAMES:
scheduler.create_task_type({
'type': 'swh-test-{}'.format(taskname),
'description': 'The {} testing task'.format(taskname),
'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname),
'default_interval': timedelta(days=1),
'min_interval': timedelta(hours=6),
'max_interval': timedelta(days=12),
})
return scheduler
diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py
index b18df46..89f7476 100644
--- a/swh/scheduler/tests/tasks.py
+++ b/swh/scheduler/tests/tasks.py
@@ -1,31 +1,43 @@
-from celery import group, current_app as app
-
-
-@app.task(name='swh.scheduler.tests.tasks.ping', bind=True)
-def ping(self, **kw):
- # check this is a SWHTask
- assert hasattr(self, 'log')
- assert not hasattr(self, 'run_task')
- assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__]
- self.log.debug(self.name)
- if kw:
- return 'OK (kw=%s)' % kw
- return 'OK'
-
-
-@app.task(name='swh.scheduler.tests.tasks.multiping', bind=True)
-def multiping(self, n=10):
- promise = group(ping.s(i=i) for i in range(n))()
- self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n))
- promise.save()
- return promise.id
-
-
-@app.task(name='swh.scheduler.tests.tasks.error')
-def not_implemented():
- raise NotImplementedError('Nope')
-
-
-@app.task(name='swh.scheduler.tests.tasks.add')
-def add(x, y):
- return x + y
+# Copyright (C) 2018-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from celery import group
+
+
+def register_test_tasks(app):
+ """Register test tasks for the specific app passed as parameter.
+
+ In the test context, app is the swh_app and not the runtime one.
+
+ Args:
+ app: Celery app. Expects the tests application
+ (swh.scheduler.tests.conftest.swh_app)
+
+ """
+ @app.task(name='swh.scheduler.tests.tasks.ping', bind=True)
+ def ping(self, **kw):
+ # check this is a SWHTask
+ assert hasattr(self, 'log')
+ assert not hasattr(self, 'run_task')
+ assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__]
+ self.log.debug(self.name)
+ if kw:
+ return 'OK (kw=%s)' % kw
+ return 'OK'
+
+ @app.task(name='swh.scheduler.tests.tasks.multiping', bind=True)
+ def multiping(self, n=10):
+ promise = group(ping.s(i=i) for i in range(n))()
+ self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n))
+ promise.save()
+ return promise.id
+
+ @app.task(name='swh.scheduler.tests.tasks.error')
+ def not_implemented():
+ raise NotImplementedError('Nope')
+
+ @app.task(name='swh.scheduler.tests.tasks.add')
+ def add(x, y):
+ return x + y

File Metadata

Mime Type
text/x-diff
Expires
Thu, Jul 3, 10:47 AM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3249144

Event Timeline