diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py --- a/swh/scheduler/celery_backend/recurrent_visits.py +++ b/swh/scheduler/celery_backend/recurrent_visits.py @@ -38,7 +38,6 @@ "origins_without_last_update": 0.02, } -# Default policy ratio, let's start that configuration in the module first POLICY_RATIO: Dict[str, Dict[str, float]] = { "default": { "already_visited_order_by_lag": 0.5, @@ -50,15 +49,16 @@ "cvs": _VCS_POLICY_RATIOS, "bzr": _VCS_POLICY_RATIOS, } - +"""Scheduling policies to use to retrieve visits for the given visit types, with the +respective ratios""" MIN_SLOTS_RATIO = 0.05 """Quantity of slots that need to be available (with respect to max_queue_length) for -`grab_next_visits` to trigger""" +:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger""" QUEUE_FULL_BACKOFF = 60 -"""Backoff time (in seconds) if there's fewer than `MIN_SLOTS_RATIO` slots available in -the queue.""" +"""Backoff time (in seconds) if there are fewer than :py:data:`MIN_SLOTS_RATIO` slots +available in the queue.""" NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 """Backoff time (in seconds) if no origins have been scheduled in the current @@ -75,10 +75,10 @@ def grab_next_visits_policy_ratio( scheduler: SchedulerInterface, visit_type: str, num_visits: int ) -> List[ListedOrigin]: - """Get the next `num_visits` for the given `visit_type` using the corresponding + """Get the next ``num_visits`` for the given ``visit_type`` using the corresponding set of scheduling policies. - The `POLICY_RATIO` dict sets, for each visit type, the scheduling policies + The :py:data:`POLICY_RATIO` dict sets, for each visit type, the scheduling policies used to pull the next tasks, and what proportion of the available num_visits they take. @@ -86,7 +86,7 @@ the requested ratio by more than 5%. 
Returns: - at most `num_visits` `ListedOrigin` objects + at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects """ policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"]) @@ -135,26 +135,28 @@ def send_visits_for_visit_type( scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict, ) -> float: - """Schedule the next batch of visits for the given `visit_type`. + """Schedule the next batch of visits for the given ``visit_type``. - First, we determine the number of available slots by introspecting the - RabbitMQ queue. + First, we determine the number of available slots by introspecting the RabbitMQ + queue. - If there's fewer than `MIN_SLOTS_RATIO` slots available in the queue, we wait - for `QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive - `grab_next_visits` queries when there's not many jobs to queue. + If there are fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue, we + wait for :py:data:`QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive + :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries when + there aren't many jobs to queue. - Once there's more than `MIN_SLOTS_RATIO` slots available, we run - `get_next_visits` to retrieve the next set of origin visits to schedule, and - we send them to celery. + Once there are more than :py:data:`MIN_SLOTS_RATIO` slots available, we run + :py:func:`grab_next_visits_policy_ratio` to retrieve the next set of origin visits + to schedule, and we send them to celery. If the last scheduling attempt didn't return any origins, we sleep for - `NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive - `grab_next_visits` queries too often if there's nothing left to schedule. + :py:data:`NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive + :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too + often if there's nothing left to schedule. 
Returns: - the earliest `time.monotonic` value at which to run the next iteration of - the loop. + the earliest :py:func:`time.monotonic` value at which to run the next iteration + of the loop. """ queue_name = task_type["backend_name"] @@ -208,8 +210,8 @@ command_queue: Queue[object], exc_queue: Queue[Tuple[str, BaseException]], ): - """Target function for the visit sending thread, which initializes local - connections and handles exceptions by sending them back to the main thread.""" + """Target function for the visit sending thread, which initializes local connections + and handles exceptions by sending them back to the main thread.""" from swh.scheduler import get_scheduler from swh.scheduler.celery_backend.config import build_app @@ -259,7 +261,7 @@ config: Dict[str, Any], visit_type: str, ): - """Spawn a new thread to schedule the visits of type `visit_type`.""" + """Spawn a new thread to schedule the visits of type ``visit_type``.""" command_queue: Queue[object] = Queue() thread = Thread( target=visit_scheduler_thread,