Changeset View / Standalone View
swh/scheduler/celery_backend/recurrent_visits.py (this file was added by this changeset)
# Copyright (C) 2021 The Software Heritage developers | ||||||||||
# See the AUTHORS file at the top-level directory of this distribution | ||||||||||
# License: GNU General Public License version 3, or any later version | ||||||||||
# See top-level LICENSE file for more information | ||||||||||
"""This schedules the recurrent visits, for listed origins, in Celery. | ||||||||||
For "oneshot" (save code now, lister) tasks, check the | ||||||||||
:mod:`swh.scheduler.celery_backend.runner` and | ||||||||||
:mod:`swh.scheduler.celery_backend.pika_listener` modules. | ||||||||||
""" | ||||||||||
from __future__ import annotations | ||||||||||
import logging | ||||||||||
from queue import Empty, Queue | ||||||||||
import random | ||||||||||
from threading import Thread | ||||||||||
import time | ||||||||||
from typing import TYPE_CHECKING, Any, Dict, List, Tuple | ||||||||||
from kombu.utils.uuid import uuid | ||||||||||
from swh.scheduler.celery_backend.config import get_available_slots | ||||||||||
if TYPE_CHECKING: | ||||||||||
from ..interface import SchedulerInterface | ||||||||||
from ..model import ListedOrigin | ||||||||||
logger = logging.getLogger(__name__) | ||||||||||
_VCS_POLICY_RATIOS = { | ||||||||||
"already_visited_order_by_lag": 0.49, | ||||||||||
"never_visited_oldest_update_first": 0.49, | ||||||||||
"origins_without_last_update": 0.02, | ||||||||||
} | ||||||||||
# Default policy ratio, let's start that configuration in the module first | ||||||||||
POLICY_RATIO: Dict[str, Dict[str, float]] = { | ||||||||||
"default": { | ||||||||||
"already_visited_order_by_lag": 0.5, | ||||||||||
"never_visited_oldest_update_first": 0.5, | ||||||||||
}, | ||||||||||
"git": _VCS_POLICY_RATIOS, | ||||||||||
"hg": _VCS_POLICY_RATIOS, | ||||||||||
"svn": _VCS_POLICY_RATIOS, | ||||||||||
"cvs": _VCS_POLICY_RATIOS, | ||||||||||
"bzr": _VCS_POLICY_RATIOS, | ||||||||||
} | ||||||||||
def grab_next_visits_policy_ratio(
    scheduler: SchedulerInterface, visit_type: str, num_visits: int
) -> List[ListedOrigin]:
    """Get the next `num_visits` for the given `visit_type` using the corresponding
    set of scheduling policies.

    The `POLICY_RATIO` dict sets, for each visit type, the scheduling policies
    used to pull the next tasks, and what proportion of the available num_visits
    they take.

    This function logs a message if the ratio of retrieved origins is off of
    the requested ratio by more than 5%.

    Returns:
        at most `num_visits` `ListedOrigin` objects
    """
    policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"])

    fetched_origins: Dict[str, List[ListedOrigin]] = {}

    for policy, ratio in policy_ratio.items():
        num_tasks_to_send = int(num_visits * ratio)
        fetched_origins[policy] = scheduler.grab_next_visits(
            visit_type, num_tasks_to_send, policy=policy
        )

    # Flatten in linear time: sum() over a list of lists copies the accumulator
    # at every step, which makes the concatenation quadratic.
    all_origins: List[ListedOrigin] = [
        origin for origins in fetched_origins.values() for origin in origins
    ]
    if not all_origins:
        return []

    # Check whether the ratios of origins fetched are skewed with respect to the
    # ones we requested
    fetched_origin_ratios = {
        policy: len(origins) / len(all_origins)
        for policy, origins in fetched_origins.items()
    }

    for policy, expected_ratio in policy_ratio.items():
        # 5% of skew with respect to request
        if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05:
            logger.info(
                "Skewed fetch for visit type %s with policy %s: fetched %s, "
                "requested %s",
                visit_type,
                policy,
                fetched_origin_ratios[policy],
                expected_ratio,
            )

    return all_origins
MIN_SLOTS_RATIO = 0.05
"""Quantity of slots that need to be available (with respect to max_queue_length) for
`grab_next_visits` to trigger"""

QUEUE_FULL_BACKOFF = 60
"""Backoff time (in seconds) if there's fewer than `MIN_SLOTS_RATIO` slots available in
the queue."""

NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60
"""Backoff time (in seconds) if no origins have been scheduled in the current
iteration"""

BACKOFF_SPLAY = 5.0
"""Amplitude of the fuzziness between backoffs"""

TERMINATE = object()
"""Unique sentinel object posted on a thread's command queue to request its
termination; it carries no data and is compared by identity (``msg is TERMINATE``)."""
def splay() -> float:
    """Return a random short interval by which to vary the backoffs for the visit
    scheduling threads, in the interval [0, BACKOFF_SPLAY]."""
    return BACKOFF_SPLAY * random.random()
def send_visits_for_visit_type(
    scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict,
) -> float:
    """Schedule the next batch of visits for the given `visit_type`.

    First, we determine the number of available slots by introspecting the
    RabbitMQ queue.

    If there's fewer than `MIN_SLOTS_RATIO` slots available in the queue, we wait
    for `QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive
    `grab_next_visits` queries when there's not many jobs to queue.

    Once there's more than `MIN_SLOTS_RATIO` slots available, we run
    `get_next_visits` to retrieve the next set of origin visits to schedule, and
    we send them to celery.

    If the last scheduling attempt didn't return any origins, we sleep for
    `NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive
    `grab_next_visits` queries too often if there's nothing left to schedule.

    Returns: the earliest :py:`time.monotonic` value at which to run the next
    iteration of the loop.
    """
    queue_name = task_type["backend_name"]
    max_queue_length = task_type.get("max_queue_length") or 0
    slot_threshold = max_queue_length * MIN_SLOTS_RATIO

    iteration_start = time.monotonic()

    # Introspect the RabbitMQ queue to see how much room is left
    free_slots = get_available_slots(app, queue_name, max_queue_length)
    logger.debug(
        "%s available slots for visit type %s in queue %s",
        free_slots,
        visit_type,
        queue_name,
    )

    if free_slots < slot_threshold:
        # Queue is (nearly) full: back off before checking again
        return iteration_start + QUEUE_FULL_BACKOFF

    origins = grab_next_visits_policy_ratio(scheduler, visit_type, free_slots)
    if not origins:
        logger.debug("No origins to visit for type %s", visit_type)
        # Nothing left to schedule: back off for a long while
        return iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF

    # Try to smooth the ingestion load, origins pulled by different
    # scheduling policies have different resource usage patterns
    random.shuffle(origins)

    for origin in origins:
        task_dict = origin.as_task_dict()
        app.send_task(
            queue_name,
            task_id=uuid(),
            args=task_dict["arguments"]["args"],
            kwargs=task_dict["arguments"]["kwargs"],
            queue=queue_name,
        )

    logger.info(
        "%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name,
    )

    # When everything worked, we can try to schedule origins again ASAP.
    return time.monotonic()
def visit_scheduler_thread(
    config: Dict,
    visit_type: str,
    command_queue: Queue[object],
    exc_queue: Queue[Tuple[str, BaseException]],
):
    """Target function for the visit sending thread, which initializes local
    connections and handles exceptions by sending them back to the main thread.

    Loops forever, scheduling visits for `visit_type` then sleeping until the
    next iteration, while polling `command_queue` for a `TERMINATE` request.
    Any exception (`BaseException` included, so the main thread also sees e.g.
    `KeyboardInterrupt`) is forwarded through `exc_queue` instead of being lost
    in the thread.
    """
    from swh.scheduler import get_scheduler
    from swh.scheduler.celery_backend.config import build_app

    try:
        # We need to reinitialize these connections because they're not generally
        # thread-safe
        app = build_app(config.get("celery"))
        scheduler = get_scheduler(**config["scheduler"])
        task_type = scheduler.get_task_type(f"load-{visit_type}")
        if task_type is None:
            raise ValueError(f"Unknown task type: load-{visit_type}")

        next_iteration = time.monotonic()

        while True:
            # vary the next iteration time a little bit
            next_iteration = next_iteration + splay()
            while time.monotonic() < next_iteration:
                # Wait for next iteration to start. Listen for termination message.
                try:
                    msg = command_queue.get(block=True, timeout=1)
                except Empty:
                    continue

                if msg is TERMINATE:
                    return
                else:
                    # logger.warn is a deprecated alias of logger.warning
                    logger.warning(
                        "Received unexpected message %s in command queue", msg
                    )

            next_iteration = send_visits_for_visit_type(
                scheduler, app, visit_type, task_type
            )

    except BaseException as e:
        exc_queue.put((visit_type, e))
# Maps each visit type to its scheduler thread, paired with the queue used by
# the main thread to send it commands.
VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]]
"""Dict storing the visit scheduler threads and their command queues"""
def spawn_visit_scheduler_thread(
    threads: VisitSchedulerThreads,
    exc_queue: Queue[Tuple[str, BaseException]],
    config: Dict[str, Any],
    visit_type: str,
):
    """Spawn a new thread to schedule the visits of type `visit_type`."""
    # One dedicated command queue per thread, used to request its termination
    command_queue: Queue[object] = Queue()
    thread = Thread(
        target=visit_scheduler_thread,
        kwargs=dict(
            config=config,
            visit_type=visit_type,
            command_queue=command_queue,
            exc_queue=exc_queue,
        ),
    )
    # Register the thread before starting it so the caller can always reach it
    threads[visit_type] = (thread, command_queue)
    thread.start()
def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]:
    """Terminate all visit scheduler threads.

    Sends `TERMINATE` on every thread's command queue, then joins each thread
    (1 second timeout) for up to 10 rounds, removing reaped threads from
    `threads` as they exit.

    Returns:
        the sorted list of visit types whose threads could not be reaped.
    """
    logger.info("Termination requested...")
    for _, command_queue in threads.values():
        command_queue.put(TERMINATE)

    loops = 0
    while threads and loops < 10:
        logger.info(
            "Terminating visit scheduling threads: %s", ", ".join(sorted(threads))
        )
        loops += 1
        for visit_type in list(threads):
            thread, _ = threads[visit_type]
            thread.join(timeout=1)

            if not thread.is_alive():
                logger.debug("Thread %s terminated", visit_type)
                del threads[visit_type]

    if threads:
        # logger.warn is a deprecated alias of logger.warning
        logger.warning(
            "Could not reap the following threads after 10 attempts: %s",
            ", ".join(sorted(threads)),
        )

    return list(sorted(threads))
Reviewer note (vlorentz): using sum() on a list of lists copies the accumulator over and over, causing the operation to be quadratic instead of linear.