Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_celery_tasks.py
Show First 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | ): | ||||
"Test that the scheduler fixture works properly" | "Test that the scheduler fixture works properly" | ||||
task_type = swh_scheduler.get_task_type("swh-test-ping") | task_type = swh_scheduler.get_task_type("swh-test-ping") | ||||
assert task_type | assert task_type | ||||
assert task_type["backend_name"] == TASK_PING | assert task_type["backend_name"] == TASK_PING | ||||
swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")]) | swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")]) | ||||
backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) | task_types = swh_scheduler.get_task_types() | ||||
backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app, task_types) | |||||
assert backend_tasks | assert backend_tasks | ||||
for task in backend_tasks: | for task in backend_tasks: | ||||
# Make sure the task completed | # Make sure the task completed | ||||
AsyncResult(id=task["backend_id"]).get() | AsyncResult(id=task["backend_id"]).get() | ||||
def test_run_ready_task_standard( | def test_run_ready_task_standard( | ||||
swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler | swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler | ||||
Show All 18 Lines | ): | ||||
assert len(tasks) == len(task_inputs) | assert len(tasks) == len(task_inputs) | ||||
task_ids = set() | task_ids = set() | ||||
for task in tasks: | for task in tasks: | ||||
assert task["status"] == "next_run_not_scheduled" | assert task["status"] == "next_run_not_scheduled" | ||||
assert task["priority"] is None | assert task["priority"] is None | ||||
task_ids.add(task["id"]) | task_ids.add(task["id"]) | ||||
backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) | task_types = swh_scheduler.get_task_types() | ||||
backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app, task_types) | |||||
assert len(backend_tasks) == len(tasks) | assert len(backend_tasks) == len(tasks) | ||||
scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name) | scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name) | ||||
assert len(scheduled_tasks) == len(tasks) | assert len(scheduled_tasks) == len(tasks) | ||||
for task in scheduled_tasks: | for task in scheduled_tasks: | ||||
assert task["status"] == "next_run_scheduled" | assert task["status"] == "next_run_scheduled" | ||||
assert task["id"] in task_ids | assert task["id"] in task_ids | ||||
Show All 27 Lines | ): | ||||
assert len(tasks) == len(task_inputs) | assert len(tasks) == len(task_inputs) | ||||
task_ids = set() | task_ids = set() | ||||
for task in tasks: | for task in tasks: | ||||
assert task["status"] == "next_run_not_scheduled" | assert task["status"] == "next_run_not_scheduled" | ||||
assert task["priority"] is not None | assert task["priority"] is not None | ||||
task_ids.add(task["id"]) | task_ids.add(task["id"]) | ||||
task_types = swh_scheduler.get_task_types() | |||||
backend_tasks = run_ready_tasks( | backend_tasks = run_ready_tasks( | ||||
swh_scheduler, swh_scheduler_celery_app, task_types=[], with_priority=True | swh_scheduler, swh_scheduler_celery_app, task_types, with_priority=True | ||||
) | ) | ||||
assert len(backend_tasks) == len(tasks) | assert len(backend_tasks) == len(tasks) | ||||
scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name) | scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name) | ||||
assert len(scheduled_tasks) == len(tasks) | assert len(scheduled_tasks) == len(tasks) | ||||
for task in scheduled_tasks: | for task in scheduled_tasks: | ||||
assert task["status"] == "next_run_scheduled" | assert task["status"] == "next_run_scheduled" | ||||
assert task["id"] in task_ids | assert task["id"] in task_ids | ||||
Show All 9 Lines | def test_task_exception( | ||||
swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler | swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler | ||||
): | ): | ||||
task_type = swh_scheduler.get_task_type("swh-test-error") | task_type = swh_scheduler.get_task_type("swh-test-error") | ||||
assert task_type | assert task_type | ||||
assert task_type["backend_name"] == TASK_ERROR | assert task_type["backend_name"] == TASK_ERROR | ||||
swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")]) | swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")]) | ||||
backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) | task_types = swh_scheduler.get_task_types() | ||||
backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app, task_types) | |||||
assert len(backend_tasks) == 1 | assert len(backend_tasks) == 1 | ||||
task = backend_tasks[0] | task = backend_tasks[0] | ||||
result = AsyncResult(id=task["backend_id"]) | result = AsyncResult(id=task["backend_id"]) | ||||
with pytest.raises(NotImplementedError): | with pytest.raises(NotImplementedError): | ||||
result.get() | result.get() | ||||
▲ Show 20 Lines • Show All 66 Lines • Show Last 20 Lines |