diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -342,6 +342,33 @@
             # order by increasing last_update (oldest first)
             where_clauses.append("listed_origins.last_update IS NOT NULL")
             order_by = "listed_origins.last_update"
+        elif policy == "already_visited_order_by_lag":
+            # TODO: store "visit lag" in a materialized view?
+
+            # visited origins have a NOT NULL last_snapshot
+            where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL")
+
+            # ignore origins we have visited after the known last update
+            where_clauses.append("listed_origins.last_update IS NOT NULL")
+            where_clauses.append(
+                """
+                listed_origins.last_update
+                > GREATEST(
+                    origin_visit_stats.last_eventful,
+                    origin_visit_stats.last_uneventful
+                )
+                """
+            )
+
+            # order by decreasing visit lag
+            order_by = """\
+                listed_origins.last_update
+                - GREATEST(
+                    origin_visit_stats.last_eventful,
+                    origin_visit_stats.last_uneventful
+                )
+                DESC
+            """
         else:
             raise UnknownPolicy(f"Unknown scheduling policy {policy}")
 
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -851,6 +851,54 @@
             expected=expected_origins,
         )
 
+    def test_grab_next_visits_already_visited_order_by_lag(
+        self, swh_scheduler, listed_origins_by_type,
+    ):
+        visit_type, origins = self._grab_next_visits_setup(
+            swh_scheduler, listed_origins_by_type
+        )
+
+        # Update known origins with a `last_update` field that we control
+        base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
+        updated_origins = [
+            attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i))
+            for i, origin in enumerate(origins)
+        ]
+        updated_origins = swh_scheduler.record_listed_origins(updated_origins)
+
+        # Update the visit stats with a known visit at a controlled date for
+        # half the origins. Pick the date in the middle of the
+        # updated_origins' `last_update` range
+        visit_date = updated_origins[len(updated_origins) // 2].last_update
+        visited_origins = updated_origins[::2]
+        visit_stats = [
+            OriginVisitStats(
+                url=origin.url,
+                visit_type=origin.visit_type,
+                last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+                last_eventful=visit_date,
+                last_uneventful=None,
+                last_failed=None,
+                last_notfound=None,
+            )
+            for origin in visited_origins
+        ]
+        swh_scheduler.origin_visit_stats_upsert(visit_stats)
+
+        # We expect to retrieve visited origins with the largest lag, but only
+        # those which haven't been visited since their last update
+        expected_origins = sorted(
+            [origin for origin in visited_origins if origin.last_update > visit_date],
+            key=lambda o: visit_date - o.last_update,
+        )
+
+        self._check_grab_next_visit(
+            swh_scheduler,
+            visit_type=visit_type,
+            policy="already_visited_order_by_lag",
+            expected=expected_origins,
+        )
+
     def test_grab_next_visits_underflow(self, swh_scheduler, listed_origins_by_type):
         """Check that grab_next_visits works when there not enough origins in
         the database"""
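
For context when reviewing: a minimal sketch of how a caller might exercise the new policy. It assumes a scheduler backend handle exposing the grab_next_visits(visit_type, count, policy) entry point whose policy dispatch is extended above; the helper name, the "git" visit type, and the batch size are illustrative and not part of this diff.

# Sketch only: `scheduler` is assumed to be an already-configured swh.scheduler
# backend; this helper is illustrative and does not ship with the patch.
def grab_laggiest_visited_origins(scheduler, visit_type="git", batch_size=10):
    """Request origins scheduled under the new policy.

    With policy "already_visited_order_by_lag", the backend returns origins
    that have already been visited (non-NULL last_snapshot) and whose listed
    last_update is newer than their latest recorded visit, ordered by
    decreasing visit lag.
    """
    return scheduler.grab_next_visits(
        visit_type=visit_type,
        count=batch_size,
        policy="already_visited_order_by_lag",
    )

Ordering by decreasing lag means the origins that are most "behind" (oldest known upstream activity relative to our last visit) are handed out first.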