diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -19,7 +19,11 @@
 from swh.scheduler.utils import utcnow
 
 from .exc import SchedulerException, StaleData, UnknownPolicy
-from .interface import ListedOriginPageToken, PaginatedListedOriginList
+from .interface import (
+    SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+    ListedOriginPageToken,
+    PaginatedListedOriginList,
+)
 from .model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics
 
 logger = logging.getLogger(__name__)
@@ -340,7 +344,9 @@
         if timestamp is None:
             timestamp = utcnow()
 
-        origin_select_cols = ", ".join(ListedOrigin.select_columns())
+        origin_select_cols = ", ".join(
+            f"lo.{col}" for col in ListedOrigin.select_columns()
+        )
 
         query_args: List[Any] = []
 
@@ -350,7 +356,7 @@
         where_clauses.append("enabled")
 
         # Only schedule visits of the given type
-        where_clauses.append("visit_type = %s")
+        where_clauses.append("lo.visit_type = %s")
         query_args.append(visit_type)
 
         if scheduled_cooldown:
@@ -396,8 +402,8 @@
             where_clauses.append("origin_visit_stats.last_snapshot IS NULL")
 
             # order by increasing last_update (oldest first)
-            where_clauses.append("listed_origins.last_update IS NOT NULL")
-            order_by = "listed_origins.last_update"
+            where_clauses.append("lo.last_update IS NOT NULL")
+            order_by = "lo.last_update"
 
         elif policy == "already_visited_order_by_lag":
             # TODO: store "visit lag" in a materialized view?
@@ -405,10 +411,10 @@
             where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL")
 
             # ignore origins we have visited after the known last update
