Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
Show First 20 Lines • Show All 311 Lines • ▼ Show 20 Lines | class SchedulerBackend: | ||||
@db_transaction() | @db_transaction() | ||||
def grab_next_visits( | def grab_next_visits( | ||||
self, | self, | ||||
visit_type: str, | visit_type: str, | ||||
count: int, | count: int, | ||||
policy: str, | policy: str, | ||||
timestamp: Optional[datetime.datetime] = None, | timestamp: Optional[datetime.datetime] = None, | ||||
scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), | scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), | ||||
failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), | |||||
notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), | |||||
tablesample: Optional[float] = None, | |||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> List[ListedOrigin]: | ) -> List[ListedOrigin]: | ||||
if timestamp is None: | if timestamp is None: | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
origin_select_cols = ", ".join(ListedOrigin.select_columns()) | origin_select_cols = ", ".join(ListedOrigin.select_columns()) | ||||
Show All 21 Lines | ) -> List[ListedOrigin]: | ||||
origin_visit_stats.last_failed, | origin_visit_stats.last_failed, | ||||
origin_visit_stats.last_notfound | origin_visit_stats.last_notfound | ||||
) | ) | ||||
""" | """ | ||||
) | ) | ||||
query_args.append(timestamp) | query_args.append(timestamp) | ||||
query_args.append(scheduled_cooldown) | query_args.append(scheduled_cooldown) | ||||
if failed_cooldown: | |||||
# Don't retry failed origins too often | |||||
where_clauses.append( | |||||
"origin_visit_stats.last_failed is null " | |||||
"or origin_visit_stats.last_failed < %s - %s" | |||||
) | |||||
query_args.append(timestamp) | |||||
query_args.append(failed_cooldown) | |||||
if notfound_cooldown: | |||||
# Don't retry not found origins too often | |||||
where_clauses.append( | |||||
"origin_visit_stats.last_notfound is null " | |||||
"or origin_visit_stats.last_notfound < %s - %s" | |||||
) | |||||
query_args.append(timestamp) | |||||
query_args.append(notfound_cooldown) | |||||
if policy == "oldest_scheduled_first": | if policy == "oldest_scheduled_first": | ||||
order_by = "origin_visit_stats.last_scheduled NULLS FIRST" | order_by = "origin_visit_stats.last_scheduled NULLS FIRST" | ||||
elif policy == "never_visited_oldest_update_first": | elif policy == "never_visited_oldest_update_first": | ||||
# never visited origins have a NULL last_snapshot | # never visited origins have a NULL last_snapshot | ||||
where_clauses.append("origin_visit_stats.last_snapshot IS NULL") | where_clauses.append("origin_visit_stats.last_snapshot IS NULL") | ||||
# order by increasing last_update (oldest first) | # order by increasing last_update (oldest first) | ||||
where_clauses.append("listed_origins.last_update IS NOT NULL") | where_clauses.append("listed_origins.last_update IS NOT NULL") | ||||
order_by = "listed_origins.last_update" | order_by = "listed_origins.last_update" | ||||
elif policy == "never_visited_unknown_last_update": | |||||
# never visited origins have a NULL last_snapshot | |||||
where_clauses.append("origin_visit_stats.last_snapshot IS NULL") | |||||
# Unknown last update | |||||
where_clauses.append("listed_origins.last_update IS NULL") | |||||
# Try to get at the oldest origins first | |||||
order_by = "listed_origins.first_seen" | |||||
elif policy == "already_visited_order_by_lag": | elif policy == "already_visited_order_by_lag": | ||||
# TODO: store "visit lag" in a materialized view? | # TODO: store "visit lag" in a materialized view? | ||||
# visited origins have a NOT NULL last_snapshot | # visited origins have a NOT NULL last_snapshot | ||||
where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL") | where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL") | ||||
# ignore origins we have visited after the known last update | # ignore origins we have visited after the known last update | ||||
where_clauses.append("listed_origins.last_update IS NOT NULL") | where_clauses.append("listed_origins.last_update IS NOT NULL") | ||||
Show All 14 Lines | ) -> List[ListedOrigin]: | ||||
origin_visit_stats.last_eventful, | origin_visit_stats.last_eventful, | ||||
origin_visit_stats.last_uneventful | origin_visit_stats.last_uneventful | ||||
) | ) | ||||
DESC | DESC | ||||
""" | """ | ||||
else: | else: | ||||
raise UnknownPolicy(f"Unknown scheduling policy {policy}") | raise UnknownPolicy(f"Unknown scheduling policy {policy}") | ||||
if tablesample: | |||||
table = "listed_origins tablesample SYSTEM (%s)" | |||||
query_args.insert(0, tablesample) | |||||
else: | |||||
table = "listed_origins" | |||||
select_query = f""" | select_query = f""" | ||||
SELECT | SELECT | ||||
{origin_select_cols} | {origin_select_cols} | ||||
FROM | FROM | ||||
listed_origins | {table} | ||||
LEFT JOIN | LEFT JOIN | ||||
origin_visit_stats USING (url, visit_type) | origin_visit_stats USING (url, visit_type) | ||||
WHERE | WHERE | ||||
({") AND (".join(where_clauses)}) | ({") AND (".join(where_clauses)}) | ||||
ORDER BY | ORDER BY | ||||
{order_by} | {order_by} | ||||
LIMIT %s | LIMIT %s | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 591 Lines • Show Last 20 Lines |