Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/recurrent_visits.py
Show First 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | "default": { | ||||
"never_visited_oldest_update_first": 50, | "never_visited_oldest_update_first": 50, | ||||
}, | }, | ||||
"git": _VCS_POLICY_WEIGHTS, | "git": _VCS_POLICY_WEIGHTS, | ||||
"hg": _VCS_POLICY_WEIGHTS, | "hg": _VCS_POLICY_WEIGHTS, | ||||
"svn": _VCS_POLICY_WEIGHTS, | "svn": _VCS_POLICY_WEIGHTS, | ||||
"cvs": _VCS_POLICY_WEIGHTS, | "cvs": _VCS_POLICY_WEIGHTS, | ||||
"bzr": _VCS_POLICY_WEIGHTS, | "bzr": _VCS_POLICY_WEIGHTS, | ||||
} | } | ||||
POLICY_ADDITIONAL_PARAMETERS: Dict[str, Dict[str, Any]] = { | |||||
"git": { | |||||
"already_visited_order_by_lag": {"tablesample": 0.1}, | |||||
"never_visited_oldest_update_first": {"tablesample": 0.1}, | |||||
"origins_without_last_update": {"tablesample": 0.1}, | |||||
} | |||||
} | |||||
"""Scheduling policies to use to retrieve visits for the given visit types, with their | """Scheduling policies to use to retrieve visits for the given visit types, with their | ||||
relative weights""" | relative weights""" | ||||
MIN_SLOTS_RATIO = 0.05 | MIN_SLOTS_RATIO = 0.05 | ||||
"""Quantity of slots that need to be available (with respect to max_queue_length) for | """Quantity of slots that need to be available (with respect to max_queue_length) for | ||||
:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger""" | :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger""" | ||||
QUEUE_FULL_BACKOFF = 60 | QUEUE_FULL_BACKOFF = 60 | ||||
Show All 35 Lines | if not total_weight: | ||||
raise ValueError(f"No policy weights set for visit type {visit_type}") | raise ValueError(f"No policy weights set for visit type {visit_type}") | ||||
policy_ratio = { | policy_ratio = { | ||||
policy: weight / total_weight for policy, weight in policy_weights.items() | policy: weight / total_weight for policy, weight in policy_weights.items() | ||||
} | } | ||||
fetched_origins: Dict[str, List[ListedOrigin]] = {} | fetched_origins: Dict[str, List[ListedOrigin]] = {} | ||||
for policy, ratio in policy_ratio.items(): | for policy, ratio in policy_ratio.items(): | ||||
ardumont: This one exists, so might as well access it directly.
(Might even make the following assertion useless?) | |||||
num_tasks_to_send = int(num_visits * ratio) | num_tasks_to_send = int(num_visits * ratio) | ||||
fetched_origins[policy] = scheduler.grab_next_visits( | fetched_origins[policy] = scheduler.grab_next_visits( | ||||
visit_type, num_tasks_to_send, policy=policy | visit_type, | ||||
num_tasks_to_send, | |||||
policy=policy, | |||||
**POLICY_ADDITIONAL_PARAMETERS.get(visit_type, {}).get(policy, {}), | |||||
) | ) | ||||
all_origins: List[ListedOrigin] = list( | all_origins: List[ListedOrigin] = list( | ||||
chain.from_iterable(fetched_origins.values()) | chain.from_iterable(fetched_origins.values()) | ||||
) | ) | ||||
if not all_origins: | if not all_origins: | ||||
return [] | return [] | ||||
▲ Show 20 Lines • Show All 197 Lines • Show Last 20 Lines |
This one exists, so might as well access it directly.
(Might even make the following assertion useless?)