Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
Show First 20 Lines • Show All 1,198 Lines • ▼ Show 20 Lines | def test_origin_visit_stats_upsert_cardinality_failing(self, swh_scheduler) -> None: | ||||
last_uneventful=utcnow(), | last_uneventful=utcnow(), | ||||
last_notfound=None, | last_notfound=None, | ||||
last_failed=None, | last_failed=None, | ||||
last_snapshot=None, | last_snapshot=None, | ||||
), | ), | ||||
] | ] | ||||
) | ) | ||||
def test_visit_scheduler_queue_position( | |||||
self, swh_scheduler, listed_origins | |||||
) -> None: | |||||
result = swh_scheduler.visit_scheduler_queue_position_get() | |||||
assert result == {} | |||||
expected_result = {} | |||||
visit_types = set() | |||||
for origin in listed_origins: | |||||
visit_type = origin.visit_type | |||||
if visit_type in visit_types: | |||||
continue | |||||
visit_types.add(visit_type) | |||||
position = utcnow() | |||||
swh_scheduler.visit_scheduler_queue_position_set(visit_type, position) | |||||
expected_result[visit_type] = position | |||||
result = swh_scheduler.visit_scheduler_queue_position_get() | |||||
assert result == expected_result | |||||
def test_metrics_origins_known(self, swh_scheduler, listed_origins): | def test_metrics_origins_known(self, swh_scheduler, listed_origins): | ||||
swh_scheduler.record_listed_origins(listed_origins) | swh_scheduler.record_listed_origins(listed_origins) | ||||
ret = swh_scheduler.update_metrics() | ret = swh_scheduler.update_metrics() | ||||
assert sum(metric.origins_known for metric in ret) == len(listed_origins) | assert sum(metric.origins_known for metric in ret) == len(listed_origins) | ||||
def test_metrics_origins_enabled(self, swh_scheduler, listed_origins): | def test_metrics_origins_enabled(self, swh_scheduler, listed_origins): | ||||
▲ Show 20 Lines • Show All 140 Lines • Show Last 20 Lines |