Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_cli_origin.py
Show First 20 Lines • Show All 106 Lines • ▼ Show 20 Lines | def test_schedule_next(swh_scheduler, listed_origins_by_type): | ||||
all_possible_tasks = { | all_possible_tasks = { | ||||
(f"load-{origin.visit_type}", origin.url) | (f"load-{origin.visit_type}", origin.url) | ||||
for origin in listed_origins_by_type[visit_type] | for origin in listed_origins_by_type[visit_type] | ||||
} | } | ||||
assert scheduled_tasks <= all_possible_tasks | assert scheduled_tasks <= all_possible_tasks | ||||
def test_send_to_celery( | |||||
mocker, swh_scheduler, swh_scheduler_celery_app, listed_origins_by_type, | |||||
): | |||||
for task_type in TASK_TYPES.values(): | |||||
swh_scheduler.create_task_type(task_type) | |||||
visit_type = next(iter(listed_origins_by_type)) | |||||
for origins in listed_origins_by_type.values(): | |||||
swh_scheduler.record_listed_origins(origins) | |||||
get_queue_length = mocker.patch( | |||||
"swh.scheduler.celery_backend.config.get_queue_length" | |||||
) | |||||
get_queue_length.return_value = None | |||||
send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task") | |||||
send_task.return_value = None | |||||
result = invoke(swh_scheduler, args=("send-to-celery", visit_type)) | |||||
assert result.exit_code == 0 | |||||
scheduled_tasks = { | |||||
(call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list | |||||
} | |||||
expected_tasks = { | |||||
(TASK_TYPES[origin.visit_type]["backend_name"], origin.url) | |||||
for origin in listed_origins_by_type[visit_type] | |||||
} | |||||
assert expected_tasks == scheduled_tasks | |||||
def test_update_metrics(swh_scheduler, listed_origins): | def test_update_metrics(swh_scheduler, listed_origins): | ||||
swh_scheduler.record_listed_origins(listed_origins) | swh_scheduler.record_listed_origins(listed_origins) | ||||
assert swh_scheduler.get_metrics() == [] | assert swh_scheduler.get_metrics() == [] | ||||
result = invoke(swh_scheduler, args=("update-metrics",)) | result = invoke(swh_scheduler, args=("update-metrics",)) | ||||
assert result.exit_code == 0 | assert result.exit_code == 0 | ||||
assert swh_scheduler.get_metrics() != [] | assert swh_scheduler.get_metrics() != [] |