diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -345,6 +345,8 @@ query_args: List[Any] = [] where_clauses = [] + # Some policies need extra updates, appended to the query as extra CTEs + optional_update_query: str = "" # "NOT enabled" = the lister said the origin no longer exists where_clauses.append("enabled") @@ -425,12 +427,28 @@ ) DESC """ + elif policy == "origins_without_last_update": + where_clauses.append("last_update IS NULL") + order_by = "origin_visit_stats.next_visit_queue_position nulls first" + # This policy also updates the global queue position of each visit type + optional_update_query = """, + update_queue_position AS ( + INSERT INTO visit_scheduler_queue_position(visit_type, position) + SELECT visit_type, COALESCE(MAX(next_visit_queue_position), now()) + FROM selected_origins + GROUP BY visit_type + ON CONFLICT(visit_type) DO UPDATE + SET position=GREATEST( + visit_scheduler_queue_position.position, EXCLUDED.position + ) + ) + """ else: raise UnknownPolicy(f"Unknown scheduling policy {policy}") select_query = f""" SELECT - {origin_select_cols} + {origin_select_cols}, next_visit_queue_position FROM listed_origins LEFT JOIN @@ -462,8 +480,9 @@ EXCLUDED.last_scheduled ) ) + {optional_update_query} SELECT - * + {origin_select_cols} FROM selected_origins """ diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -6,6 +6,7 @@ from collections import defaultdict import copy import datetime +from datetime import timedelta import inspect import random from typing import Any, Dict, List, Optional, Tuple @@ -29,7 +30,7 @@ tasks_with_priority_from_template, ) -ONEDAY = datetime.timedelta(days=1) +ONEDAY = timedelta(days=1) NUM_PRIORITY_TASKS = {None: 100, "high": 60, "normal": 30, "low": 20} @@ -819,7 +820,7 @@ visit_type=visit_type, count=len(expected) + 1, 
policy=policy, - timestamp=after + datetime.timedelta(days=7), + timestamp=after + timedelta(days=7), ) # We need to sort them because their 'last_scheduled' field is updated to # exactly the same value, so the order is not deterministic @@ -845,7 +846,7 @@ last_uneventful=None, last_failed=None, last_notfound=None, - last_scheduled=base_date - datetime.timedelta(seconds=i), + last_scheduled=base_date - timedelta(seconds=i), ) for i, origin in enumerate(origins[1:]) ] @@ -903,7 +904,7 @@ ] swh_scheduler.origin_visit_stats_upsert(visit_stats) - cooldown_td = datetime.timedelta(days=cooldown) + cooldown_td = timedelta(days=cooldown) cooldown_args = { "scheduled_cooldown": None, "failed_cooldown": None, @@ -915,7 +916,7 @@ visit_type=visit_type, count=len(expected) + 1, policy="oldest_scheduled_first", - timestamp=after + cooldown_td - datetime.timedelta(seconds=1), + timestamp=after + cooldown_td - timedelta(seconds=1), **cooldown_args, ) @@ -925,7 +926,7 @@ visit_type=visit_type, count=len(expected) + 1, policy="oldest_scheduled_first", - timestamp=after + cooldown_td + datetime.timedelta(seconds=1), + timestamp=after + cooldown_td + timedelta(seconds=1), **cooldown_args, ) @@ -943,7 +944,7 @@ # Update known origins with a `last_update` field that we control base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) updated_origins = [ - attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i)) + attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) for i, origin in enumerate(origins) ] updated_origins = swh_scheduler.record_listed_origins(updated_origins) @@ -969,7 +970,7 @@ # Update known origins with a `last_update` field that we control base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) updated_origins = [ - attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i)) + attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) for i, origin in 
enumerate(origins) ] updated_origins = swh_scheduler.record_listed_origins(updated_origins) @@ -1024,6 +1025,137 @@ assert len(ret) == 5 + def test_grab_next_visits_no_last_update_nor_visit_stats( + self, swh_scheduler, listed_origins_by_type + ): + """grab_next_visits should retrieve tasks without last update (nor visit stats) + + """ + visit_type = next(iter(listed_origins_by_type)) + + origins = [] + for origin in listed_origins_by_type[visit_type]: + origins.append( + attr.evolve(origin, last_update=None) + ) # void the last update so we are in the relevant context + + assert len(origins) > 0 + + swh_scheduler.record_listed_origins(origins) + + # Initially, we have no global queue position + actual_state = swh_scheduler.visit_scheduler_queue_position_get() + assert actual_state == {} + + # nor any visit stats + actual_visit_stats = swh_scheduler.origin_visit_stats_get( + (o.url, o.visit_type) for o in origins + ) + assert len(actual_visit_stats) == 0 + + # Grab some new visits + next_visits = swh_scheduler.grab_next_visits( + visit_type, count=len(origins), policy="origins_without_last_update", + ) + # we get back all the origins, none of which has a last update + assert len(next_visits) == len(origins) + + # Now the global state got updated + actual_state = swh_scheduler.visit_scheduler_queue_position_get() + assert actual_state[visit_type] is not None + + actual_visit_stats = swh_scheduler.origin_visit_stats_get( + (o.url, o.visit_type) for o in next_visits + ) + + # Visit stats were also created + assert len(actual_visit_stats) == len(origins) + + def test_grab_next_visits_no_last_update_with_visit_stats( + self, swh_scheduler, listed_origins_by_type + ): + """grab_next_visits should retrieve tasks without last update""" + visit_type = next(iter(listed_origins_by_type)) + + origins = [] + for origin in listed_origins_by_type[visit_type]: + origins.append( + attr.evolve(origin, last_update=None) + ) # void the last update so we are in the relevant context + + assert 
len(origins) > 0 + + swh_scheduler.record_listed_origins(origins) + + # Initially, we have no global queue position + actual_state = swh_scheduler.visit_scheduler_queue_position_get() + assert actual_state == {} + + date_now = utcnow() + # Simulate that some origins have associated visit stats (some with an + # existing queue position and some without any) + visit_stats = ( + [ + OriginVisitStats( + url=origin.url, + visit_type=origin.visit_type, + last_eventful=utcnow(), + last_uneventful=None, + last_failed=None, + last_notfound=None, + next_visit_queue_position=date_now + + timedelta(days=random.uniform(-10, 1)), + ) + for origin in origins[:100] + ] + + [ + OriginVisitStats( + url=origin.url, + visit_type=origin.visit_type, + last_eventful=utcnow(), + last_uneventful=None, + last_failed=None, + last_notfound=None, + next_visit_queue_position=date_now + + timedelta(days=random.uniform(1, 10)), # definitely > now() + ) + for origin in origins[100:150] + ] + + [ + OriginVisitStats( + url=origin.url, + visit_type=origin.visit_type, + last_eventful=utcnow(), + last_uneventful=None, + last_failed=None, + last_notfound=None, + ) + for origin in origins[150:] + ] + ) + + swh_scheduler.origin_visit_stats_upsert(visit_stats) + + # Grab next visits + actual_visits = swh_scheduler.grab_next_visits( + visit_type, count=len(origins), policy="origins_without_last_update", + ) + assert len(actual_visits) == len(origins) + + actual_visit_stats = swh_scheduler.origin_visit_stats_get( + (o.url, o.visit_type) for o in actual_visits + ) + assert len(actual_visit_stats) == len(origins) + + actual_state = swh_scheduler.visit_scheduler_queue_position_get() + assert actual_state == { + visit_type: max( + s.next_visit_queue_position + for s in actual_visit_stats + if s.next_visit_queue_position is not None + ) + } + def test_grab_next_visits_unknown_policy(self, swh_scheduler): unknown_policy = "non_existing_policy" NUM_RESULTS = 5 @@ -1287,8 +1419,7 @@ OriginVisitStats( 
url=visited_origin.url, visit_type=visited_origin.visit_type, - last_eventful=visited_origin.last_update - - datetime.timedelta(days=1), + last_eventful=visited_origin.last_update - timedelta(days=1), last_uneventful=None, last_failed=None, last_notfound=None, @@ -1334,7 +1465,7 @@ ret = swh_scheduler.update_metrics(timestamp=ts) assert all(metric.last_update == ts for metric in ret) - second_ts = ts + datetime.timedelta(seconds=1) + second_ts = ts + timedelta(seconds=1) ret = swh_scheduler.update_metrics(timestamp=second_ts) assert all(metric.last_update == second_ts for metric in ret)