Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/utils.py
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone | from datetime import datetime, timezone | ||||
from typing import Any, Dict | from typing import Any, Dict, List | ||||
from .model import ListedOrigin | from .interface import SchedulerInterface | ||||
from .model import ListedOrigin, Lister | |||||
def utcnow() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)
def get_task(task_name): | def get_task(task_name): | ||||
"""Retrieve task object in our application instance by its fully | """Retrieve task object in our application instance by its fully | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | task = { | ||||
"args": args if args else [], | "args": args if args else [], | ||||
"kwargs": kwargs if kwargs else {}, | "kwargs": kwargs if kwargs else {}, | ||||
}, | }, | ||||
} | } | ||||
task.update(task_extra) | task.update(task_extra) | ||||
return task | return task | ||||
def create_origin_task_dict(origin: ListedOrigin, lister: Lister) -> Dict[str, Any]:
    """Build the loading task dict for a listed origin.

    Args:
        origin: the listed origin to create a task for
        lister: the lister the origin belongs to; its ``id`` must be equal
            to ``origin.lister_id``

    Returns:
        A dict with a ``type`` of ``load-<visit_type>`` and ``arguments``
        made of an empty ``args`` list and ``kwargs`` holding the origin
        url, the lister name and the origin's extra loader arguments.

    Raises:
        ValueError: if ``origin.lister_id`` and ``lister.id`` differ

    """
    if origin.lister_id != lister.id:
        raise ValueError(
            "origin.lister_id and lister.id differ", origin.lister_id, lister.id
        )
    return {
        "type": f"load-{origin.visit_type}",
        "arguments": {
            "args": [],
            "kwargs": {
                "url": origin.url,
                "lister_name": lister.name,
                # Unpacked last: an extra loader argument named "url" or
                # "lister_name" overrides the value set above.
                **origin.extra_loader_arguments,
            },
        },
    }
def create_origin_task_dicts(
    origins: List[ListedOrigin], scheduler: SchedulerInterface
) -> List[Dict[str, Any]]:
    """Returns a task dict for each origin, in the same order.

    The listers referenced by the origins are fetched from the scheduler in
    a single ``get_listers_by_id`` call covering every distinct lister id
    (ids are converted to ``str`` for that call).

    Args:
        origins: the listed origins to create tasks for
        scheduler: scheduler used to look up the origins' listers

    Returns:
        One task dict per origin, as built by ``create_origin_task_dict``.

    Raises:
        AssertionError: if the scheduler returned no lister for at least one
            of the origins' lister ids

    """
    lister_ids = {o.lister_id for o in origins}
    listers = {
        lister.id: lister
        for lister in scheduler.get_listers_by_id(list(map(str, lister_ids)))
    }

    missing_lister_ids = lister_ids - set(listers)
    if missing_lister_ids:
        # Raised explicitly instead of through an ``assert`` statement so the
        # check is not stripped when Python runs with -O; the exception type
        # stays AssertionError so callers observe the same error as before.
        raise AssertionError(f"Missing listers: {missing_lister_ids}")

    return [create_origin_task_dict(o, listers[o.lister_id]) for o in origins]
def create_oneshot_task_dict(type, *args, **kwargs):
    """Create a oneshot task scheduled for as soon as possible.

    Args:
        type (str): Type of oneshot task as per swh-scheduler's db
            table task_type's column (Ex: load-git,
            check-deposit)
        *args: positional arguments passed through to ``create_task_dict``
        **kwargs: keyword arguments passed through to ``create_task_dict``

    Returns:
        Expected dictionary for the one-shot task scheduling api
        (swh.scheduler.backend.create_tasks)

    """
    # "oneshot" is handed to create_task_dict as its second positional
    # argument, right after the task type.
    return create_task_dict(type, "oneshot", *args, **kwargs)