Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/recurrent_visits.py
Show All 32 Lines | |||||
_VCS_POLICY_RATIOS = { | _VCS_POLICY_RATIOS = { | ||||
"already_visited_order_by_lag": 0.49, | "already_visited_order_by_lag": 0.49, | ||||
"never_visited_oldest_update_first": 0.49, | "never_visited_oldest_update_first": 0.49, | ||||
"origins_without_last_update": 0.02, | "origins_without_last_update": 0.02, | ||||
} | } | ||||
# Default policy ratio, let's start that configuration in the module first | |||||
POLICY_RATIO: Dict[str, Dict[str, float]] = { | POLICY_RATIO: Dict[str, Dict[str, float]] = { | ||||
"default": { | "default": { | ||||
"already_visited_order_by_lag": 0.5, | "already_visited_order_by_lag": 0.5, | ||||
"never_visited_oldest_update_first": 0.5, | "never_visited_oldest_update_first": 0.5, | ||||
}, | }, | ||||
"git": _VCS_POLICY_RATIOS, | "git": _VCS_POLICY_RATIOS, | ||||
"hg": _VCS_POLICY_RATIOS, | "hg": _VCS_POLICY_RATIOS, | ||||
"svn": _VCS_POLICY_RATIOS, | "svn": _VCS_POLICY_RATIOS, | ||||
"cvs": _VCS_POLICY_RATIOS, | "cvs": _VCS_POLICY_RATIOS, | ||||
"bzr": _VCS_POLICY_RATIOS, | "bzr": _VCS_POLICY_RATIOS, | ||||
} | } | ||||
"""Scheduling policies to use to retrieve visits for the given visit types, with the | |||||
respective ratios""" | |||||
MIN_SLOTS_RATIO = 0.05 | MIN_SLOTS_RATIO = 0.05 | ||||
"""Quantity of slots that need to be available (with respect to max_queue_length) for | """Quantity of slots that need to be available (with respect to max_queue_length) for | ||||
`grab_next_visits` to trigger""" | :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger""" | ||||
QUEUE_FULL_BACKOFF = 60 | QUEUE_FULL_BACKOFF = 60 | ||||
"""Backoff time (in seconds) if there's fewer than `MIN_SLOTS_RATIO` slots available in | """Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots | ||||
the queue.""" | available in the queue.""" | ||||
NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 | NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 | ||||
"""Backoff time (in seconds) if no origins have been scheduled in the current | """Backoff time (in seconds) if no origins have been scheduled in the current | ||||
iteration""" | iteration""" | ||||
BACKOFF_SPLAY = 5.0 | BACKOFF_SPLAY = 5.0 | ||||
"""Amplitude of the fuzziness between backoffs""" | """Amplitude of the fuzziness between backoffs""" | ||||
TERMINATE = object() | TERMINATE = object() | ||||
"""Termination request received from command queue (singleton used for identity | """Termination request received from command queue (singleton used for identity | ||||
comparison)""" | comparison)""" | ||||
def grab_next_visits_policy_ratio( | def grab_next_visits_policy_ratio( | ||||
scheduler: SchedulerInterface, visit_type: str, num_visits: int | scheduler: SchedulerInterface, visit_type: str, num_visits: int | ||||
) -> List[ListedOrigin]: | ) -> List[ListedOrigin]: | ||||
"""Get the next `num_visits` for the given `visit_type` using the corresponding | """Get the next ``num_visits`` for the given ``visit_type`` using the corresponding | ||||
set of scheduling policies. | set of scheduling policies. | ||||
The `POLICY_RATIO` dict sets, for each visit type, the scheduling policies | The :py:data:`POLICY_RATIO` dict sets, for each visit type, the scheduling policies | ||||
used to pull the next tasks, and what proportion of the available num_visits | used to pull the next tasks, and what proportion of the available num_visits | ||||
they take. | they take. | ||||
This function emits a warning if the ratio of retrieved origins is off of | This function emits a warning if the ratio of retrieved origins is off of | ||||
the requested ratio by more than 5%. | the requested ratio by more than 5%. | ||||
Returns: | Returns: | ||||
at most `num_visits` `ListedOrigin` objects | at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects | ||||
vlorentz: typo?
same comment below | |||||
Done Inline ActionsNope, that's not a typo. This makes sphinx shorten the link text to just the class name, instead of the fully qualified class name. olasd: Nope, that's not a typo. This makes sphinx shorten the link text to just the class name… | |||||
""" | """ | ||||
policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"]) | policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"]) | ||||
fetched_origins: Dict[str, List[ListedOrigin]] = {} | fetched_origins: Dict[str, List[ListedOrigin]] = {} | ||||
for policy, ratio in policy_ratio.items(): | for policy, ratio in policy_ratio.items(): | ||||
num_tasks_to_send = int(num_visits * ratio) | num_tasks_to_send = int(num_visits * ratio) | ||||
fetched_origins[policy] = scheduler.grab_next_visits( | fetched_origins[policy] = scheduler.grab_next_visits( | ||||
Show All 32 Lines | def splay(): | ||||
"""Return a random short interval by which to vary the backoffs for the visit | """Return a random short interval by which to vary the backoffs for the visit | ||||
scheduling threads""" | scheduling threads""" | ||||
return random.uniform(0, BACKOFF_SPLAY) | return random.uniform(0, BACKOFF_SPLAY) | ||||
def send_visits_for_visit_type( | def send_visits_for_visit_type( | ||||
scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict, | scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict, | ||||
) -> float: | ) -> float: | ||||
"""Schedule the next batch of visits for the given `visit_type`. | """Schedule the next batch of visits for the given ``visit_type``. | ||||
First, we determine the number of available slots by introspecting the | First, we determine the number of available slots by introspecting the RabbitMQ | ||||
RabbitMQ queue. | queue. | ||||
If there's fewer than `MIN_SLOTS_RATIO` slots available in the queue, we wait | If there's fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue, we | ||||
for `QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive | wait for :py:data:`QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive | ||||
`grab_next_visits` queries when there's not many jobs to queue. | :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries when | ||||
there's not many jobs to queue. | |||||
Once there's more than `MIN_SLOTS_RATIO` slots available, we run | |||||
`get_next_visits` to retrieve the next set of origin visits to schedule, and | Once there's more than :py:data:`MIN_SLOTS_RATIO` slots available, we run | ||||
we send them to celery. | :py:func:`grab_next_visits_policy_ratio` to retrieve the next set of origin visits | ||||
to schedule, and we send them to celery. | |||||
If the last scheduling attempt didn't return any origins, we sleep for | If the last scheduling attempt didn't return any origins, we sleep for | ||||
`NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive | :py:data:`NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive | ||||
`grab_next_visits` queries too often if there's nothing left to schedule. | :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too | ||||
often if there's nothing left to schedule. | |||||
Returns: | Returns: | ||||
the earliest `time.monotonic` value at which to run the next iteration of | the earliest :py:func:`time.monotonic` value at which to run the next iteration | ||||
the loop. | of the loop. | ||||
""" | """ | ||||
queue_name = task_type["backend_name"] | queue_name = task_type["backend_name"] | ||||
max_queue_length = task_type.get("max_queue_length") or 0 | max_queue_length = task_type.get("max_queue_length") or 0 | ||||
min_available_slots = max_queue_length * MIN_SLOTS_RATIO | min_available_slots = max_queue_length * MIN_SLOTS_RATIO | ||||
current_iteration_start = time.monotonic() | current_iteration_start = time.monotonic() | ||||
Show All 37 Lines | |||||
def visit_scheduler_thread( | def visit_scheduler_thread( | ||||
config: Dict, | config: Dict, | ||||
visit_type: str, | visit_type: str, | ||||
command_queue: Queue[object], | command_queue: Queue[object], | ||||
exc_queue: Queue[Tuple[str, BaseException]], | exc_queue: Queue[Tuple[str, BaseException]], | ||||
): | ): | ||||
"""Target function for the visit sending thread, which initializes local | """Target function for the visit sending thread, which initializes local connections | ||||
connections and handles exceptions by sending them back to the main thread.""" | and handles exceptions by sending them back to the main thread.""" | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from swh.scheduler.celery_backend.config import build_app | from swh.scheduler.celery_backend.config import build_app | ||||
try: | try: | ||||
# We need to reinitialize these connections because they're not generally | # We need to reinitialize these connections because they're not generally | ||||
# thread-safe | # thread-safe | ||||
app = build_app(config.get("celery")) | app = build_app(config.get("celery")) | ||||
Show All 33 Lines | |||||
def spawn_visit_scheduler_thread( | def spawn_visit_scheduler_thread( | ||||
threads: VisitSchedulerThreads, | threads: VisitSchedulerThreads, | ||||
exc_queue: Queue[Tuple[str, BaseException]], | exc_queue: Queue[Tuple[str, BaseException]], | ||||
config: Dict[str, Any], | config: Dict[str, Any], | ||||
visit_type: str, | visit_type: str, | ||||
): | ): | ||||
"""Spawn a new thread to schedule the visits of type `visit_type`.""" | """Spawn a new thread to schedule the visits of type ``visit_type``.""" | ||||
command_queue: Queue[object] = Queue() | command_queue: Queue[object] = Queue() | ||||
thread = Thread( | thread = Thread( | ||||
target=visit_scheduler_thread, | target=visit_scheduler_thread, | ||||
kwargs={ | kwargs={ | ||||
"config": config, | "config": config, | ||||
"visit_type": visit_type, | "visit_type": visit_type, | ||||
"command_queue": command_queue, | "command_queue": command_queue, | ||||
"exc_queue": exc_queue, | "exc_queue": exc_queue, | ||||
Show All 31 Lines |
typo?
same comment below