Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_celery_tasks.py
from time import sleep | from time import sleep | ||||
from celery.result import GroupResult | from celery.result import GroupResult | ||||
from celery.result import AsyncResult | |||||
import pytest | |||||
from swh.scheduler.utils import create_task_dict | |||||
from swh.scheduler.celery_backend.runner import run_ready_tasks | |||||
def test_ping(swh_app, celery_session_worker): | def test_ping(swh_app, celery_session_worker): | ||||
res = swh_app.send_task( | res = swh_app.send_task( | ||||
'swh.scheduler.tests.tasks.ping') | 'swh.scheduler.tests.tasks.ping') | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
Show All 17 Lines | def test_multiping(swh_app, celery_session_worker): | ||||
for i in range(5): | for i in range(5): | ||||
if promise.ready(): | if promise.ready(): | ||||
break | break | ||||
sleep(1) | sleep(1) | ||||
results = [x.get() for x in promise.results] | results = [x.get() for x in promise.results] | ||||
for i in range(5): | for i in range(5): | ||||
assert ("OK (kw={'i': %s})" % i) in results | assert ("OK (kw={'i': %s})" % i) in results | ||||
def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler): | |||||
"Test that the scheduler fixture works properly" | |||||
task_type = swh_scheduler.get_task_type('swh-test-ping') | |||||
assert task_type | |||||
assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.ping' | |||||
swh_scheduler.create_tasks([create_task_dict( | |||||
'swh-test-ping', 'oneshot')]) | |||||
backend_tasks = run_ready_tasks(swh_scheduler, swh_app) | |||||
assert backend_tasks | |||||
for task in backend_tasks: | |||||
# Make sure the task completed | |||||
AsyncResult(id=task['backend_id']).get() | |||||
def test_task_return_value(swh_app, celery_session_worker, swh_scheduler): | |||||
task_type = swh_scheduler.get_task_type('swh-test-add') | |||||
assert task_type | |||||
assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.add' | |||||
swh_scheduler.create_tasks([create_task_dict( | |||||
'swh-test-add', 'oneshot', 12, 30)]) | |||||
backend_tasks = run_ready_tasks(swh_scheduler, swh_app) | |||||
assert len(backend_tasks) == 1 | |||||
task = backend_tasks[0] | |||||
value = AsyncResult(id=task['backend_id']).get() | |||||
assert value == 42 | |||||
def test_task_exception(swh_app, celery_session_worker, swh_scheduler): | |||||
task_type = swh_scheduler.get_task_type('swh-test-error') | |||||
assert task_type | |||||
assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.error' | |||||
swh_scheduler.create_tasks([create_task_dict( | |||||
'swh-test-error', 'oneshot')]) | |||||
backend_tasks = run_ready_tasks(swh_scheduler, swh_app) | |||||
assert len(backend_tasks) == 1 | |||||
task = backend_tasks[0] | |||||
result = AsyncResult(id=task['backend_id']) | |||||
with pytest.raises(NotImplementedError): | |||||
result.get() |