diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py
index 8f43d8e..2a9ebe6 100644
--- a/swh/scheduler/celery_backend/recurrent_visits.py
+++ b/swh/scheduler/celery_backend/recurrent_visits.py
@@ -1,311 +1,330 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """This schedules the recurrent visits, for listed origins, in Celery.

 For "oneshot" (save code now, lister) tasks, check the
 :mod:`swh.scheduler.celery_backend.runner` and
 :mod:`swh.scheduler.celery_backend.pika_listener` modules.

 """

 from __future__ import annotations

 from itertools import chain
 import logging
 from queue import Empty, Queue
 import random
 from threading import Thread
 import time
 from typing import TYPE_CHECKING, Any, Dict, List, Tuple

 from kombu.utils.uuid import uuid

 from swh.scheduler.celery_backend.config import get_available_slots

 if TYPE_CHECKING:
     from ..interface import SchedulerInterface
     from ..model import ListedOrigin

 logger = logging.getLogger(__name__)


 _VCS_POLICY_WEIGHTS: Dict[str, float] = {
     "already_visited_order_by_lag": 49,
     "never_visited_oldest_update_first": 49,
     "origins_without_last_update": 2,
 }

 POLICY_WEIGHTS: Dict[str, Dict[str, float]] = {
     "default": {
         "already_visited_order_by_lag": 50,
         "never_visited_oldest_update_first": 50,
     },
     "git": _VCS_POLICY_WEIGHTS,
     "hg": _VCS_POLICY_WEIGHTS,
     "svn": _VCS_POLICY_WEIGHTS,
     "cvs": _VCS_POLICY_WEIGHTS,
     "bzr": _VCS_POLICY_WEIGHTS,
 }
 """Scheduling policies to use to retrieve visits for the given visit types, with
 their relative weights"""
