diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py --- a/swh/scheduler/celery_backend/recurrent_visits.py +++ b/swh/scheduler/celery_backend/recurrent_visits.py @@ -32,25 +32,25 @@ logger = logging.getLogger(__name__) -_VCS_POLICY_RATIOS = { - "already_visited_order_by_lag": 0.49, - "never_visited_oldest_update_first": 0.49, - "origins_without_last_update": 0.02, +_VCS_POLICY_WEIGHTS: Dict[str, float] = { + "already_visited_order_by_lag": 49, + "never_visited_oldest_update_first": 49, + "origins_without_last_update": 2, } -POLICY_RATIO: Dict[str, Dict[str, float]] = { +POLICY_WEIGHTS: Dict[str, Dict[str, float]] = { "default": { - "already_visited_order_by_lag": 0.5, - "never_visited_oldest_update_first": 0.5, + "already_visited_order_by_lag": 50, + "never_visited_oldest_update_first": 50, }, - "git": _VCS_POLICY_RATIOS, - "hg": _VCS_POLICY_RATIOS, - "svn": _VCS_POLICY_RATIOS, - "cvs": _VCS_POLICY_RATIOS, - "bzr": _VCS_POLICY_RATIOS, + "git": _VCS_POLICY_WEIGHTS, + "hg": _VCS_POLICY_WEIGHTS, + "svn": _VCS_POLICY_WEIGHTS, + "cvs": _VCS_POLICY_WEIGHTS, + "bzr": _VCS_POLICY_WEIGHTS, } -"""Scheduling policies to use to retrieve visits for the given visit types, with the -respective ratios""" +"""Scheduling policies to use to retrieve visits for the given visit types, with their +relative weights""" MIN_SLOTS_RATIO = 0.05 """Quantity of slots that need to be available (with respect to max_queue_length) for @@ -72,15 +72,15 @@ comparison)""" -def grab_next_visits_policy_ratio( +def grab_next_visits_policy_weights( scheduler: SchedulerInterface, visit_type: str, num_visits: int ) -> List[ListedOrigin]: """Get the next ``num_visits`` for the given ``visit_type`` using the corresponding set of scheduling policies. - The :py:data:`POLICY_RATIO` dict sets, for each visit type, the scheduling policies - used to pull the next tasks, and what proportion of the available num_visits - they take. + The :py:data:`POLICY_WEIGHTS` dict sets, for each visit type, the scheduling + policies used to pull the next tasks, and what proportion of the available + num_visits they take. This function emits a warning if the ratio of retrieved origins is off of the requested ratio by more than 5%. @@ -88,7 +88,15 @@ Returns: at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects """ - policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"]) + policy_weights = POLICY_WEIGHTS.get(visit_type, POLICY_WEIGHTS["default"]) + total_weight = sum(policy_weights.values()) + + if not total_weight: + raise ValueError(f"No policy weights set for visit type {visit_type}") + + policy_ratio = { + policy: weight / total_weight for policy, weight in policy_weights.items() + } fetched_origins: Dict[str, List[ListedOrigin]] = {} @@ -146,7 +154,7 @@ there's not many jobs to queue. Once there's more than :py:data:`MIN_SLOTS_RATIO` slots available, we run - :py:func:`grab_next_visits_policy_ratio` to retrieve the next set of origin visits + :py:func:`grab_next_visits_policy_weights` to retrieve the next set of origin visits to schedule, and we send them to celery. If the last scheduling attempt didn't return any origins, we sleep for @@ -176,7 +184,7 @@ if available_slots < min_available_slots: return current_iteration_start + QUEUE_FULL_BACKOFF - origins = grab_next_visits_policy_ratio(scheduler, visit_type, available_slots) + origins = grab_next_visits_policy_weights(scheduler, visit_type, available_slots) if not origins: logger.debug("No origins to visit for type %s", visit_type)