diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -346,6 +346,9 @@ where_clauses = [] + # list of (name, query) handled as CTEs before the main query + common_table_expressions: List[Tuple[str, str]] = [] + # "NOT enabled" = the lister said the origin no longer exists where_clauses.append("enabled") @@ -425,49 +428,78 @@ ) DESC """ + elif policy == "origins_without_last_update": + where_clauses.append("last_update IS NULL") + order_by = "origin_visit_stats.next_visit_queue_position nulls first" + + # fmt: off + + # This policy requires updating the global queue position for this + # visit type + common_table_expressions.append(("update_queue_position", """ + INSERT INTO + visit_scheduler_queue_position(visit_type, position) + SELECT + visit_type, COALESCE(MAX(next_visit_queue_position), now()) + FROM selected_origins + GROUP BY visit_type + ON CONFLICT(visit_type) DO UPDATE + SET position=GREATEST( + visit_scheduler_queue_position.position, EXCLUDED.position + ) + """)) + # fmt: on else: raise UnknownPolicy(f"Unknown scheduling policy {policy}") - select_query = f""" - SELECT - {origin_select_cols} - FROM - listed_origins - LEFT JOIN - origin_visit_stats USING (url, visit_type) - WHERE - ({") AND (".join(where_clauses)}) - ORDER BY - {order_by} - LIMIT %s - """ + # fmt: off + common_table_expressions.insert(0, ("selected_origins", f""" + SELECT + {origin_select_cols}, next_visit_queue_position + FROM + listed_origins + LEFT JOIN + origin_visit_stats USING (url, visit_type) + WHERE + ({") AND (".join(where_clauses)}) + ORDER BY + {order_by} + LIMIT %s + """)) + # fmt: on + query_args.append(count) + # fmt: off + common_table_expressions.append(("update_stats", """ + INSERT INTO + origin_visit_stats (url, visit_type, last_scheduled) + SELECT + url, visit_type, %s + FROM + selected_origins + ON CONFLICT (url, visit_type) DO UPDATE + SET last_scheduled = GREATEST( + 
origin_visit_stats.last_scheduled, + EXCLUDED.last_scheduled + ) + """)) + # fmt: on + + query_args.append(timestamp) + + formatted_ctes = ",\n".join( + f"{name} AS (\n{cte}\n)" for name, cte in common_table_expressions + ) + query = f""" - WITH selected_origins AS ( - {select_query} - ), - update_stats AS ( - INSERT INTO - origin_visit_stats ( - url, visit_type, last_scheduled - ) - SELECT - url, visit_type, %s - FROM - selected_origins - ON CONFLICT (url, visit_type) DO UPDATE - SET last_scheduled = GREATEST( - origin_visit_stats.last_scheduled, - EXCLUDED.last_scheduled - ) - ) + WITH + {formatted_ctes} SELECT - * + {origin_select_cols} FROM selected_origins """ - query_args.append(timestamp) cur.execute(query, tuple(query_args)) return [ListedOrigin(**d) for d in cur] diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -6,6 +6,7 @@ from collections import defaultdict import copy import datetime +from datetime import timedelta import inspect import random from typing import Any, Dict, List, Optional, Tuple @@ -29,7 +30,7 @@ tasks_with_priority_from_template, ) -ONEDAY = datetime.timedelta(days=1) +ONEDAY = timedelta(days=1) NUM_PRIORITY_TASKS = {None: 100, "high": 60, "normal": 30, "low": 20} @@ -819,7 +820,7 @@ visit_type=visit_type, count=len(expected) + 1, policy=policy, - timestamp=after + datetime.timedelta(days=7), + timestamp=after + timedelta(days=7), ) # We need to sort them because their 'last_scheduled' field is updated to # exactly the same value, so the order is not deterministic @@ -845,7 +846,7 @@ last_uneventful=None, last_failed=None, last_notfound=None, - last_scheduled=base_date - datetime.timedelta(seconds=i), + last_scheduled=base_date - timedelta(seconds=i), ) for i, origin in enumerate(origins[1:]) ] @@ -903,7 +904,7 @@ ] swh_scheduler.origin_visit_stats_upsert(visit_stats) - cooldown_td = 
datetime.timedelta(days=cooldown) + cooldown_td = timedelta(days=cooldown) cooldown_args = { "scheduled_cooldown": None, "failed_cooldown": None, @@ -915,7 +916,7 @@ visit_type=visit_type, count=len(expected) + 1, policy="oldest_scheduled_first", - timestamp=after + cooldown_td - datetime.timedelta(seconds=1), + timestamp=after + cooldown_td - timedelta(seconds=1), **cooldown_args, ) @@ -925,7 +926,7 @@ visit_type=visit_type, count=len(expected) + 1, policy="oldest_scheduled_first", - timestamp=after + cooldown_td + datetime.timedelta(seconds=1), + timestamp=after + cooldown_td + timedelta(seconds=1), **cooldown_args, ) @@ -943,7 +944,7 @@ # Update known origins with a `last_update` field that we control base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) updated_origins = [ - attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i)) + attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) for i, origin in enumerate(origins) ] updated_origins = swh_scheduler.record_listed_origins(updated_origins) @@ -969,7 +970,7 @@ # Update known origins with a `last_update` field that we control base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) updated_origins = [ - attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i)) + attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) for i, origin in enumerate(origins) ] updated_origins = swh_scheduler.record_listed_origins(updated_origins) @@ -1024,6 +1025,137 @@ assert len(ret) == 5 + def test_grab_next_visits_no_last_update_nor_visit_stats( + self, swh_scheduler, listed_origins_by_type + ): + """grab_next_visits should retrieve tasks without last update (nor visit stats) + + """ + visit_type = next(iter(listed_origins_by_type)) + + origins = [] + for origin in listed_origins_by_type[visit_type]: + origins.append( + attr.evolve(origin, last_update=None) + ) # void the last update so we are in the relevant context + + 
assert len(origins) > 0 + + swh_scheduler.record_listed_origins(origins) + + # Initially, we have no global queue position + actual_state = swh_scheduler.visit_scheduler_queue_position_get() + assert actual_state == {} + + # nor any visit stats + actual_visit_stats = swh_scheduler.origin_visit_stats_get( + (o.url, o.visit_type) for o in origins + ) + assert len(actual_visit_stats) == 0 + + # Grab some new visits + next_visits = swh_scheduler.grab_next_visits( + visit_type, count=len(origins), policy="origins_without_last_update", + ) + # we got back every origin without a last update + assert len(next_visits) == len(origins) + + # Now the global state got updated + actual_state = swh_scheduler.visit_scheduler_queue_position_get() + assert actual_state[visit_type] is not None + + actual_visit_stats = swh_scheduler.origin_visit_stats_get( + (o.url, o.visit_type) for o in next_visits + ) + + # Visit stats were also created + assert len(actual_visit_stats) == len(origins) + + def test_grab_next_visits_no_last_update_with_visit_stats( + self, swh_scheduler, listed_origins_by_type + ): + """grab_next_visits should retrieve origins without last update""" + visit_type = next(iter(listed_origins_by_type)) + + origins = [] + for origin in listed_origins_by_type[visit_type]: + origins.append( + attr.evolve(origin, last_update=None) + ) # void the last update so we are in the relevant context + + assert len(origins) > 0 + + swh_scheduler.record_listed_origins(origins) + + # Initially, we have no global queue position + actual_state = swh_scheduler.visit_scheduler_queue_position_get() + assert actual_state == {} + + date_now = utcnow() + # Simulate that some of those origins have associated visit stats (some with an + # existing queue position and some without any) + visit_stats = ( + [ + OriginVisitStats( + url=origin.url, + visit_type=origin.visit_type, + last_eventful=utcnow(), + last_uneventful=None, + last_failed=None, + last_notfound=None, + next_visit_queue_position=date_now +
timedelta(days=random.uniform(-10, 1)), + ) + for origin in origins[:100] + ] + + [ + OriginVisitStats( + url=origin.url, + visit_type=origin.visit_type, + last_eventful=utcnow(), + last_uneventful=None, + last_failed=None, + last_notfound=None, + next_visit_queue_position=date_now + + timedelta(days=random.uniform(1, 10)), # definitely > now() + ) + for origin in origins[100:150] + ] + + [ + OriginVisitStats( + url=origin.url, + visit_type=origin.visit_type, + last_eventful=utcnow(), + last_uneventful=None, + last_failed=None, + last_notfound=None, + ) + for origin in origins[150:] + ] + ) + + swh_scheduler.origin_visit_stats_upsert(visit_stats) + + # Grab next visits + actual_visits = swh_scheduler.grab_next_visits( + visit_type, count=len(origins), policy="origins_without_last_update", + ) + assert len(actual_visits) == len(origins) + + actual_visit_stats = swh_scheduler.origin_visit_stats_get( + (o.url, o.visit_type) for o in actual_visits + ) + assert len(actual_visit_stats) == len(origins) + + actual_state = swh_scheduler.visit_scheduler_queue_position_get() + assert actual_state == { + visit_type: max( + s.next_visit_queue_position + for s in actual_visit_stats + if s.next_visit_queue_position is not None + ) + } + def test_grab_next_visits_unknown_policy(self, swh_scheduler): unknown_policy = "non_existing_policy" NUM_RESULTS = 5 @@ -1287,8 +1419,7 @@ OriginVisitStats( url=visited_origin.url, visit_type=visited_origin.visit_type, - last_eventful=visited_origin.last_update - - datetime.timedelta(days=1), + last_eventful=visited_origin.last_update - timedelta(days=1), last_uneventful=None, last_failed=None, last_notfound=None, @@ -1334,7 +1465,7 @@ ret = swh_scheduler.update_metrics(timestamp=ts) assert all(metric.last_update == ts for metric in ret) - second_ts = ts + datetime.timedelta(seconds=1) + second_ts = ts + timedelta(seconds=1) ret = swh_scheduler.update_metrics(timestamp=second_ts) assert all(metric.last_update == second_ts for metric in ret)