+
+POLICY_ADDITIONAL_PARAMETERS: Dict[str, Dict[str, Dict[str, Any]]] = {
+    "git": {
+        "already_visited_order_by_lag": {"tablesample": 0.1},
+        "never_visited_oldest_update_first": {"tablesample": 0.1},
+        "origins_without_last_update": {"tablesample": 0.1},
+    }
+}
+"""Additional keyword arguments to pass to
+:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits`, for each
+visit type and scheduling policy. Visit types and policies missing from this mapping
+get no additional arguments."""

 MIN_SLOTS_RATIO = 0.05
 """Quantity of slots that need to be available (with respect to max_queue_length) for
 :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger"""

 QUEUE_FULL_BACKOFF = 60
 """Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots
 available in the queue."""

 NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60
 """Backoff time (in seconds) if no origins have been scheduled in the current
 iteration"""

 BACKOFF_SPLAY = 5.0
 """Amplitude of the fuzziness between backoffs"""

 TERMINATE = object()
 """Termination request received from command queue (singleton used for identity
 comparison)"""


 def grab_next_visits_policy_weights(
     scheduler: SchedulerInterface, visit_type: str, num_visits: int
 ) -> List[ListedOrigin]:
     """Get the next ``num_visits`` for the given ``visit_type`` using the corresponding
     set of scheduling policies.

     The :py:data:`POLICY_WEIGHTS` dict sets, for each visit type, the scheduling
     policies used to pull the next tasks, and what proportion of the available
     num_visits they take.

+    The :py:data:`POLICY_ADDITIONAL_PARAMETERS` dict sets, for each visit type and
+    scheduling policy, the extra keyword arguments passed to
+    :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits`.
+
     This function emits a warning if the ratio of retrieved origins is off of the
     requested ratio by more than 5%.

     Returns:
        at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects

     """
     policy_weights = POLICY_WEIGHTS.get(visit_type, POLICY_WEIGHTS["default"])
     total_weight = sum(policy_weights.values())

     if not total_weight:
         raise ValueError(f"No policy weights set for visit type {visit_type}")

     policy_ratio = {
         policy: weight / total_weight for policy, weight in policy_weights.items()
     }

     fetched_origins: Dict[str, List[ListedOrigin]] = {}

     for policy, ratio in policy_ratio.items():
         num_tasks_to_send = int(num_visits * ratio)
         fetched_origins[policy] = scheduler.grab_next_visits(
-            visit_type, num_tasks_to_send, policy=policy
+            visit_type,
+            num_tasks_to_send,
+            policy=policy,
+            **POLICY_ADDITIONAL_PARAMETERS.get(visit_type, {}).get(policy, {}),
         )

     all_origins: List[ListedOrigin] = list(
         chain.from_iterable(fetched_origins.values())
     )
     if not all_origins:
         return []

     # Check whether the ratios of origins fetched are skewed with respect to the
     # ones we requested
     fetched_origin_ratios = {
         policy: len(origins) / len(all_origins)
         for policy, origins in fetched_origins.items()
     }

     for policy, expected_ratio in policy_ratio.items():
         # 5% of skew with respect to request
         if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05:
             logger.info(
                 "Skewed fetch for visit type %s with policy %s: fetched %s, "
                 "requested %s",
                 visit_type,
                 policy,
                 fetched_origin_ratios[policy],
                 expected_ratio,
             )

     return all_origins


 def splay():
     """Return a random short interval by which to vary the backoffs for the visit
     scheduling threads"""
     return random.uniform(0, BACKOFF_SPLAY)


 def send_visits_for_visit_type(
     scheduler: SchedulerInterface,
     app,
     visit_type: str,
     task_type: Dict,
 ) -> float:
     """Schedule the next batch of visits for the given ``visit_type``.

     First, we determine the number of available slots by introspecting the RabbitMQ
     queue.

     If there's fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue, we
     wait for :py:data:`QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive
     :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries
     when there's not many jobs to queue.

     Once there's more than :py:data:`MIN_SLOTS_RATIO` slots available, we run
     :py:func:`grab_next_visits_policy_weights` to retrieve the next set of origin
     visits to schedule, and we send them to celery.

     If the last scheduling attempt didn't return any origins, we sleep for
     :py:data:`NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive
     :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries
     too often if there's nothing left to schedule.

     Returns:
       the earliest :py:func:`time.monotonic` value at which to run the next iteration
       of the loop.

     """
     queue_name = task_type["backend_name"]
     max_queue_length = task_type.get("max_queue_length") or 0
     min_available_slots = max_queue_length * MIN_SLOTS_RATIO

     current_iteration_start = time.monotonic()

     # Check queue level
     available_slots = get_available_slots(app, queue_name, max_queue_length)
     logger.debug(
         "%s available slots for visit type %s in queue %s",
         available_slots,
         visit_type,
         queue_name,
     )
     if available_slots < min_available_slots:
         return current_iteration_start + QUEUE_FULL_BACKOFF

     origins = grab_next_visits_policy_weights(scheduler, visit_type, available_slots)

     if not origins:
         logger.debug("No origins to visit for type %s", visit_type)
         return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF

     # Try to smooth the ingestion load, origins pulled by different
     # scheduling policies have different resource usage patterns
     random.shuffle(origins)

     for origin in origins:
         task_dict = origin.as_task_dict()
         app.send_task(
             queue_name,
             task_id=uuid(),
             args=task_dict["arguments"]["args"],
             kwargs=task_dict["arguments"]["kwargs"],
             queue=queue_name,
         )

     logger.info(
         "%s: %s visits scheduled in queue %s",
         visit_type,
         len(origins),
         queue_name,
     )

     # When everything worked, we can try to schedule origins again ASAP.
     return time.monotonic()


 def visit_scheduler_thread(
     config: Dict,
     visit_type: str,
     command_queue: Queue[object],
     exc_queue: Queue[Tuple[str, BaseException]],
 ):
     """Target function for the visit sending thread, which initializes local
     connections and handles exceptions by sending them back to the main thread."""

     from swh.scheduler import get_scheduler
     from swh.scheduler.celery_backend.config import build_app

     try:
         # We need to reinitialize these connections because they're not generally
         # thread-safe
         app = build_app(config.get("celery"))
         scheduler = get_scheduler(**config["scheduler"])
         task_type = scheduler.get_task_type(f"load-{visit_type}")

         if task_type is None:
             raise ValueError(f"Unknown task type: load-{visit_type}")

         next_iteration = time.monotonic()

         while True:
             # vary the next iteration time a little bit
             next_iteration = next_iteration + splay()
             while time.monotonic() < next_iteration:
                 # Wait for next iteration to start. Listen for termination message.
                 try:
                     msg = command_queue.get(block=True, timeout=1)
                 except Empty:
                     continue

                 if msg is TERMINATE:
                     return
                 else:
                     logger.warn("Received unexpected message %s in command queue", msg)

             next_iteration = send_visits_for_visit_type(
                 scheduler, app, visit_type, task_type
             )

     except BaseException as e:
         exc_queue.put((visit_type, e))


 VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]]
 """Dict storing the visit scheduler threads and their command queues"""


 def spawn_visit_scheduler_thread(
     threads: VisitSchedulerThreads,
     exc_queue: Queue[Tuple[str, BaseException]],
     config: Dict[str, Any],
     visit_type: str,
 ):
     """Spawn a new thread to schedule the visits of type ``visit_type``."""
     command_queue: Queue[object] = Queue()
     thread = Thread(
         target=visit_scheduler_thread,
         kwargs={
             "config": config,
             "visit_type": visit_type,
             "command_queue": command_queue,
             "exc_queue": exc_queue,
         },
     )
     threads[visit_type] = (thread, command_queue)
     thread.start()


 def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]:
     """Terminate all visit scheduler threads"""
     logger.info("Termination requested...")
     for _, command_queue in threads.values():
         command_queue.put(TERMINATE)

     loops = 0
     while threads and loops < 10:
         logger.info(
             "Terminating visit scheduling threads: %s", ", ".join(sorted(threads))
         )
         loops += 1
         for visit_type, (thread, _) in list(threads.items()):
             thread.join(timeout=1)
             if not thread.is_alive():
                 logger.debug("Thread %s terminated", visit_type)
                 del threads[visit_type]

     if threads:
         logger.warn(
             "Could not reap the following threads after 10 attempts: %s",
             ", ".join(sorted(threads)),
         )

     return list(sorted(threads))
diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py
index d118fbf..efa8d67 100644
--- a/swh/scheduler/tests/test_recurrent_visits.py
+++ b/swh/scheduler/tests/test_recurrent_visits.py
@@ -1,180 +1,205 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import timedelta
 import logging
 from queue import Queue
 from unittest.mock import MagicMock

 import pytest

 from swh.scheduler.celery_backend.recurrent_visits import (
+    POLICY_ADDITIONAL_PARAMETERS,
     VisitSchedulerThreads,
+    grab_next_visits_policy_weights,
     send_visits_for_visit_type,
     spawn_visit_scheduler_thread,
     terminate_visit_scheduler_threads,
     visit_scheduler_thread,
 )

 from .test_cli import invoke

 TEST_MAX_QUEUE = 10000
 MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits"


 def _compute_backend_name(visit_type: str) -> str:
     "Build a dummy reproducible backend name"
     return f"swh.loader.{visit_type}.tasks"


 @pytest.fixture
 def swh_scheduler(swh_scheduler):
     """Override default fixture of the scheduler to install some more task types."""
     for visit_type in ["git", "hg", "svn"]:
         task_type = f"load-{visit_type}"
         swh_scheduler.create_task_type(
             {
                 "type": task_type,
                 "max_queue_length": TEST_MAX_QUEUE,
                 "description": "The {} testing task".format(task_type),
                 "backend_name": _compute_backend_name(visit_type),
                 "default_interval": timedelta(days=1),
                 "min_interval": timedelta(hours=6),
                 "max_interval": timedelta(days=12),
             }
         )
     return swh_scheduler


 def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler):
     """When passed an unknown visit type, the recurrent visit scheduler should refuse
     to start."""

     with pytest.raises(ValueError, match="Unknown"):
         invoke(
             swh_scheduler,
             False,
             ["schedule-recurrent", "--visit-type", "unknown", "--visit-type", "git"],
         )


 def test_cli_schedule_recurrent_noop(swh_scheduler, mocker):
     """When passing no visit types, the recurrent visit scheduler should start."""

     spawn_visit_scheduler_thread = mocker.patch(
         f"{MODULE_NAME}.spawn_visit_scheduler_thread"
     )
     spawn_visit_scheduler_thread.side_effect = SystemExit
     # The actual scheduling threads won't spawn, they'll immediately terminate. This
     # only exercises the logic to pull task types out of the database

     result = invoke(swh_scheduler, False, ["schedule-recurrent"])
     assert result.exit_code == 0, result.output


 def test_recurrent_visit_scheduling(
     swh_scheduler,
     caplog,
     listed_origins_by_type,
     mocker,
 ):
     """Scheduling known tasks is ok."""

     caplog.set_level(logging.DEBUG, MODULE_NAME)
     nb_origins = 1000

     mock_celery_app = MagicMock()
     mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots")
     mock_available_slots.return_value = nb_origins  # Slots available in queue

     # Make sure the scheduler is properly configured in terms of visit/task types
     all_task_types = {
         task_type_d["type"]: task_type_d
         for task_type_d in swh_scheduler.get_task_types()
     }

     visit_types = list(listed_origins_by_type.keys())
     assert len(visit_types) > 0

     task_types = []
     origins = []
     for visit_type, _origins in listed_origins_by_type.items():
         origins.extend(swh_scheduler.record_listed_origins(_origins))
         task_type_name = f"load-{visit_type}"
         assert task_type_name in all_task_types.keys()
         task_type = all_task_types[task_type_name]
         task_type["visit_type"] = visit_type
         # we'll limit the orchestrator to the origins' type we know
         task_types.append(task_type)

     for visit_type in ["git", "svn"]:
         task_type = f"load-{visit_type}"
         send_visits_for_visit_type(
             swh_scheduler, mock_celery_app, visit_type, all_task_types[task_type]
         )

     assert mock_available_slots.called, "The available slots functions should be called"

     records = [record.message for record in caplog.records]

     # Mapping over the dict ratio/policies entries can change overall order so let's
     # check the set of records
     expected_records = set()
     for task_type in task_types:
         visit_type = task_type["visit_type"]
         queue_name = task_type["backend_name"]
         msg = (
             f"{nb_origins} available slots for visit type {visit_type} "
             f"in queue {queue_name}"
         )
         expected_records.add(msg)

     for expected_record in expected_records:
         assert expected_record in set(records)


