Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_celery_tasks.py
Show All 11 Lines | def test_ping(swh_app, celery_session_worker): | ||||
res = swh_app.send_task( | res = swh_app.send_task( | ||||
'swh.scheduler.tests.tasks.ping') | 'swh.scheduler.tests.tasks.ping') | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
assert res.result == 'OK' | assert res.result == 'OK' | ||||
def test_ping_with_kw(swh_app, celery_session_worker): | |||||
res = swh_app.send_task( | |||||
'swh.scheduler.tests.tasks.ping', kwargs={'a': 1}) | |||||
assert res | |||||
res.wait() | |||||
assert res.successful() | |||||
assert res.result == "OK (kw={'a': 1})" | |||||
def test_multiping(swh_app, celery_session_worker): | def test_multiping(swh_app, celery_session_worker): | ||||
"Test that a task that spawns subtasks (group) works" | "Test that a task that spawns subtasks (group) works" | ||||
res = swh_app.send_task( | res = swh_app.send_task( | ||||
'swh.scheduler.tests.tasks.multiping', n=5) | 'swh.scheduler.tests.tasks.multiping', kwargs={'n': 5}) | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
# retrieve the GroupResult for this task and wait for all the subtasks | # retrieve the GroupResult for this task and wait for all the subtasks | ||||
# to complete | # to complete | ||||
promise_id = res.result | promise_id = res.result | ||||
assert promise_id | assert promise_id | ||||
promise = GroupResult.restore(promise_id, app=swh_app) | promise = GroupResult.restore(promise_id, app=swh_app) | ||||
for i in range(5): | for i in range(5): | ||||
if promise.ready(): | if promise.ready(): | ||||
break | break | ||||
sleep(1) | sleep(1) | ||||
results = [x.get() for x in promise.results] | results = [x.get() for x in promise.results] | ||||
assert len(results) == 5 | |||||
for i in range(5): | for i in range(5): | ||||
assert ("OK (kw={'i': %s})" % i) in results | assert ("OK (kw={'i': %s})" % i) in results | ||||
@pytest.mark.db | @pytest.mark.db | ||||
def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler): | def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler): | ||||
"Test that the scheduler fixture works properly" | "Test that the scheduler fixture works properly" | ||||
task_type = swh_scheduler.get_task_type('swh-test-ping') | task_type = swh_scheduler.get_task_type('swh-test-ping') | ||||
▲ Show 20 Lines • Show All 46 Lines • Show Last 20 Lines |