diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py index 04d2045..823c586 100644 --- a/swh/scheduler/celery_backend/recurrent_visits.py +++ b/swh/scheduler/celery_backend/recurrent_visits.py @@ -1,301 +1,303 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This schedules the recurrent visits, for listed origins, in Celery. For "oneshot" (save code now, lister) tasks, check the :mod:`swh.scheduler.celery_backend.runner` and :mod:`swh.scheduler.celery_backend.pika_listener` modules. """ from __future__ import annotations from itertools import chain import logging from queue import Empty, Queue import random from threading import Thread import time from typing import TYPE_CHECKING, Any, Dict, List, Tuple from kombu.utils.uuid import uuid from swh.scheduler.celery_backend.config import get_available_slots if TYPE_CHECKING: from ..interface import SchedulerInterface from ..model import ListedOrigin logger = logging.getLogger(__name__) _VCS_POLICY_RATIOS = { "already_visited_order_by_lag": 0.49, "never_visited_oldest_update_first": 0.49, "origins_without_last_update": 0.02, } -# Default policy ratio, let's start that configuration in the module first POLICY_RATIO: Dict[str, Dict[str, float]] = { "default": { "already_visited_order_by_lag": 0.5, "never_visited_oldest_update_first": 0.5, }, "git": _VCS_POLICY_RATIOS, "hg": _VCS_POLICY_RATIOS, "svn": _VCS_POLICY_RATIOS, "cvs": _VCS_POLICY_RATIOS, "bzr": _VCS_POLICY_RATIOS, } - +"""Scheduling policies to use to retrieve visits for the given visit types, with the +respective ratios""" MIN_SLOTS_RATIO = 0.05 """Quantity of slots that need to be available (with respect to max_queue_length) for -`grab_next_visits` to trigger""" +:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger""" QUEUE_FULL_BACKOFF = 60 -"""Backoff time (in seconds) if there's fewer than `MIN_SLOTS_RATIO` slots available in -the queue.""" +"""Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots +available in the queue.""" NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 """Backoff time (in seconds) if no origins have been scheduled in the current iteration""" BACKOFF_SPLAY = 5.0 """Amplitude of the fuzziness between backoffs""" TERMINATE = object() """Termination request received from command queue (singleton used for identity comparison)""" def grab_next_visits_policy_ratio( scheduler: SchedulerInterface, visit_type: str, num_visits: int ) -> List[ListedOrigin]: - """Get the next `num_visits` for the given `visit_type` using the corresponding + """Get the next ``num_visits`` for the given ``visit_type`` using the corresponding set of scheduling policies. - The `POLICY_RATIO` dict sets, for each visit type, the scheduling policies + The :py:data:`POLICY_RATIO` dict sets, for each visit type, the scheduling policies used to pull the next tasks, and what proportion of the available num_visits they take. This function emits a warning if the ratio of retrieved origins is off of the requested ratio by more than 5%. Returns: - at most `num_visits` `ListedOrigin` objects + at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects """ policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"]) fetched_origins: Dict[str, List[ListedOrigin]] = {} for policy, ratio in policy_ratio.items(): num_tasks_to_send = int(num_visits * ratio) fetched_origins[policy] = scheduler.grab_next_visits( visit_type, num_tasks_to_send, policy=policy ) all_origins: List[ListedOrigin] = list( chain.from_iterable(fetched_origins.values()) ) if not all_origins: return [] # Check whether the ratios of origins fetched are skewed with respect to the # ones we requested fetched_origin_ratios = { policy: len(origins) / len(all_origins) for policy, origins in fetched_origins.items() } for policy, expected_ratio in policy_ratio.items(): # 5% of skew with respect to request if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05: logger.info( "Skewed fetch for visit type %s with policy %s: fetched %s, " "requested %s", visit_type, policy, fetched_origin_ratios[policy], expected_ratio, ) return all_origins def splay(): """Return a random short interval by which to vary the backoffs for the visit scheduling threads""" return random.uniform(0, BACKOFF_SPLAY) def send_visits_for_visit_type( scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict, ) -> float: - """Schedule the next batch of visits for the given `visit_type`. + """Schedule the next batch of visits for the given ``visit_type``. - First, we determine the number of available slots by introspecting the - RabbitMQ queue. + First, we determine the number of available slots by introspecting the RabbitMQ + queue. - If there's fewer than `MIN_SLOTS_RATIO` slots available in the queue, we wait - for `QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive - `grab_next_visits` queries when there's not many jobs to queue. + If there's fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue, we + wait for :py:data:`QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive + :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries when + there's not many jobs to queue. - Once there's more than `MIN_SLOTS_RATIO` slots available, we run - `get_next_visits` to retrieve the next set of origin visits to schedule, and - we send them to celery. + Once there's more than :py:data:`MIN_SLOTS_RATIO` slots available, we run + :py:func:`grab_next_visits_policy_ratio` to retrieve the next set of origin visits + to schedule, and we send them to celery. If the last scheduling attempt didn't return any origins, we sleep for - `NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive - `grab_next_visits` queries too often if there's nothing left to schedule. + :py:data:`NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive + :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too + often if there's nothing left to schedule. Returns: - the earliest `time.monotonic` value at which to run the next iteration of - the loop. + the earliest :py:func:`time.monotonic` value at which to run the next iteration + of the loop. """ queue_name = task_type["backend_name"] max_queue_length = task_type.get("max_queue_length") or 0 min_available_slots = max_queue_length * MIN_SLOTS_RATIO current_iteration_start = time.monotonic() # Check queue level available_slots = get_available_slots(app, queue_name, max_queue_length) logger.debug( "%s available slots for visit type %s in queue %s", available_slots, visit_type, queue_name, ) if available_slots < min_available_slots: return current_iteration_start + QUEUE_FULL_BACKOFF origins = grab_next_visits_policy_ratio(scheduler, visit_type, available_slots) if not origins: logger.debug("No origins to visit for type %s", visit_type) return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF # Try to smooth the ingestion load, origins pulled by different # scheduling policies have different resource usage patterns random.shuffle(origins) for origin in origins: task_dict = origin.as_task_dict() app.send_task( queue_name, task_id=uuid(), args=task_dict["arguments"]["args"], kwargs=task_dict["arguments"]["kwargs"], queue=queue_name, ) logger.info( "%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name, ) # When everything worked, we can try to schedule origins again ASAP. return time.monotonic() def visit_scheduler_thread( config: Dict, visit_type: str, command_queue: Queue[object], exc_queue: Queue[Tuple[str, BaseException]], ): - """Target function for the visit sending thread, which initializes local - connections and handles exceptions by sending them back to the main thread.""" + """Target function for the visit sending thread, which initializes local connections + and handles exceptions by sending them back to the main thread.""" from swh.scheduler import get_scheduler from swh.scheduler.celery_backend.config import build_app try: # We need to reinitialize these connections because they're not generally # thread-safe app = build_app(config.get("celery")) scheduler = get_scheduler(**config["scheduler"]) task_type = scheduler.get_task_type(f"load-{visit_type}") if task_type is None: raise ValueError(f"Unknown task type: load-{visit_type}") next_iteration = time.monotonic() while True: # vary the next iteration time a little bit next_iteration = next_iteration + splay() while time.monotonic() < next_iteration: # Wait for next iteration to start. Listen for termination message. try: msg = command_queue.get(block=True, timeout=1) except Empty: continue if msg is TERMINATE: return else: logger.warn("Received unexpected message %s in command queue", msg) next_iteration = send_visits_for_visit_type( scheduler, app, visit_type, task_type ) except BaseException as e: exc_queue.put((visit_type, e)) VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]] """Dict storing the visit scheduler threads and their command queues""" def spawn_visit_scheduler_thread( threads: VisitSchedulerThreads, exc_queue: Queue[Tuple[str, BaseException]], config: Dict[str, Any], visit_type: str, ): - """Spawn a new thread to schedule the visits of type `visit_type`.""" + """Spawn a new thread to schedule the visits of type ``visit_type``.""" command_queue: Queue[object] = Queue() thread = Thread( target=visit_scheduler_thread, kwargs={ "config": config, "visit_type": visit_type, "command_queue": command_queue, "exc_queue": exc_queue, }, ) threads[visit_type] = (thread, command_queue) thread.start() def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]: """Terminate all visit scheduler threads""" logger.info("Termination requested...") for _, command_queue in threads.values(): command_queue.put(TERMINATE) loops = 0 while threads and loops < 10: logger.info( "Terminating visit scheduling threads: %s", ", ".join(sorted(threads)) ) loops += 1 for visit_type, (thread, _) in list(threads.items()): thread.join(timeout=1) if not thread.is_alive(): logger.debug("Thread %s terminated", visit_type) del threads[visit_type] if threads: logger.warn( "Could not reap the following threads after 10 attempts: %s", ", ".join(sorted(threads)), ) return list(sorted(threads))