+@pytest.mark.parametrize(
+    "visit_type, tablesamples",
+    [("hg", {}), ("git", POLICY_ADDITIONAL_PARAMETERS["git"])],
+)
+def test_recurrent_visit_additional_parameters(
+    swh_scheduler, mocker, visit_type, tablesamples
+):
+    """Policy-specific additional parameters are forwarded to grab_next_visits"""
+
+    mock_grab_next_visits = mocker.patch.object(swh_scheduler, "grab_next_visits")
+    mock_grab_next_visits.return_value = []
+
+    grab_next_visits_policy_weights(swh_scheduler, visit_type, 10)
+
+    # Guard against the loop below passing vacuously
+    assert mock_grab_next_visits.call_args_list, "grab_next_visits was never called"
+
+    for call in mock_grab_next_visits.call_args_list:
+        kwargs = call[1]
+        expected = tablesamples.get(kwargs["policy"], {}).get("tablesample")
+        assert kwargs.get("tablesample") == expected
+
+
 @pytest.fixture
 def scheduler_config(swh_scheduler_config):
     return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}}


 def test_visit_scheduler_thread_unknown_task(
     swh_scheduler,
     scheduler_config,
 ):
     """Starting a thread with unknown task type reports the error"""

     unknown_visit_type = "unknown"
     command_queue = Queue()
     exc_queue = Queue()

     visit_scheduler_thread(
         scheduler_config, unknown_visit_type, command_queue, exc_queue
     )

     assert command_queue.empty() is True
     assert exc_queue.empty() is False
     assert len(exc_queue.queue) == 1
     result = exc_queue.queue.pop()
     assert result[0] == unknown_visit_type
     assert isinstance(result[1], ValueError)


 def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker):
     """Spawning and terminating threads runs smoothly"""

     threads: VisitSchedulerThreads = {}
     exc_queue = Queue()
     mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app")
     mock_build_app.return_value = MagicMock()

     assert len(threads) == 0
     for visit_type in visit_types:
         spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type)

     # This actually only checks the spawning and terminating logic is sound

     assert len(threads) == len(visit_types)

     actual_threads = terminate_visit_scheduler_threads(threads)
     assert not len(actual_threads)

     assert mock_build_app.called