Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
Show First 20 Lines • Show All 296 Lines • ▼ Show 20 Lines | ) -> List[ListedOrigin]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
This will mark the origins as "being visited" in the listed_origins | This will mark the origins as "being visited" in the listed_origins | ||||||||||||||||||||||||||||||||||||||||||||||||||||
table, to avoid scheduling multiple visits to the same origin. | table, to avoid scheduling multiple visits to the same origin. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
origin_select_cols = ", ".join(ListedOrigin.select_columns()) | origin_select_cols = ", ".join(ListedOrigin.select_columns()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
if policy == "oldest_scheduled_first": | if policy == "oldest_scheduled_first": | ||||||||||||||||||||||||||||||||||||||||||||||||||||
query = f""" | query = f""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||
with filtered_origins as ( | WITH selected_origins AS ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||
select lister_id, url, visit_type | SELECT | ||||||||||||||||||||||||||||||||||||||||||||||||||||
from listed_origins | {origin_select_cols} | ||||||||||||||||||||||||||||||||||||||||||||||||||||
where visit_type = %s | FROM | ||||||||||||||||||||||||||||||||||||||||||||||||||||
order by last_scheduled nulls first | listed_origins | ||||||||||||||||||||||||||||||||||||||||||||||||||||
limit %s | LEFT JOIN | ||||||||||||||||||||||||||||||||||||||||||||||||||||
for update skip locked | origin_visit_stats USING (url, visit_type) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
vlorentz: why did you remove this line? | |||||||||||||||||||||||||||||||||||||||||||||||||||||
Done Inline Actionsbecause postgresql would not let me have it... but as I'm a noob in sql, I am very welcome to any help :-) douardda: because postgresql would not let me have it... but as I'm a noob in sql, I am very welcome to… | |||||||||||||||||||||||||||||||||||||||||||||||||||||
) | WHERE | ||||||||||||||||||||||||||||||||||||||||||||||||||||
update listed_origins | visit_type = %s | ||||||||||||||||||||||||||||||||||||||||||||||||||||
set last_scheduled = now() | ORDER BY | ||||||||||||||||||||||||||||||||||||||||||||||||||||
where (lister_id, url, visit_type) in (select * from filtered_origins) | origin_visit_stats.last_scheduled NULLS FIRST | ||||||||||||||||||||||||||||||||||||||||||||||||||||
returning {origin_select_cols} | LIMIT %s | ||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||
update_stats AS ( | |||||||||||||||||||||||||||||||||||||||||||||||||||||
INSERT INTO | |||||||||||||||||||||||||||||||||||||||||||||||||||||
origin_visit_stats ( | |||||||||||||||||||||||||||||||||||||||||||||||||||||
url, visit_type, last_scheduled | |||||||||||||||||||||||||||||||||||||||||||||||||||||
) | |||||||||||||||||||||||||||||||||||||||||||||||||||||
SELECT | |||||||||||||||||||||||||||||||||||||||||||||||||||||
Not Done Inline ActionsWe'll probably want to be able to set this now() timestamp from the outside, for simulation purposes. olasd: We'll probably want to be able to set this `now()` timestamp from the outside, for simulation… | |||||||||||||||||||||||||||||||||||||||||||||||||||||
url, visit_type, now() | |||||||||||||||||||||||||||||||||||||||||||||||||||||
FROM | |||||||||||||||||||||||||||||||||||||||||||||||||||||
selected_origins | |||||||||||||||||||||||||||||||||||||||||||||||||||||
ON CONFLICT (url, visit_type) DO UPDATE | |||||||||||||||||||||||||||||||||||||||||||||||||||||
SET last_scheduled = GREATEST(origin_visit_stats.last_scheduled, EXCLUDED.last_scheduled) | |||||||||||||||||||||||||||||||||||||||||||||||||||||
) | |||||||||||||||||||||||||||||||||||||||||||||||||||||
SELECT | |||||||||||||||||||||||||||||||||||||||||||||||||||||
* | |||||||||||||||||||||||||||||||||||||||||||||||||||||
FROM | |||||||||||||||||||||||||||||||||||||||||||||||||||||
selected_origins | |||||||||||||||||||||||||||||||||||||||||||||||||||||
""" # noqa | |||||||||||||||||||||||||||||||||||||||||||||||||||||
cur.execute(query, (visit_type, count)) | cur.execute(query, (visit_type, count)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
Not Done Inline Actions
I've reorganized your query in three parts:
We don't need to chain all the data through using a returning in the update_stats CTE, we can just select all the interesting data in the first select, do the bulk update, and return the data from the first CTE directly. olasd: I've reorganized your query in three parts:
- selecting all the data for the next origins to… | |||||||||||||||||||||||||||||||||||||||||||||||||||||
Done Inline Actionsawesome, thx a lot douardda: awesome, thx a lot | |||||||||||||||||||||||||||||||||||||||||||||||||||||
return [ListedOrigin(**d) for d in cur] | return [ListedOrigin(**d) for d in cur] | ||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
raise UnknownPolicy(f"Unknown scheduling policy {policy}") | raise UnknownPolicy(f"Unknown scheduling policy {policy}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||
task_create_keys = [ | task_create_keys = [ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
"type", | "type", | ||||||||||||||||||||||||||||||||||||||||||||||||||||
"arguments", | "arguments", | ||||||||||||||||||||||||||||||||||||||||||||||||||||
▲ Show 20 Lines • Show All 467 Lines • ▼ Show 20 Lines | ) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
select max(notfound.date) from (values | select max(notfound.date) from (values | ||||||||||||||||||||||||||||||||||||||||||||||||||||
(excluded.last_notfound), | (excluded.last_notfound), | ||||||||||||||||||||||||||||||||||||||||||||||||||||
(ovi.last_notfound) | (ovi.last_notfound) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
) as notfound(date) | ) as notfound(date) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
), | ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||
last_snapshot = (select | last_snapshot = (select | ||||||||||||||||||||||||||||||||||||||||||||||||||||
case | case | ||||||||||||||||||||||||||||||||||||||||||||||||||||
when ovi.last_eventful < excluded.last_eventful then excluded.last_snapshot | when ovi.last_eventful < excluded.last_eventful then excluded.last_snapshot | ||||||||||||||||||||||||||||||||||||||||||||||||||||
else ovi.last_snapshot | else coalesce(ovi.last_snapshot, excluded.last_snapshot) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
Not Done Inline ActionsMaybe this would deserve a separate commit (no need for a separate diff)? olasd: Maybe this would deserve a separate commit (no need for a separate diff)? | |||||||||||||||||||||||||||||||||||||||||||||||||||||
Done Inline ActionsI can but this is really needed here as a side effect of the new implementation of grab_next_visits() (which inserts rows in origin_visit_stats) douardda: I can but this is really needed here as a side effect of the new implementation of… | |||||||||||||||||||||||||||||||||||||||||||||||||||||
end | end | ||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
""" # noqa | """ # noqa | ||||||||||||||||||||||||||||||||||||||||||||||||||||
try: | try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
psycopg2.extras.execute_values( | psycopg2.extras.execute_values( | ||||||||||||||||||||||||||||||||||||||||||||||||||||
cur=cur, | cur=cur, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
sql=query, | sql=query, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
Show All 25 Lines |
why did you remove this line?