Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
Show First 20 Lines • Show All 283 Lines • ▼ Show 20 Lines | ) -> PaginatedListedOriginList: | ||||
page_token = (origins[-1].lister_id, origins[-1].url) | page_token = (origins[-1].lister_id, origins[-1].url) | ||||
else: | else: | ||||
page_token = None | page_token = None | ||||
return PaginatedListedOriginList(origins, page_token) | return PaginatedListedOriginList(origins, page_token) | ||||
@db_transaction() | @db_transaction() | ||||
def grab_next_visits( | def grab_next_visits( | ||||
self, count: int, policy: str, db=None, cur=None, | self, visit_type: str, count: int, policy: str, db=None, cur=None, | ||||
) -> List[ListedOrigin]: | ) -> List[ListedOrigin]: | ||||
"""Get at most the `count` next origins that need to be visited | """Get at most the `count` next origins that need to be visited with | ||||
according to the given scheduling `policy`. | the `visit_type` loader according to the given scheduling `policy`. | ||||
This will mark the origins as "being visited" in the listed_origins | This will mark the origins as "being visited" in the listed_origins | ||||
table, to avoid scheduling multiple visits to the same origin. | table, to avoid scheduling multiple visits to the same origin. | ||||
""" | """ | ||||
origin_select_cols = ", ".join(ListedOrigin.select_columns()) | origin_select_cols = ", ".join(ListedOrigin.select_columns()) | ||||
if policy == "oldest_scheduled_first": | if policy == "oldest_scheduled_first": | ||||
query = f""" | query = f""" | ||||
with filtered_origins as ( | with filtered_origins as ( | ||||
select lister_id, url, visit_type | select lister_id, url, visit_type | ||||
from listed_origins | from listed_origins | ||||
where visit_type = %s | |||||
order by last_scheduled nulls first | order by last_scheduled nulls first | ||||
limit %s | limit %s | ||||
for update skip locked | for update skip locked | ||||
) | ) | ||||
update listed_origins | update listed_origins | ||||
set last_scheduled = now() | set last_scheduled = now() | ||||
where (lister_id, url, visit_type) in (select * from filtered_origins) | where (lister_id, url, visit_type) in (select * from filtered_origins) | ||||
returning {origin_select_cols} | returning {origin_select_cols} | ||||
""" | """ | ||||
cur.execute(query, (count,)) | cur.execute(query, (visit_type, count)) | ||||
return [ListedOrigin(**d) for d in cur] | return [ListedOrigin(**d) for d in cur] | ||||
else: | else: | ||||
raise UnknownPolicy(f"Unknown scheduling policy {policy}") | raise UnknownPolicy(f"Unknown scheduling policy {policy}") | ||||
task_create_keys = [ | task_create_keys = [ | ||||
"type", | "type", | ||||
"arguments", | "arguments", | ||||
▲ Show 20 Lines • Show All 491 Lines • Show Last 20 Lines |