Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9341884
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
View Options
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
index fbfb5d2..118c543 100644
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -1,105 +1,103 @@
# Copyright (C) 2016-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import pytest
import glob
from datetime import timedelta
import pkg_resources
from swh.core.utils import numfile_sortkey as sortkey
from swh.scheduler import get_scheduler
from swh.scheduler.tests import SQL_DIR
-from swh.scheduler.tests.tasks import register_test_tasks
# make sure we are not fooled by CELERY_ config environment vars
for var in [x for x in os.environ.keys() if x.startswith('CELERY')]:
os.environ.pop(var)
# test_cli tests depends on a en/C locale, so ensure it
os.environ['LC_ALL'] = 'C.UTF-8'
DUMP_FILES = os.path.join(SQL_DIR, '*.sql')
# celery tasks for testing purpose; tasks themselves should be
# in swh/scheduler/tests/tasks.py
TASK_NAMES = ['ping', 'multiping', 'add', 'error']
@pytest.fixture(scope='session')
def celery_enable_logging():
return True
@pytest.fixture(scope='session')
def celery_includes():
task_modules = [
'swh.scheduler.tests.tasks',
]
for entrypoint in pkg_resources.iter_entry_points('swh.workers'):
task_modules.extend(entrypoint.load()().get('task_modules', []))
return task_modules
@pytest.fixture(scope='session')
def celery_parameters():
return {
'task_cls': 'swh.scheduler.task:SWHTask',
}
@pytest.fixture(scope='session')
def celery_config():
return {
'accept_content': ['application/x-msgpack', 'application/json'],
'task_serializer': 'msgpack',
'result_serializer': 'json',
}
-# override the celery_session_app fixture to monkeypatch the 'main'
+# use the celery_session_app fixture to monkeypatch the 'main'
# swh.scheduler.celery_backend.config.app Celery application
-# with the test application (and also register test tasks)
+# with the test application
@pytest.fixture(scope='session')
def swh_app(celery_session_app):
- from swh.scheduler.celery_backend.config import app
- register_test_tasks(celery_session_app)
- app = celery_session_app # noqa
- yield app
+ from swh.scheduler.celery_backend import config
+ config.app = celery_session_app
+ yield celery_session_app
@pytest.fixture
def swh_scheduler(postgresql):
scheduler_config = {
'db': postgresql.dsn,
}
all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
cursor = postgresql.cursor()
for fname in all_dump_files:
with open(fname) as fobj:
cursor.execute(fobj.read())
postgresql.commit()
scheduler = get_scheduler('local', scheduler_config)
for taskname in TASK_NAMES:
scheduler.create_task_type({
'type': 'swh-test-{}'.format(taskname),
'description': 'The {} testing task'.format(taskname),
'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname),
'default_interval': timedelta(days=1),
'min_interval': timedelta(hours=6),
'max_interval': timedelta(days=12),
})
return scheduler
# this alias is used to be able to easily instantiate a db-backed Scheduler
# eg. for the RPC client/server test suite.
swh_db_scheduler = swh_scheduler
diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py
index 89f7476..93ee0f1 100644
--- a/swh/scheduler/tests/tasks.py
+++ b/swh/scheduler/tests/tasks.py
@@ -1,43 +1,38 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from celery import group
+from swh.scheduler.celery_backend.config import app
-def register_test_tasks(app):
- """Register test tasks for the specific app passed as parameter.
-
- In the test context, app is the swh_app and not the runtime one.
-
- Args:
- app: Celery app. Expects the tests application
- (swh.scheduler.tests.conftest.swh_app)
-
- """
- @app.task(name='swh.scheduler.tests.tasks.ping', bind=True)
- def ping(self, **kw):
- # check this is a SWHTask
- assert hasattr(self, 'log')
- assert not hasattr(self, 'run_task')
- assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__]
- self.log.debug(self.name)
- if kw:
- return 'OK (kw=%s)' % kw
- return 'OK'
-
- @app.task(name='swh.scheduler.tests.tasks.multiping', bind=True)
- def multiping(self, n=10):
- promise = group(ping.s(i=i) for i in range(n))()
- self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n))
- promise.save()
- return promise.id
-
- @app.task(name='swh.scheduler.tests.tasks.error')
- def not_implemented():
- raise NotImplementedError('Nope')
-
- @app.task(name='swh.scheduler.tests.tasks.add')
- def add(x, y):
- return x + y
+
+@app.task(name='swh.scheduler.tests.tasks.ping', bind=True)
+def ping(self, **kw):
+ # check this is a SWHTask
+ assert hasattr(self, 'log')
+ assert not hasattr(self, 'run_task')
+ assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__]
+ self.log.debug(self.name)
+ if kw:
+ return 'OK (kw=%s)' % kw
+ return 'OK'
+
+
+@app.task(name='swh.scheduler.tests.tasks.multiping', bind=True)
+def multiping(self, n=10):
+ promise = group(ping.s(i=i) for i in range(n))()
+ self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n))
+ promise.save()
+ return promise.id
+
+
+@app.task(name='swh.scheduler.tests.tasks.error')
+def not_implemented():
+ raise NotImplementedError('Nope')
+
+
+@app.task(name='swh.scheduler.tests.tasks.add')
+def add(x, y):
+ return x + y
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 12:20 PM (2 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3266971
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment