Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_cli_origin.py
Show First 20 Lines • Show All 104 Lines • ▼ Show 20 Lines | scheduled_tasks = { | ||||
(task["type"], task["arguments"]["kwargs"]["url"]) for task in tasks | (task["type"], task["arguments"]["kwargs"]["url"]) for task in tasks | ||||
} | } | ||||
all_possible_tasks = { | all_possible_tasks = { | ||||
(f"load-{origin.visit_type}", origin.url) | (f"load-{origin.visit_type}", origin.url) | ||||
for origin in listed_origins_by_type[visit_type] | for origin in listed_origins_by_type[visit_type] | ||||
} | } | ||||
assert scheduled_tasks <= all_possible_tasks | assert scheduled_tasks <= all_possible_tasks | ||||
def test_update_metrics(swh_scheduler, listed_origins): | |||||
swh_scheduler.record_listed_origins(listed_origins) | |||||
assert swh_scheduler.get_metrics() == [] | |||||
result = invoke(swh_scheduler, args=("update-metrics",)) | |||||
assert result.exit_code == 0 | |||||
assert swh_scheduler.get_metrics() != [] |