diff --git a/requirements-test.txt b/requirements-test.txt index 2416744..d8d5d62 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,4 @@ hypothesis pytest +pytest-postgresql celery >= 4 diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index 4facd4b..8b2dbce 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,30 +1,84 @@ +import os import pytest +import glob +from datetime import timedelta + +from swh.core.utils import numfile_sortkey as sortkey +from swh.scheduler import get_scheduler +from swh.scheduler.tests import SQL_DIR + +DUMP_FILES = os.path.join(SQL_DIR, '*.sql') + +# celery tasks for testing purpose; tasks themselves should be +# in swh/scheduler/tests/celery_tasks.py +TASK_NAMES = ['ping', 'multiping'] @pytest.fixture(scope='session') def celery_enable_logging(): return True @pytest.fixture(scope='session') def celery_includes(): return [ 'swh.scheduler.tests.tasks', ] @pytest.fixture(scope='session') def celery_parameters(): return { - 'task_cls': 'swh.scheduler.task:SWHTask', + 'task_cls': 'swh.scheduler.task:SWHTask', + } + + +@pytest.fixture(scope='session') +def celery_config(): + return { + 'accept_content': ['application/x-msgpack', 'application/json'], + 'task_serializer': 'msgpack', + 'result_serializer': 'msgpack', } # override the celery_session_app fixture to monkeypatch the 'main' # swh.scheduler.celery_backend.config.app Celery application # with the test application. @pytest.fixture(scope='session') def swh_app(celery_session_app): import swh.scheduler.celery_backend.config swh.scheduler.celery_backend.config.app = celery_session_app yield celery_session_app + + +@pytest.fixture +def swh_scheduler(request, postgresql_proc, postgresql): + scheduler_config = { + 'scheduling_db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( + host=postgresql_proc.host, + port=postgresql_proc.port, + user='postgres', + dbname='tests') + } + + all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) + + cursor = postgresql.cursor() + for fname in all_dump_files: + with open(fname) as fobj: + cursor.execute(fobj.read()) + postgresql.commit() + + scheduler = get_scheduler('local', scheduler_config) + for taskname in TASK_NAMES: + scheduler.create_task_type({ + 'type': 'swh-test-{}'.format(taskname), + 'description': 'The {} testing task'.format(taskname), + 'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname), + 'default_interval': timedelta(days=1), + 'min_interval': timedelta(hours=6), + 'max_interval': timedelta(days=12), + }) + + return scheduler diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py index 8811bb3..8c53c38 100644 --- a/swh/scheduler/tests/test_celery_tasks.py +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -1,35 +1,55 @@ from time import sleep from celery.result import GroupResult +from celery.result import AsyncResult + +from swh.scheduler.utils import create_task_dict +from swh.scheduler.celery_backend.runner import run_ready_tasks def test_ping(swh_app, celery_session_worker): res = swh_app.send_task( 'swh.scheduler.tests.tasks.ping') assert res res.wait() assert res.successful() assert res.result == 'OK' def test_multiping(swh_app, celery_session_worker): "Test that a task that spawns subtasks (group) works" res = swh_app.send_task( 'swh.scheduler.tests.tasks.multiping', n=5) assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) results = [x.get() for x in promise.results] for i in range(5): assert ("OK (kw={'i': %s})" % i) in results + + +def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler): + "Test that the scheduler fixture works properly" + task_type = swh_scheduler.get_task_type('swh-test-ping') + assert task_type + assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.ping' + + swh_scheduler.create_tasks([create_task_dict( + 'swh-test-ping', 'oneshot')]) + + backend_tasks = run_ready_tasks(swh_scheduler, swh_app) + assert backend_tasks + for task in backend_tasks: + # Make sure the task completed + AsyncResult(id=task['backend_id']).get()