Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/recurrent_visits.py
Show First 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | |||||
MIN_SLOTS_RATIO = 0.05 | MIN_SLOTS_RATIO = 0.05 | ||||
"""Quantity of slots that need to be available (with respect to max_queue_length) for | """Quantity of slots that need to be available (with respect to max_queue_length) for | ||||
:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger""" | :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger""" | ||||
QUEUE_FULL_BACKOFF = 60 | QUEUE_FULL_BACKOFF = 60 | ||||
"""Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots | """Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots | ||||
available in the queue.""" | available in the queue.""" | ||||
NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 | DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 | ||||
vlorentz: should be renamed to `DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF` | |||||
"""Backoff time (in seconds) if no origins have been scheduled in the current | """Backoff time (in seconds) if no origins have been scheduled in the current | ||||
iteration""" | iteration""" | ||||
BACKOFF_SPLAY = 5.0 | BACKOFF_SPLAY = 5.0 | ||||
"""Amplitude of the fuzziness between backoffs""" | """Amplitude of the fuzziness between backoffs""" | ||||
TERMINATE = object() | TERMINATE = object() | ||||
"""Termination request received from command queue (singleton used for identity | """Termination request received from command queue (singleton used for identity | ||||
▲ Show 20 Lines • Show All 90 Lines • ▼ Show 20 Lines | |||||
def send_visits_for_visit_type( | def send_visits_for_visit_type( | ||||
scheduler: SchedulerInterface, | scheduler: SchedulerInterface, | ||||
app, | app, | ||||
visit_type: str, | visit_type: str, | ||||
task_type: Dict, | task_type: Dict, | ||||
policy_cfg: List[Dict[str, Any]], | policy_cfg: List[Dict[str, Any]], | ||||
no_origins_scheduled_backoff: int = DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF, | |||||
) -> float: | ) -> float: | ||||
"""Schedule the next batch of visits for the given ``visit_type``. | """Schedule the next batch of visits for the given ``visit_type``. | ||||
First, we determine the number of available slots by introspecting the RabbitMQ | First, we determine the number of available slots by introspecting the RabbitMQ | ||||
queue. | queue. | ||||
If there's fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue, we | If there's fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue, we | ||||
wait for :py:data:`QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive | wait for :py:data:`QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive | ||||
:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries when | :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries when | ||||
there's not many jobs to queue. | there's not many jobs to queue. | ||||
Once there's more than :py:data:`MIN_SLOTS_RATIO` slots available, we run | Once there's more than :py:data:`MIN_SLOTS_RATIO` slots available, we run | ||||
:py:func:`grab_next_visits_policy_weights` to retrieve the next set of origin visits | :py:func:`grab_next_visits_policy_weights` to retrieve the next set of origin visits | ||||
to schedule, and we send them to celery. | to schedule, and we send them to celery. | ||||
If the last scheduling attempt didn't return any origins, we sleep for | If the last scheduling attempt didn't return any origins, we sleep by default for | ||||
:py:data:`NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive | :py:data:`DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive | ||||
:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too | :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too | ||||
often if there's nothing left to schedule. | often if there's nothing left to schedule. | ||||
The :py:data:`POLICY_CFG` argument is the policy configuration used to | The :py:data:`POLICY_CFG` argument is the policy configuration used to | ||||
choose the next origins to visit. It is passed directly to the | choose the next origins to visit. It is passed directly to the | ||||
:py:func:`grab_next_visits_policy_weights()` function. | :py:func:`grab_next_visits_policy_weights()` function. | ||||
Returns: | Returns: | ||||
Show All 19 Lines | if available_slots < min_available_slots: | ||||
return current_iteration_start + QUEUE_FULL_BACKOFF | return current_iteration_start + QUEUE_FULL_BACKOFF | ||||
origins = grab_next_visits_policy_weights( | origins = grab_next_visits_policy_weights( | ||||
scheduler, visit_type, available_slots, policy_cfg | scheduler, visit_type, available_slots, policy_cfg | ||||
) | ) | ||||
if not origins: | if not origins: | ||||
logger.debug("No origins to visit for type %s", visit_type) | logger.debug("No origins to visit for type %s", visit_type) | ||||
return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF | return current_iteration_start + no_origins_scheduled_backoff | ||||
# Try to smooth the ingestion load, origins pulled by different | # Try to smooth the ingestion load, origins pulled by different | ||||
# scheduling policies have different resource usage patterns | # scheduling policies have different resource usage patterns | ||||
random.shuffle(origins) | random.shuffle(origins) | ||||
for task_dict in create_origin_task_dicts(origins, scheduler): | for task_dict in create_origin_task_dicts(origins, scheduler): | ||||
app.send_task( | app.send_task( | ||||
queue_name, | queue_name, | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | try: | ||||
logger.warn("Received unexpected message %s in command queue", msg) | logger.warn("Received unexpected message %s in command queue", msg) | ||||
next_iteration = send_visits_for_visit_type( | next_iteration = send_visits_for_visit_type( | ||||
scheduler, | scheduler, | ||||
app, | app, | ||||
visit_type, | visit_type, | ||||
task_type, | task_type, | ||||
policy_cfg.get(visit_type, policy_cfg["default"]), | policy_cfg.get(visit_type, policy_cfg["default"]), | ||||
config.get( | |||||
"no_origins_scheduled_backoff", DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF | |||||
), | |||||
) | ) | ||||
except BaseException as e: | except BaseException as e: | ||||
exc_queue.put((visit_type, e)) | exc_queue.put((visit_type, e)) | ||||
VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]] | VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]] | ||||
"""Dict storing the visit scheduler threads and their command queues""" | """Dict storing the visit scheduler threads and their command queues""" | ||||
▲ Show 20 Lines • Show All 48 Lines • Show Last 20 Lines |
should be renamed to DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF