Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
Show All 13 Lines | |||||||||||||||||||||
import psycopg2.extras | import psycopg2.extras | ||||||||||||||||||||
import psycopg2.pool | import psycopg2.pool | ||||||||||||||||||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||||||||||||||||||
from swh.core.db.common import db_transaction | from swh.core.db.common import db_transaction | ||||||||||||||||||||
from swh.scheduler.utils import utcnow | from swh.scheduler.utils import utcnow | ||||||||||||||||||||
from .exc import SchedulerException, StaleData, UnknownPolicy | from .exc import SchedulerException, StaleData, UnknownPolicy | ||||||||||||||||||||
from .interface import ListedOriginPageToken, PaginatedListedOriginList | from .interface import ( | ||||||||||||||||||||
SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE, | |||||||||||||||||||||
ListedOriginPageToken, | |||||||||||||||||||||
PaginatedListedOriginList, | |||||||||||||||||||||
) | |||||||||||||||||||||
from .model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics | from .model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics | ||||||||||||||||||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||||||||||||||||
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) | psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) | ||||||||||||||||||||
psycopg2.extras.register_uuid() | psycopg2.extras.register_uuid() | ||||||||||||||||||||
▲ Show 20 Lines • Show All 304 Lines • ▼ Show 20 Lines | def grab_next_visits( | ||||||||||||||||||||
failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), | failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), | ||||||||||||||||||||
notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), | notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), | ||||||||||||||||||||
db=None, | db=None, | ||||||||||||||||||||
cur=None, | cur=None, | ||||||||||||||||||||
) -> List[ListedOrigin]: | ) -> List[ListedOrigin]: | ||||||||||||||||||||
if timestamp is None: | if timestamp is None: | ||||||||||||||||||||
timestamp = utcnow() | timestamp = utcnow() | ||||||||||||||||||||
origin_select_cols = ", ".join(ListedOrigin.select_columns()) | origin_select_cols = ", ".join( | ||||||||||||||||||||
f"lo.{col}" for col in ListedOrigin.select_columns() | |||||||||||||||||||||
) | |||||||||||||||||||||
query_args: List[Any] = [] | query_args: List[Any] = [] | ||||||||||||||||||||
where_clauses = [] | where_clauses = [] | ||||||||||||||||||||
# "NOT enabled" = the lister said the origin no longer exists | # "NOT enabled" = the lister said the origin no longer exists | ||||||||||||||||||||
where_clauses.append("enabled") | where_clauses.append("enabled") | ||||||||||||||||||||
# Only schedule visits of the given type | # Only schedule visits of the given type | ||||||||||||||||||||
where_clauses.append("visit_type = %s") | where_clauses.append("lo.visit_type = %s") | ||||||||||||||||||||
query_args.append(visit_type) | query_args.append(visit_type) | ||||||||||||||||||||
if scheduled_cooldown: | if scheduled_cooldown: | ||||||||||||||||||||
# Don't re-schedule visits if they're already scheduled but we haven't | # Don't re-schedule visits if they're already scheduled but we haven't | ||||||||||||||||||||
# recorded a result yet, unless they've been scheduled more than a week | # recorded a result yet, unless they've been scheduled more than a week | ||||||||||||||||||||
# ago (it probably means we've lost them in flight somewhere). | # ago (it probably means we've lost them in flight somewhere). | ||||||||||||||||||||
where_clauses.append( | where_clauses.append( | ||||||||||||||||||||
"""origin_visit_stats.last_scheduled IS NULL | """origin_visit_stats.last_scheduled IS NULL | ||||||||||||||||||||
Show All 29 Lines | ) -> List[ListedOrigin]: | ||||||||||||||||||||
if policy == "oldest_scheduled_first": | if policy == "oldest_scheduled_first": | ||||||||||||||||||||
order_by = "origin_visit_stats.last_scheduled NULLS FIRST" | order_by = "origin_visit_stats.last_scheduled NULLS FIRST" | ||||||||||||||||||||
elif policy == "never_visited_oldest_update_first": | elif policy == "never_visited_oldest_update_first": | ||||||||||||||||||||
# never visited origins have a NULL last_snapshot | # never visited origins have a NULL last_snapshot | ||||||||||||||||||||
where_clauses.append("origin_visit_stats.last_snapshot IS NULL") | where_clauses.append("origin_visit_stats.last_snapshot IS NULL") | ||||||||||||||||||||
# order by increasing last_update (oldest first) | # order by increasing last_update (oldest first) | ||||||||||||||||||||
where_clauses.append("listed_origins.last_update IS NOT NULL") | where_clauses.append("lo.last_update IS NOT NULL") | ||||||||||||||||||||
order_by = "listed_origins.last_update" | order_by = "lo.last_update" | ||||||||||||||||||||
elif policy == "already_visited_order_by_lag": | elif policy == "already_visited_order_by_lag": | ||||||||||||||||||||
# TODO: store "visit lag" in a materialized view? | # TODO: store "visit lag" in a materialized view? | ||||||||||||||||||||
# visited origins have a NOT NULL last_snapshot | # visited origins have a NOT NULL last_snapshot | ||||||||||||||||||||
where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL") | where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL") | ||||||||||||||||||||
# ignore origins we have visited after the known last update | # ignore origins we have visited after the known last update | ||||||||||||||||||||
where_clauses.append("listed_origins.last_update IS NOT NULL") | where_clauses.append("lo.last_update IS NOT NULL") | ||||||||||||||||||||
where_clauses.append( | where_clauses.append( | ||||||||||||||||||||
""" | """ | ||||||||||||||||||||
listed_origins.last_update | lo.last_update | ||||||||||||||||||||
> GREATEST( | > GREATEST( | ||||||||||||||||||||
origin_visit_stats.last_eventful, | origin_visit_stats.last_eventful, | ||||||||||||||||||||
origin_visit_stats.last_uneventful | origin_visit_stats.last_uneventful | ||||||||||||||||||||
) | ) | ||||||||||||||||||||
""" | """ | ||||||||||||||||||||
) | ) | ||||||||||||||||||||
# order by decreasing visit lag | # order by decreasing visit lag | ||||||||||||||||||||
order_by = """\ | order_by = """\ | ||||||||||||||||||||
listed_origins.last_update | lo.last_update | ||||||||||||||||||||
- GREATEST( | - GREATEST( | ||||||||||||||||||||
origin_visit_stats.last_eventful, | origin_visit_stats.last_eventful, | ||||||||||||||||||||
origin_visit_stats.last_uneventful | origin_visit_stats.last_uneventful | ||||||||||||||||||||
) | ) | ||||||||||||||||||||
DESC | DESC | ||||||||||||||||||||
""" | """ | ||||||||||||||||||||
ardumont: We discussed the possibility to make this an enum with olasd (but that's a refactoring for… | |||||||||||||||||||||
elif policy == SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE: | |||||||||||||||||||||
where_clauses.append("lo.last_update IS NULL") | |||||||||||||||||||||
where_clauses.append( | |||||||||||||||||||||
"origin_visit_stats.next_visit_queue_position IS NOT NULL" | |||||||||||||||||||||
) | |||||||||||||||||||||
where_clauses.append( | |||||||||||||||||||||
"vsqp.position <= origin_visit_stats.next_visit_queue_position" | |||||||||||||||||||||
) | |||||||||||||||||||||
# TODO: Determine if it's actually what we want... this chooses an interval | |||||||||||||||||||||
# of one day to fetch visits to schedule... | |||||||||||||||||||||
where_clauses.append( | |||||||||||||||||||||
"origin_visit_stats.next_visit_queue_position < vsqp.position + '1 day'" | |||||||||||||||||||||
) | |||||||||||||||||||||
order_by = "lo.last_update" | |||||||||||||||||||||
ardumontUnsubmitted Done Inline Actions
Use the limit parameter in the call to the method. ardumont: Use the limit parameter in the call to the method. | |||||||||||||||||||||
else: | else: | ||||||||||||||||||||
raise UnknownPolicy(f"Unknown scheduling policy {policy}") | raise UnknownPolicy(f"Unknown scheduling policy {policy}") | ||||||||||||||||||||
select_query = f""" | select_query = f""" | ||||||||||||||||||||
SELECT | SELECT | ||||||||||||||||||||
{origin_select_cols} | {origin_select_cols} | ||||||||||||||||||||
ardumontUnsubmitted Done Inline Actions
ardumont: | |||||||||||||||||||||
FROM | FROM | ||||||||||||||||||||
listed_origins | listed_origins lo | ||||||||||||||||||||
LEFT JOIN | LEFT JOIN | ||||||||||||||||||||
origin_visit_stats USING (url, visit_type) | origin_visit_stats USING (url, visit_type) | ||||||||||||||||||||
LEFT JOIN | |||||||||||||||||||||
visit_scheduler_queue_position vsqp | |||||||||||||||||||||
Done Inline Actionswe probably want those joins to be computed as well now. ardumont: we probably want those joins to be computed as well now. | |||||||||||||||||||||
on vsqp.visit_type = lo.visit_type | |||||||||||||||||||||
ardumontUnsubmitted Done Inline ActionsDrop this join which is no longer needed with the previous comment. ardumont: Drop this join which is no longer needed with the previous comment. | |||||||||||||||||||||
WHERE | WHERE | ||||||||||||||||||||
({") AND (".join(where_clauses)}) | ({") AND (".join(where_clauses)}) | ||||||||||||||||||||
ORDER BY | ORDER BY | ||||||||||||||||||||
{order_by} | {order_by} | ||||||||||||||||||||
LIMIT %s | LIMIT %s | ||||||||||||||||||||
""" | """ | ||||||||||||||||||||
query_args.append(count) | query_args.append(count) | ||||||||||||||||||||
Show All 10 Lines | ) -> List[ListedOrigin]: | ||||||||||||||||||||
url, visit_type, %s | url, visit_type, %s | ||||||||||||||||||||
FROM | FROM | ||||||||||||||||||||
selected_origins | selected_origins | ||||||||||||||||||||
ON CONFLICT (url, visit_type) DO UPDATE | ON CONFLICT (url, visit_type) DO UPDATE | ||||||||||||||||||||
SET last_scheduled = GREATEST( | SET last_scheduled = GREATEST( | ||||||||||||||||||||
origin_visit_stats.last_scheduled, | origin_visit_stats.last_scheduled, | ||||||||||||||||||||
EXCLUDED.last_scheduled | EXCLUDED.last_scheduled | ||||||||||||||||||||
) | ) | ||||||||||||||||||||
) | ) | ||||||||||||||||||||
SELECT | SELECT | ||||||||||||||||||||
ardumontUnsubmitted Done Inline Actions
This need to be a template and conditionned upon the right policy (the new one from this diff) ardumont: This need to be a template and conditionned upon the right policy (the new one from this diff) | |||||||||||||||||||||
* | * | ||||||||||||||||||||
ardumontUnsubmitted Done Inline Actions
ardumont: | |||||||||||||||||||||
FROM | FROM | ||||||||||||||||||||
selected_origins | selected_origins | ||||||||||||||||||||
""" | """ | ||||||||||||||||||||
query_args.append(timestamp) | query_args.append(timestamp) | ||||||||||||||||||||
cur.execute(query, tuple(query_args)) | cur.execute(query, tuple(query_args)) | ||||||||||||||||||||
return [ListedOrigin(**d) for d in cur] | return [ListedOrigin(**d) for d in cur] | ||||||||||||||||||||
▲ Show 20 Lines • Show All 579 Lines • Show Last 20 Lines |
We discussed the possibility to make this an enum with olasd (but that's a refactoring for another time).