Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9312261
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
View Options
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
index e4e9e05..608277a 100644
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -1,100 +1,105 @@
+# Copyright (C) 2016-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
import os
import pytest
import glob
from datetime import timedelta
import pkg_resources
from swh.core.utils import numfile_sortkey as sortkey
from swh.scheduler import get_scheduler
from swh.scheduler.tests import SQL_DIR
+from swh.scheduler.tests.tasks import register_test_tasks
+
# make sure we are not fooled by CELERY_ config environment vars
for var in [x for x in os.environ.keys() if x.startswith('CELERY')]:
os.environ.pop(var)
-import swh.scheduler.celery_backend.config # noqa
-# this import is needed here to enforce creation of the celery current app
-# BEFORE the swh_app fixture is called, otherwise the Celery app instance from
-# celery_backend.config becomes the celery.current_app
-
# test_cli tests depends on a en/C locale, so ensure it
os.environ['LC_ALL'] = 'C.UTF-8'
DUMP_FILES = os.path.join(SQL_DIR, '*.sql')
# celery tasks for testing purpose; tasks themselves should be
# in swh/scheduler/tests/tasks.py
TASK_NAMES = ['ping', 'multiping', 'add', 'error']
@pytest.fixture(scope='session')
def celery_enable_logging():
return True
@pytest.fixture(scope='session')
def celery_includes():
task_modules = [
'swh.scheduler.tests.tasks',
]
for entrypoint in pkg_resources.iter_entry_points('swh.workers'):
task_modules.extend(entrypoint.load()().get('task_modules', []))
return task_modules
@pytest.fixture(scope='session')
def celery_parameters():
return {
'task_cls': 'swh.scheduler.task:SWHTask',
}
@pytest.fixture(scope='session')
def celery_config():
return {
'accept_content': ['application/x-msgpack', 'application/json'],
'task_serializer': 'msgpack',
'result_serializer': 'json',
}
# override the celery_session_app fixture to monkeypatch the 'main'
# swh.scheduler.celery_backend.config.app Celery application
-# with the test application.
+# with the test application (and also register test tasks)
@pytest.fixture(scope='session')
def swh_app(celery_session_app):
- swh.scheduler.celery_backend.config.app = celery_session_app
- yield celery_session_app
+ from swh.scheduler.celery_backend.config import app
+ register_test_tasks(celery_session_app)
+ app = celery_session_app # noqa
+ yield app
@pytest.fixture
def swh_scheduler(request, postgresql_proc, postgresql):
scheduler_config = {
'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
host=postgresql_proc.host,
port=postgresql_proc.port,
user='postgres',
dbname='tests')
}
all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
cursor = postgresql.cursor()
for fname in all_dump_files:
with open(fname) as fobj:
cursor.execute(fobj.read())
postgresql.commit()
scheduler = get_scheduler('local', scheduler_config)
for taskname in TASK_NAMES:
scheduler.create_task_type({
'type': 'swh-test-{}'.format(taskname),
'description': 'The {} testing task'.format(taskname),
'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname),
'default_interval': timedelta(days=1),
'min_interval': timedelta(hours=6),
'max_interval': timedelta(days=12),
})
return scheduler
diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py
index b18df46..89f7476 100644
--- a/swh/scheduler/tests/tasks.py
+++ b/swh/scheduler/tests/tasks.py
@@ -1,31 +1,43 @@
-from celery import group, current_app as app
-
-
-@app.task(name='swh.scheduler.tests.tasks.ping', bind=True)
-def ping(self, **kw):
- # check this is a SWHTask
- assert hasattr(self, 'log')
- assert not hasattr(self, 'run_task')
- assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__]
- self.log.debug(self.name)
- if kw:
- return 'OK (kw=%s)' % kw
- return 'OK'
-
-
-@app.task(name='swh.scheduler.tests.tasks.multiping', bind=True)
-def multiping(self, n=10):
- promise = group(ping.s(i=i) for i in range(n))()
- self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n))
- promise.save()
- return promise.id
-
-
-@app.task(name='swh.scheduler.tests.tasks.error')
-def not_implemented():
- raise NotImplementedError('Nope')
-
-
-@app.task(name='swh.scheduler.tests.tasks.add')
-def add(x, y):
- return x + y
+# Copyright (C) 2018-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from celery import group
+
+
+def register_test_tasks(app):
+ """Register test tasks for the specific app passed as parameter.
+
+ In the test context, app is the swh_app and not the runtime one.
+
+ Args:
+ app: Celery app. Expects the tests application
+ (swh.scheduler.tests.conftest.swh_app)
+
+ """
+ @app.task(name='swh.scheduler.tests.tasks.ping', bind=True)
+ def ping(self, **kw):
+ # check this is a SWHTask
+ assert hasattr(self, 'log')
+ assert not hasattr(self, 'run_task')
+ assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__]
+ self.log.debug(self.name)
+ if kw:
+ return 'OK (kw=%s)' % kw
+ return 'OK'
+
+ @app.task(name='swh.scheduler.tests.tasks.multiping', bind=True)
+ def multiping(self, n=10):
+ promise = group(ping.s(i=i) for i in range(n))()
+ self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n))
+ promise.save()
+ return promise.id
+
+ @app.task(name='swh.scheduler.tests.tasks.error')
+ def not_implemented():
+ raise NotImplementedError('Nope')
+
+ @app.task(name='swh.scheduler.tests.tasks.add')
+ def add(x, y):
+ return x + y
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Jul 3, 10:47 AM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3249144
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment