diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -335,69 +335,81 @@
         if timestamp is None:
             timestamp = utcnow()
 
-        origin_select_cols = ", ".join(ListedOrigin.select_columns())
+        # Qualified, as origins_to_schedule exposes some listed_origins columns too
+        origin_select_cols = ", ".join(
+            f"listed_origins.{col}" for col in ListedOrigin.select_columns()
+        )
 
         query_args: List[Any] = []
 
         where_clauses = []
 
         # "NOT enabled" = the lister said the origin no longer exists
-        where_clauses.append("enabled")
+        # (checked on listed_origins, as the materialized view may be outdated)
+        where_clauses.append("listed_origins.enabled")
 
         # Only schedule visits of the given type
-        where_clauses.append("visit_type = %s")
+        where_clauses.append("origins_to_schedule.visit_type = %s")
         query_args.append(visit_type)
 
         # Don't re-schedule visits if they're already scheduled but we haven't
         # recorded a result yet, unless they've been scheduled more than a week
         # ago (it probably means we've lost them in flight somewhere).
+        where_clauses.append(
+            """origins_to_schedule.last_scheduled IS NULL
+            OR origins_to_schedule.last_scheduled < %s - '7 day'::interval
+            OR (
+              origins_to_schedule.last_scheduled - origins_to_schedule.last_visit
+              < '0'::interval
+            )
+            """
+        )
+        query_args.append(timestamp)
+
+        # Same condition as above, re-checked against the live origin_visit_stats
+        # table in case the materialized view is outdated
         where_clauses.append(
             """origin_visit_stats.last_scheduled IS NULL
-            OR origin_visit_stats.last_scheduled < GREATEST(
-              %s - '7 day'::interval,
+            OR origin_visit_stats.last_scheduled < %s - '7 day'::interval
+            OR origin_visit_stats.last_scheduled - GREATEST(
               origin_visit_stats.last_eventful,
               origin_visit_stats.last_uneventful,
               origin_visit_stats.last_failed,
               origin_visit_stats.last_notfound
-            )
+            ) < '0'::interval
             """
         )
         query_args.append(timestamp)
 
         if policy == "oldest_scheduled_first":
-            order_by = "origin_visit_stats.last_scheduled NULLS FIRST"
+            order_by = "origins_to_schedule.last_scheduled NULLS FIRST"
         elif policy == "never_visited_oldest_update_first":
             # never visited origins have a NULL last_snapshot
-            where_clauses.append("origin_visit_stats.last_snapshot IS NULL")
+            where_clauses.append("origins_to_schedule.last_snapshot IS NULL")
 
             # order by increasing last_update (oldest first)
-            where_clauses.append("listed_origins.last_update IS NOT NULL")
-            order_by = "listed_origins.last_update"
+            where_clauses.append("origins_to_schedule.last_update IS NOT NULL")
+            order_by = "origins_to_schedule.last_update"
         elif policy == "already_visited_order_by_lag":
             # TODO: store "visit lag" in a materialized view?
 
             # visited origins have a NOT NULL last_snapshot
-            where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL")
+            where_clauses.append("origins_to_schedule.last_snapshot IS NOT NULL")
 
             # ignore origins we have visited after the known last update
-            where_clauses.append("listed_origins.last_update IS NOT NULL")
+            where_clauses.append("origins_to_schedule.last_update IS NOT NULL")
             where_clauses.append(
                 """
-                listed_origins.last_update
-                > GREATEST(
-                  origin_visit_stats.last_eventful,
-                  origin_visit_stats.last_uneventful
-                )
+                origins_to_schedule.last_update
+                - origins_to_schedule.last_successful_visit
+                > '0'::interval
                 """
             )
 
             # order by decreasing visit lag
             order_by = """\
-                listed_origins.last_update
-                - GREATEST(
-                  origin_visit_stats.last_eventful,
-                  origin_visit_stats.last_uneventful
-                )
+                origins_to_schedule.last_update
+                - origins_to_schedule.last_successful_visit
                 DESC
                 """
         else:
@@ -407,9 +419,16 @@
             SELECT
               {origin_select_cols}
             FROM
-              listed_origins
-            LEFT JOIN
-              origin_visit_stats USING (url, visit_type)
+              origins_to_schedule
+            INNER JOIN listed_origins ON (
+              listed_origins.lister_id=origins_to_schedule.lister_id
+              AND listed_origins.url=origins_to_schedule.url
+              AND listed_origins.visit_type=origins_to_schedule.visit_type
+            )
+            LEFT JOIN origin_visit_stats ON (
+              origin_visit_stats.url=listed_origins.url
+              AND origin_visit_stats.visit_type=listed_origins.visit_type
+            )
             WHERE
               ({") AND (".join(where_clauses)})
             ORDER BY
@@ -444,8 +463,20 @@
         """
         query_args.append(timestamp)
 
         cur.execute(query, tuple(query_args))
-        return [ListedOrigin(**d) for d in cur]
+        res = [ListedOrigin(**d) for d in cur]
+
+        if len(res) < count:
+            # Not enough results, maybe because the view is outdated: discard
+            # this first attempt, refresh the view and try again.
+            # TODO: this probably needs to be refreshed more often than this
+            db.conn.rollback()
+            cur.execute("REFRESH MATERIALIZED VIEW origins_to_schedule")
+
+            cur.execute(query, tuple(query_args))
+            res = [ListedOrigin(**d) for d in cur]
+
+        return res
 
     task_create_keys = [
         "type",
diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql
--- a/swh/scheduler/sql/30-schema.sql
+++ b/swh/scheduler/sql/30-schema.sql
@@ -186,6 +186,36 @@
 comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot';
 
 
+create materialized view origins_to_schedule
+  as select
+    lister_id,
+    url,
+    visit_type,
+    last_scheduled,
+    last_update,
+    last_eventful,
+    last_uneventful,
+    last_failed,
+    last_notfound,
+    last_snapshot,
+    greatest(
+      last_eventful,
+      last_uneventful,
+      last_failed,
+      last_notfound
+    ) as last_visit,
+    greatest(
+      last_eventful,
+      last_uneventful
+    ) as last_successful_visit
+  from listed_origins
+  left join origin_visit_stats using (url, visit_type)
+  where
+    enabled  -- "NOT enabled" = the lister said the origin no longer exists
+;
+
+comment on materialized view origins_to_schedule is 'Enabled listed origins joined with their visit stats; only reflects changes to those tables once refreshed';
+
 create table scheduler_metrics (
   lister_id uuid not null references listers(id),
   visit_type text not null,
diff --git a/swh/scheduler/sql/60-indexes.sql b/swh/scheduler/sql/60-indexes.sql
--- a/swh/scheduler/sql/60-indexes.sql
+++ b/swh/scheduler/sql/60-indexes.sql
@@ -21,3 +21,7 @@
 -- visit stats
 create index on origin_visit_stats (url, visit_type);
 -- XXX probably add indexes on most (visit_type, last_xxx) couples
+
+-- for the "oldest_scheduled_first" policy
+create index origins_to_schedule__oldest_scheduled_first
+  on origins_to_schedule (visit_type, last_scheduled NULLS FIRST);