diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py --- a/swh/scheduler/celery_backend/recurrent_visits.py +++ b/swh/scheduler/celery_backend/recurrent_visits.py @@ -32,30 +32,28 @@ logger = logging.getLogger(__name__) -_VCS_POLICY_WEIGHTS: Dict[str, float] = { - "already_visited_order_by_lag": 49, - "never_visited_oldest_update_first": 49, - "origins_without_last_update": 2, -} - -POLICY_WEIGHTS: Dict[str, Dict[str, float]] = { - "default": { - "already_visited_order_by_lag": 50, - "never_visited_oldest_update_first": 50, - }, - "git": _VCS_POLICY_WEIGHTS, - "hg": _VCS_POLICY_WEIGHTS, - "svn": _VCS_POLICY_WEIGHTS, - "cvs": _VCS_POLICY_WEIGHTS, - "bzr": _VCS_POLICY_WEIGHTS, -} - -POLICY_ADDITIONAL_PARAMETERS: Dict[str, Dict[str, Any]] = { - "git": { - "already_visited_order_by_lag": {"tablesample": 0.1}, - "never_visited_oldest_update_first": {"tablesample": 0.1}, - "origins_without_last_update": {"tablesample": 0.1}, - } +DEFAULT_POLICY = [ + {"policy": "already_visited_order_by_lag", "weight": 50}, + {"policy": "never_visited_oldest_update_first", "weight": 50}, +] +DEFAULT_DVCS_POLICY = [ + {"policy": "already_visited_order_by_lag", "weight": 49}, + {"policy": "never_visited_oldest_update_first", "weight": 49}, + {"policy": "origins_without_last_update", "weight": 2}, +] +DEFAULT_GIT_POLICY = [ + {"policy": "already_visited_order_by_lag", "weight": 49, "tablesample": 0.1}, + {"policy": "never_visited_oldest_update_first", "weight": 49, "tablesample": 0.1}, + {"policy": "origins_without_last_update", "weight": 2, "tablesample": 0.1}, +] + +DEFAULT_POLICY_CONFIG: Dict[str, List[Dict[str, Any]]] = { + "default": DEFAULT_POLICY, + "hg": DEFAULT_DVCS_POLICY, + "svn": DEFAULT_DVCS_POLICY, + "cvs": DEFAULT_DVCS_POLICY, + "bzr": DEFAULT_DVCS_POLICY, + "git": DEFAULT_GIT_POLICY, } """Scheduling policies to use to retrieve visits for the given visit types, with their @@ -82,40 +80,58 @@ def
grab_next_visits_policy_weights( - scheduler: SchedulerInterface, visit_type: str, num_visits: int + scheduler: SchedulerInterface, + visit_type: str, + num_visits: int, + policy_cfg: List[Dict[str, Any]], ) -> List[ListedOrigin]: """Get the next ``num_visits`` for the given ``visit_type`` using the corresponding set of scheduling policies. - The :py:data:`POLICY_WEIGHTS` dict sets, for each visit type, the scheduling - policies used to pull the next tasks, and what proportion of the available - num_visits they take. + The ``policy_cfg`` list sets, for the current visit type, the + scheduling policies used to pull the next tasks. Each policy config entry + in the list should at least have a 'policy' (policy name) and a 'weight' + entry. For each policy in this policy_cfg list, the number of returned + origins to visit will be weighted using this weight config option so that + the total number of returned origins is around num_visits. Any other + key/value entry in the policy configuration will be passed to the + scheduler.grab_next_visits() method. This function emits a warning if the ratio of retrieved origins is off of the requested ratio by more than 5%.
Returns: at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects + """ - policy_weights = POLICY_WEIGHTS.get(visit_type, POLICY_WEIGHTS["default"]) - total_weight = sum(policy_weights.values()) + policies = [cfg["policy"] for cfg in policy_cfg] + if len(set(policies)) != len(policies): + raise ValueError( + "Each policy can only appear once; check your policy " + f"configuration for visit type {visit_type}" + ) + + weights = [cfg["weight"] for cfg in policy_cfg] + total_weight = sum(weights) if not total_weight: raise ValueError(f"No policy weights set for visit type {visit_type}") - policy_ratio = { - policy: weight / total_weight for policy, weight in policy_weights.items() - } + ratios = [weight / total_weight for weight in weights] - fetched_origins: Dict[str, List[ListedOrigin]] = {} + extra_kws = [ + {k: v for k, v in cfg.items() if k not in ("weight", "policy")} + for cfg in policy_cfg + ] - for policy, ratio in policy_ratio.items(): + fetched_origins: Dict[str, List[ListedOrigin]] = {} + for policy, ratio, extra_kw in zip(policies, ratios, extra_kws): num_tasks_to_send = int(num_visits * ratio) fetched_origins[policy] = scheduler.grab_next_visits( visit_type, num_tasks_to_send, policy=policy, - **POLICY_ADDITIONAL_PARAMETERS.get(visit_type, {}).get(policy, {}), + **extra_kw, ) all_origins: List[ListedOrigin] = list( @@ -131,7 +147,7 @@ for policy, origins in fetched_origins.items() } - for policy, expected_ratio in policy_ratio.items(): + for policy, expected_ratio in zip(policies, ratios): # 5% of skew with respect to request if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05: logger.info( @@ -157,6 +173,7 @@ app, visit_type: str, task_type: Dict, + policy_cfg: List[Dict[str, Any]], ) -> float: """Schedule the next batch of visits for the given ``visit_type``.
@@ -177,6 +194,10 @@ :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too often if there's nothing left to schedule. + The ``policy_cfg`` argument is the policy configuration used to + choose the next origins to visit. It is passed directly to the + :py:func:`grab_next_visits_policy_weights()` function. + Returns: the earliest :py:func:`time.monotonic` value at which to run the next iteration of the loop. @@ -199,7 +220,9 @@ if available_slots < min_available_slots: return current_iteration_start + QUEUE_FULL_BACKOFF - origins = grab_next_visits_policy_weights(scheduler, visit_type, available_slots) + origins = grab_next_visits_policy_weights( + scheduler, visit_type, available_slots, policy_cfg + ) if not origins: logger.debug("No origins to visit for type %s", visit_type) @@ -248,10 +271,19 @@ app = build_app(config.get("celery")) scheduler = get_scheduler(**config["scheduler"]) task_type = scheduler.get_task_type(f"load-{visit_type}") - if task_type is None: raise ValueError(f"Unknown task type: load-{visit_type}") + policy_cfg = config.get("scheduling_policy", DEFAULT_POLICY_CONFIG) + for policies in policy_cfg.values(): + for policy in policies: + if "policy" not in policy or policy.get("weight", 0) <= 0: + raise ValueError( + "Each policy configuration needs at least a 'policy' " + "and a positive 'weight' entry" + ) + policy_cfg = {**DEFAULT_POLICY_CONFIG, **policy_cfg} + next_iteration = time.monotonic() while True: @@ -270,7 +302,11 @@ logger.warn("Received unexpected message %s in command queue", msg) next_iteration = send_visits_for_visit_type( - scheduler, app, visit_type, task_type + scheduler, + app, + visit_type, + task_type, + policy_cfg.get(visit_type, policy_cfg["default"]), ) except BaseException as e: diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py --- a/swh/scheduler/tests/test_recurrent_visits.py +++ b/swh/scheduler/tests/test_recurrent_visits.py @@ -6,12 +6,12 @@
from datetime import timedelta import logging from queue import Queue -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import pytest from swh.scheduler.celery_backend.recurrent_visits import ( - POLICY_ADDITIONAL_PARAMETERS, + DEFAULT_DVCS_POLICY, VisitSchedulerThreads, grab_next_visits_policy_weights, send_visits_for_visit_type, @@ -120,7 +120,11 @@ for visit_type in ["test-git", "test-svn"]: task_type = f"load-{visit_type}" send_visits_for_visit_type( - swh_scheduler, mock_celery_app, visit_type, all_task_types[task_type] + swh_scheduler, + mock_celery_app, + visit_type, + all_task_types[task_type], + DEFAULT_DVCS_POLICY, ) assert mock_available_slots.called, "The available slots functions should be called" @@ -143,27 +147,24 @@ assert expected_record in set(records) -@patch.dict( - POLICY_ADDITIONAL_PARAMETERS, {"test-git": POLICY_ADDITIONAL_PARAMETERS["git"]} -) @pytest.mark.parametrize( - "visit_type, tablesamples", - [("test-hg", {}), ("test-git", POLICY_ADDITIONAL_PARAMETERS["git"])], + "visit_type, extras", + [("test-hg", {}), ("test-git", {"tablesample": 0.1})], ) def test_recurrent_visit_additional_parameters( - swh_scheduler, mocker, visit_type, tablesamples + swh_scheduler, mocker, visit_type, extras ): """Testing additional policy parameters""" mock_grab_next_visits = mocker.patch.object(swh_scheduler, "grab_next_visits") mock_grab_next_visits.return_value = [] - - grab_next_visits_policy_weights(swh_scheduler, visit_type, 10) + # Build fresh dicts: updating DEFAULT_DVCS_POLICY's entries in place would + # leak ``extras`` into every later test that uses the module default. + policy_cfg = [{**policy, **extras} for policy in DEFAULT_DVCS_POLICY] + grab_next_visits_policy_weights(swh_scheduler, visit_type, 10, policy_cfg) for call in mock_grab_next_visits.call_args_list: - assert call[1].get("tablesample") == tablesamples.get( - call[1]["policy"], {} - ).get("tablesample") + assert call[1].get("tablesample") == extras.get("tablesample") @pytest.fixture