Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
Show First 20 Lines • Show All 370 Lines • ▼ Show 20 Lines | class SchedulerBackend: | ||||
@db_transaction() | @db_transaction() | ||||
def grab_next_visits( | def grab_next_visits( | ||||
self, | self, | ||||
visit_type: str, | visit_type: str, | ||||
count: int, | count: int, | ||||
policy: str, | policy: str, | ||||
enabled: bool = True, | enabled: bool = True, | ||||
lister_uuid: Optional[str] = None, | lister_uuid: Optional[str] = None, | ||||
lister_name: Optional[str] = None, | |||||
lister_instance_name: Optional[str] = None, | |||||
timestamp: Optional[datetime.datetime] = None, | timestamp: Optional[datetime.datetime] = None, | ||||
absolute_cooldown: Optional[datetime.timedelta] = datetime.timedelta(hours=12), | absolute_cooldown: Optional[datetime.timedelta] = datetime.timedelta(hours=12), | ||||
scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), | scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), | ||||
failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), | failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), | ||||
not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), | not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), | ||||
tablesample: Optional[float] = None, | tablesample: Optional[float] = None, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> List[ListedOrigin]: | ) -> List[ListedOrigin]: | ||||
if timestamp is None: | if timestamp is None: | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
origin_select_cols = ", ".join(ListedOrigin.select_columns()) | origin_select_cols = ", ".join(ListedOrigin.select_columns()) | ||||
joins: Dict[str, str] = { | |||||
"origin_visit_stats": "USING (url, visit_type)", | |||||
} | |||||
query_args: List[Any] = [] | query_args: List[Any] = [] | ||||
where_clauses = [] | where_clauses = [] | ||||
# list of (name, query) handled as CTEs before the main query | # list of (name, query) handled as CTEs before the main query | ||||
common_table_expressions: List[Tuple[str, str]] = [] | common_table_expressions: List[Tuple[str, str]] = [] | ||||
# "NOT enabled" = the lister said the origin no longer exists | # "NOT enabled" = the lister said the origin no longer exists | ||||
▲ Show 20 Lines • Show All 105 Lines • ▼ Show 20 Lines | ) -> List[ListedOrigin]: | ||||
query_args.insert(0, tablesample) | query_args.insert(0, tablesample) | ||||
else: | else: | ||||
table = "listed_origins" | table = "listed_origins" | ||||
if lister_uuid: | if lister_uuid: | ||||
where_clauses.append("lister_id = %s") | where_clauses.append("lister_id = %s") | ||||
query_args.append(lister_uuid) | query_args.append(lister_uuid) | ||||
if lister_name: | |||||
joins["listers"] = "on listed_origins.lister_id=listers.id" | |||||
where_clauses.append("listers.name = %s") | |||||
query_args.append(lister_name) | |||||
if lister_instance_name: | |||||
joins["listers"] = "on listed_origins.lister_id=listers.id" | |||||
where_clauses.append("listers.instance_name = %s") | |||||
query_args.append(lister_instance_name) | |||||
join_clause = "\n".join( | |||||
f"left join {table} {clause}" for table, clause in joins.items() | |||||
) | |||||
# fmt: off | # fmt: off | ||||
common_table_expressions.insert(0, ("selected_origins", f""" | common_table_expressions.insert(0, ("selected_origins", f""" | ||||
SELECT | SELECT | ||||
{origin_select_cols}, next_visit_queue_position | {origin_select_cols}, next_visit_queue_position | ||||
FROM | FROM | ||||
{table} | {table} | ||||
LEFT JOIN | {join_clause} | ||||
origin_visit_stats USING (url, visit_type) | |||||
WHERE | WHERE | ||||
({") AND (".join(where_clauses)}) | ({") AND (".join(where_clauses)}) | ||||
ORDER BY | ORDER BY | ||||
{order_by} | {order_by} | ||||
LIMIT %s | LIMIT %s | ||||
""")) | """)) | ||||
# fmt: on | # fmt: on | ||||
▲ Show 20 Lines • Show All 627 Lines • Show Last 20 Lines |