-            where_clauses.append("listed_origins.last_update IS NOT NULL")
+            where_clauses.append("lo.last_update IS NOT NULL")
             where_clauses.append(
                 """
-                listed_origins.last_update
+                lo.last_update
                 > GREATEST(
                     origin_visit_stats.last_eventful,
                     origin_visit_stats.last_uneventful
@@ -418,13 +424,28 @@
 
             # order by decreasing visit lag
             order_by = """\
-                listed_origins.last_update
+                lo.last_update
                 - GREATEST(
                     origin_visit_stats.last_eventful,
                     origin_visit_stats.last_uneventful
                 )
                 DESC
             """
+        elif policy == SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE:
+            where_clauses.append("lo.last_update IS NULL")
+            where_clauses.append(
+                "origin_visit_stats.next_visit_queue_position IS NOT NULL"
+            )
+            where_clauses.append(
+                "vsqp.position <= origin_visit_stats.next_visit_queue_position"
+            )
+            # TODO: Determine if it's actually what we want... this chooses an interval
+            # of one day to fetch visits to schedule...
+            where_clauses.append(
+                "origin_visit_stats.next_visit_queue_position < vsqp.position + '1 day'"
+            )
+            # lo.last_update is always NULL here; order by queue position instead
+            order_by = "origin_visit_stats.next_visit_queue_position"
         else:
             raise UnknownPolicy(f"Unknown scheduling policy {policy}")
 
@@ -432,9 +453,12 @@
             SELECT
                 {origin_select_cols}
             FROM
-                listed_origins
+                listed_origins lo
             LEFT JOIN
                 origin_visit_stats USING (url, visit_type)
+            LEFT JOIN
+                visit_scheduler_queue_position vsqp
+                ON vsqp.visit_type = lo.visit_type
             WHERE
                 ({") AND (".join(where_clauses)})
             ORDER BY
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -16,6 +16,8 @@
 
 ListedOriginPageToken = Tuple[str, str]
 
+SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE = "origins_without_last_update"
+
 
 class PaginatedListedOriginList(PagedResult[ListedOrigin, ListedOriginPageToken]):
     """A list of listed origins, with a continuation token"""
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -6,6 +6,7 @@
 from collections import defaultdict
 import copy
 import datetime
+from datetime import timedelta
 import inspect
 import random
 from typing import Any, Dict, List, Optional, Tuple
@@ -17,7 +18,11 @@
 
 from swh.model.hashutil import hash_to_bytes
 from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy
-from swh.scheduler.interface import ListedOriginPageToken, SchedulerInterface
+from swh.scheduler.interface import (
+    SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+    ListedOriginPageToken,
+    SchedulerInterface,
+)
 from swh.scheduler.model import ListedOrigin, OriginVisitStats, SchedulerMetrics
 from swh.scheduler.utils import utcnow
 
@@ -29,7 +34,7 @@
     tasks_with_priority_from_template,
 )
 
-ONEDAY = datetime.timedelta(days=1)
+ONEDAY = timedelta(days=1)
 
 NUM_PRIORITY_TASKS = {None: 100, "high": 60, "normal": 30, "low": 20}
 
@@ -819,7 +824,7 @@
             visit_type=visit_type,
             count=len(expected) + 1,
             policy=policy,
-            timestamp=after + datetime.timedelta(days=7),
+            timestamp=after + timedelta(days=7),
         )
         # We need to sort them because their 'last_scheduled' field is updated to
         # exactly the same value, so the order is not deterministic
@@ -845,7 +850,7 @@
                 last_uneventful=None,
                 last_failed=None,
                 last_notfound=None,
-                last_scheduled=base_date - datetime.timedelta(seconds=i),
+                last_scheduled=base_date - timedelta(seconds=i),
             )
             for i, origin in enumerate(origins[1:])
         ]
@@ -903,7 +908,7 @@
         ]
         swh_scheduler.origin_visit_stats_upsert(visit_stats)
 
-        cooldown_td = datetime.timedelta(days=cooldown)
+        cooldown_td = timedelta(days=cooldown)
         cooldown_args = {
             "scheduled_cooldown": None,
             "failed_cooldown": None,
@@ -915,7 +920,7 @@
             visit_type=visit_type,
             count=len(expected) + 1,
             policy="oldest_scheduled_first",
-            timestamp=after + cooldown_td - datetime.timedelta(seconds=1),
+            timestamp=after + cooldown_td - timedelta(seconds=1),
             **cooldown_args,
         )
 
@@ -925,7 +930,7 @@
             visit_type=visit_type,
             count=len(expected) + 1,
             policy="oldest_scheduled_first",
-            timestamp=after + cooldown_td + datetime.timedelta(seconds=1),
+            timestamp=after + cooldown_td + timedelta(seconds=1),
             **cooldown_args,
         )
 
@@ -943,7 +948,7 @@
         # Update known origins with a `last_update` field that we control
         base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
         updated_origins = [
-            attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i))
+            attr.evolve(origin, last_update=base_date - timedelta(seconds=i))
             for i, origin in enumerate(origins)
         ]
         updated_origins = swh_scheduler.record_listed_origins(updated_origins)
@@ -969,7 +974,7 @@
         # Update known origins with a `last_update` field that we control
         base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
         updated_origins = [
-            attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i))
+            attr.evolve(origin, last_update=base_date - timedelta(seconds=i))
             for i, origin in enumerate(origins)
         ]
         updated_origins = swh_scheduler.record_listed_origins(updated_origins)
@@ -1024,6 +1029,98 @@
 
         assert len(ret) == 5
 
+    def test_grab_next_visits_no_last_update(
+        self, swh_scheduler, listed_origins_by_type
+    ):
+        """Check grab_next_visits returns origins without last_update information"""
+        visit_type = next(iter(listed_origins_by_type))
+
+        origins = []
+        for origin in listed_origins_by_type[visit_type]:
+            origins.append(
+                attr.evolve(origin, last_update=None)
+            )  # void the last update
+
+        assert len(origins) > 0
+
+        swh_scheduler.record_listed_origins(origins)
+
+        # Neither visit_scheduler_queue_position state nor origin_visit_stats exist
+        # yet
+        actual_state = swh_scheduler.visit_scheduler_queue_position_get()
+        assert actual_state == {}
+
+        ret = swh_scheduler.grab_next_visits(
+            visit_type, len(origins), policy=SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+        )
+
+        # Nothing will get fetched
+        assert len(ret) == 0
+
+        state_position = utcnow() - timedelta(days=1)
+        # Prepare the visit_scheduler_queue_position state for the visit type concerned
+        # here
+        swh_scheduler.visit_scheduler_queue_position_set(visit_type, state_position)
+
+        ret = swh_scheduler.grab_next_visits(
+            visit_type, len(origins), policy=SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+        )
+
+        # No origin visit stats so nothing gets scheduled
+        assert len(ret) == 0
+
+        relevant_origin_index = 900
+        # Simulate the journal client updated information in the right range (1 day)
+        visit_stats = [
+            OriginVisitStats(
+                url=origin.url,
+                visit_type=origin.visit_type,
+                last_eventful=utcnow(),
+                last_uneventful=None,
+                last_failed=None,
+                last_notfound=None,
+                next_visit_queue_position=state_position
+                + timedelta(days=random.choice([d * 0.1 for d in range(0, 10)])),
+            )
+            for origin in origins[:relevant_origin_index]  # are relevant for test
+        ] + [
+            # those visit stats whose queue position is "far" in the future get
+            # ignored
+            OriginVisitStats(
+                url=origin.url,
+                visit_type=origin.visit_type,
+                last_eventful=utcnow(),
+                last_uneventful=None,
+                last_failed=None,
+                last_notfound=None,
+                next_visit_queue_position=state_position
+                + timedelta(days=random.choice(range(2, 10))),
+            )
+            for origin in origins[relevant_origin_index:]  # will get ignored
+        ]
+
+        swh_scheduler.origin_visit_stats_upsert(visit_stats)
+        actual_visits = swh_scheduler.grab_next_visits(
+            visit_type, len(origins), policy=SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+        )
+        assert len(actual_visits) == len(origins[:relevant_origin_index])
+
+        visit_stats = swh_scheduler.origin_visit_stats_get(
+            (o.url, o.visit_type) for o in actual_visits
+        )
+
+        assert len(visit_stats) == len(actual_visits)
+        stats_by_key = {(vs.url, vs.visit_type): vs for vs in visit_stats}
+        for origin in actual_visits:
+            visit_stat = stats_by_key[(origin.url, origin.visit_type)]
+            assert origin.last_update is None
+
+            assert (
+                state_position <= visit_stat.next_visit_queue_position
+                and visit_stat.next_visit_queue_position
+                < state_position + timedelta(days=1)
+            )
+
     def test_grab_next_visits_unknown_policy(self, swh_scheduler):
         unknown_policy = "non_existing_policy"
         NUM_RESULTS = 5
@@ -1287,8 +1384,7 @@
                 OriginVisitStats(
                     url=visited_origin.url,
                     visit_type=visited_origin.visit_type,
-                    last_eventful=visited_origin.last_update
-                    - datetime.timedelta(days=1),
+                    last_eventful=visited_origin.last_update - timedelta(days=1),
                     last_uneventful=None,
                     last_failed=None,
                     last_notfound=None,
@@ -1334,7 +1430,7 @@
         ret = swh_scheduler.update_metrics(timestamp=ts)
         assert all(metric.last_update == ts for metric in ret)
 
-        second_ts = ts + datetime.timedelta(seconds=1)
+        second_ts = ts + timedelta(seconds=1)
         ret = swh_scheduler.update_metrics(timestamp=second_ts)
         assert all(metric.last_update == second_ts for metric in ret)
 