diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/celery_backend/recurrent_visits.py @@ -0,0 +1,298 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""This schedules the recurrent visits, for listed origins, in Celery. + +For "oneshot" (save code now, lister) tasks, check the +:mod:`swh.scheduler.celery_backend.runner` and +:mod:`swh.scheduler.celery_backend.pika_listener` modules. + +""" + +from __future__ import annotations + +import logging +from queue import Empty, Queue +import random +from threading import Thread +import time +from typing import TYPE_CHECKING, Any, Dict, List, Tuple + +from kombu.utils.uuid import uuid + +from swh.scheduler.celery_backend.config import get_available_slots + +if TYPE_CHECKING: + from ..interface import SchedulerInterface + from ..model import ListedOrigin + +logger = logging.getLogger(__name__) + + +_VCS_POLICY_RATIOS = { + "already_visited_order_by_lag": 0.49, + "never_visited_oldest_update_first": 0.49, + "origins_without_last_update": 0.02, +} + +# Default policy ratio, let's start that configuration in the module first +POLICY_RATIO: Dict[str, Dict[str, float]] = { + "default": { + "already_visited_order_by_lag": 0.5, + "never_visited_oldest_update_first": 0.5, + }, + "git": _VCS_POLICY_RATIOS, + "hg": _VCS_POLICY_RATIOS, + "svn": _VCS_POLICY_RATIOS, + "cvs": _VCS_POLICY_RATIOS, + "bzr": _VCS_POLICY_RATIOS, +} + + +def grab_next_visits_policy_ratio( + scheduler: SchedulerInterface, visit_type: str, num_visits: int +) -> List[ListedOrigin]: + """Get the next `num_visits` for the given `visit_type` using the corresponding + set of scheduling policies. + + The `POLICY_RATIO` dict sets, for each visit type, the scheduling policies + used to pull the next tasks, and what proportion of the available num_visits + they take. + + This function emits a warning if the ratio of retrieved origins is off of + the requested ratio by more than 5%. + + Returns: + at most `num_visits` `ListedOrigin` objects + """ + policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"]) + + fetched_origins: Dict[str, List[ListedOrigin]] = {} + + for policy, ratio in policy_ratio.items(): + num_tasks_to_send = int(num_visits * ratio) + fetched_origins[policy] = scheduler.grab_next_visits( + visit_type, num_tasks_to_send, policy=policy + ) + + all_origins: List[ListedOrigin] = sum(fetched_origins.values(), []) + if not all_origins: + return [] + + # Check whether the ratios of origins fetched are skewed with respect to the + # ones we requested + fetched_origin_ratios = { + policy: len(origins) / len(all_origins) + for policy, origins in fetched_origins.items() + } + + for policy, expected_ratio in policy_ratio.items(): + # 5% of skew with respect to request + if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05: + logger.info( + "Skewed fetch for visit type %s with policy %s: fetched %s, " + "requested %s", + visit_type, + policy, + fetched_origin_ratios[policy], + expected_ratio, + ) + + return all_origins + + +MIN_SLOTS_RATIO = 0.05 +"""Quantity of slots that need to be available (with respect to max_queue_length) for +`grab_next_visits` to trigger""" + +QUEUE_FULL_BACKOFF = 60 +"""Backoff time (in seconds) if there's fewer than `MIN_SLOTS_RATIO` slots available in +the queue.""" + +NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 +"""Backoff time (in seconds) if no origins have been scheduled in the current +iteration""" + +BACKOFF_SPLAY = 5.0 +"""Amplitude of the fuzziness between backoffs""" + +TERMINATE = object() +"""Termination request received from command queue""" + + +def splay(): + """Return a random short interval by which to vary the backoffs for the visit + scheduling threads""" + return random.uniform(0, BACKOFF_SPLAY) + + +def send_visits_for_visit_type( + scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict, +) -> float: + """Schedule the next batch of visits for the given `visit_type`. + + First, we determine the number of available slots by introspecting the + RabbitMQ queue. + + If there's fewer than `MIN_SLOTS_RATIO` slots available in the queue, we wait + for `QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive + `grab_next_visits` queries when there's not many jobs to queue. + + Once there's more than `MIN_SLOTS_RATIO` slots available, we run + `get_next_visits` to retrieve the next set of origin visits to schedule, and + we send them to celery. + + If the last scheduling attempt didn't return any origins, we sleep for + `NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive + `grab_next_visits` queries too often if there's nothing left to schedule. + + Returns: + the earliest `time.monotonic` value at which to run the next iteration of + the loop. + + """ + queue_name = task_type["backend_name"] + max_queue_length = task_type.get("max_queue_length") or 0 + min_available_slots = max_queue_length * MIN_SLOTS_RATIO + + current_iteration_start = time.monotonic() + + # Check queue level + available_slots = get_available_slots(app, queue_name, max_queue_length) + logger.debug( + "%s available slots for visit type %s in queue %s", + available_slots, + visit_type, + queue_name, + ) + if available_slots < min_available_slots: + return current_iteration_start + QUEUE_FULL_BACKOFF + + origins = grab_next_visits_policy_ratio(scheduler, visit_type, available_slots) + + if not origins: + logger.debug("No origins to visit for type %s", visit_type) + return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF + + # Try to smooth the ingestion load, origins pulled by different + # scheduling policies have different resource usage patterns + random.shuffle(origins) + + for origin in origins: + task_dict = origin.as_task_dict() + app.send_task( + queue_name, + task_id=uuid(), + args=task_dict["arguments"]["args"], + kwargs=task_dict["arguments"]["kwargs"], + queue=queue_name, + ) + + logger.info( + "%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name, + ) + + # When everything worked, we can try to schedule origins again ASAP. + return time.monotonic() + + +def visit_scheduler_thread( + config: Dict, + visit_type: str, + command_queue: Queue[object], + exc_queue: Queue[Tuple[str, BaseException]], +): + """Target function for the visit sending thread, which initializes local + connections and handles exceptions by sending them back to the main thread.""" + + from swh.scheduler import get_scheduler + from swh.scheduler.celery_backend.config import build_app + + try: + # We need to reinitialize these connections because they're not generally + # thread-safe + app = build_app(config.get("celery")) + scheduler = get_scheduler(**config["scheduler"]) + task_type = scheduler.get_task_type(f"load-{visit_type}") + + if task_type is None: + raise ValueError(f"Unknown task type: load-{visit_type}") + + next_iteration = time.monotonic() + + while True: + # vary the next iteration time a little bit + next_iteration = next_iteration + splay() + while time.monotonic() < next_iteration: + # Wait for next iteration to start. Listen for termination message. + try: + msg = command_queue.get(block=True, timeout=1) + except Empty: + continue + + if msg is TERMINATE: + return + else: + logger.warn("Received unexpected message %s in command queue", msg) + + next_iteration = send_visits_for_visit_type( + scheduler, app, visit_type, task_type + ) + + except BaseException as e: + exc_queue.put((visit_type, e)) + + +VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]] +"""Dict storing the visit scheduler threads and their command queues""" + + +def spawn_visit_scheduler_thread( + threads: VisitSchedulerThreads, + exc_queue: Queue[Tuple[str, BaseException]], + config: Dict[str, Any], + visit_type: str, +): + """Spawn a new thread to schedule the visits of type `visit_type`.""" + command_queue: Queue[object] = Queue() + thread = Thread( + target=visit_scheduler_thread, + kwargs={ + "config": config, + "visit_type": visit_type, + "command_queue": command_queue, + "exc_queue": exc_queue, + }, + ) + threads[visit_type] = (thread, command_queue) + thread.start() + + +def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]: + """Terminate all visit scheduler threads""" + logger.info("Termination requested...") + for _, command_queue in threads.values(): + command_queue.put(TERMINATE) + + loops = 0 + while threads and loops < 10: + logger.info( + "Terminating visit scheduling threads: %s", ", ".join(sorted(threads)) + ) + loops += 1 + for visit_type in list(threads): + thread, _ = threads[visit_type] + thread.join(timeout=1) + if not thread.is_alive(): + logger.debug("Thread %s terminated", visit_type) + del threads[visit_type] + + if threads: + logger.warn( + "Could not reap the following threads after 10 attempts: %s", + ", ".join(sorted(threads)), + ) + + return list(sorted(threads)) diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py --- a/swh/scheduler/cli/admin.py +++ b/swh/scheduler/cli/admin.py @@ -3,10 +3,13 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from __future__ import annotations + # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import time +from typing import List, Tuple import click @@ -57,7 +60,7 @@ logger = logging.getLogger(__name__ + ".runner") scheduler = ctx.obj["scheduler"] - logger.debug("Scheduler %s" % scheduler) + logger.debug("Scheduler %s", scheduler) task_types = [] for task_type_name in task_type_names: task_type = scheduler.get_task_type(task_type_name) @@ -107,6 +110,101 @@ listener.stop_consuming() +@cli.command("schedule-recurrent") +@click.option( + "--period", + "-p", + default=0, + help=( + "Period (in s) at witch pending tasks are checked and " + "executed. Set to 0 (default) for a one shot." + ), +) +@click.option( + "--visit-type", + "visit_types", + multiple=True, + default=[], + help=( + "Visit types to schedule. If not provided, this iterates over every " + "corresponding load task types referenced in the scheduler backend." + ), +) +@click.pass_context +def schedule_recurrent(ctx, period: int, visit_types: List[str]): + """Starts the scheduler for recurrent visits. + + This runs one thread for each visit type, which regularly sends new visits + to celery. + + """ + from queue import Queue + + from swh.scheduler.celery_backend.recurrent_visits import ( + VisitSchedulerThreads, + logger, + spawn_visit_scheduler_thread, + terminate_visit_scheduler_threads, + ) + + config = ctx.obj["config"] + scheduler = ctx.obj["scheduler"] + + if not visit_types: + visit_types = [] + # Figure out which visit types exist in the scheduler + all_task_types = scheduler.get_task_types() + for task_type in all_task_types: + if not task_type["type"].startswith("load-"): + # only consider loading tasks as recurring ones, the rest is dismissed + continue + # get visit type name from task type + visit_types.append(task_type["type"][5:]) + else: + # Check that the passed visit types exist in the scheduler + for visit_type in visit_types: + task_type_name = f"load-{visit_type}" + task_type = scheduler.get_task_type(task_type_name) + if not task_type: + raise ValueError(f"Unknown task type: {task_type_name}") + + exc_queue: Queue[Tuple[str, BaseException]] = Queue() + threads: VisitSchedulerThreads = {} + + try: + # Spawn initial threads + for visit_type in visit_types: + spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type) + + # Handle exceptions from child threads + while True: + visit_type, exc_info = exc_queue.get(block=True) + + logger.exception( + "Thread %s died with exception; respawning", + visit_type, + exc_info=exc_info, + ) + + dead_thread = threads[visit_type][0] + dead_thread.join(timeout=1) + + if dead_thread.is_alive(): + logger.warn( + "The thread for %s is still alive after sending an exception?! " + "Respawning anyway.", + visit_type, + ) + + spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type) + + except SystemExit: + remaining_threads = terminate_visit_scheduler_threads(threads) + if remaining_threads: + ctx.exit(1) + ctx.exit(0) + + @cli.command("rpc-serve") @click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api") @click.option("--port", default=5008, type=click.INT, help="Binding port of the server") diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2020 The Software Heritage developers +# Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -6,6 +6,7 @@ from datetime import datetime, timezone import os from typing import Dict, List +from unittest.mock import patch import pytest @@ -60,3 +61,12 @@ def listed_origins(listed_origins_by_type) -> List[ListedOrigin]: """Return a (fixed) set of listed origins""" return sum(listed_origins_by_type.values(), []) + + +@pytest.fixture +def storage(swh_storage): + """An instance of in-memory storage that gets injected + into the CLI functions.""" + with patch("swh.storage.get_storage") as get_storage_mock: + get_storage_mock.return_value = swh_storage + yield swh_storage diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py --- a/swh/scheduler/tests/test_cli.py +++ b/swh/scheduler/tests/test_cli.py @@ -595,15 +595,6 @@ return origins -@pytest.fixture -def storage(swh_storage): - """An instance of in-memory storage that gets injected - into the CLI functions.""" - with patch("swh.storage.get_storage") as get_storage_mock: - get_storage_mock.return_value = swh_storage - yield swh_storage - - @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) def test_task_schedule_origins_dry_run(swh_scheduler, storage): """Tests the scheduling when origin_batch_size*task_batch_size is a diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/test_recurrent_visits.py @@ -0,0 +1,178 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import timedelta +import logging +from queue import Queue +from unittest.mock import MagicMock + +import pytest + +from swh.scheduler.celery_backend.recurrent_visits import ( + VisitSchedulerThreads, + send_visits_for_visit_type, + spawn_visit_scheduler_thread, + terminate_visit_scheduler_threads, + visit_scheduler_thread, +) + +from .test_cli import invoke + +TEST_MAX_QUEUE = 10000 +MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits" + + +def _compute_backend_name(visit_type: str) -> str: + "Build a dummy reproducible backend name" + return f"swh.loader.{visit_type}.tasks" + + +@pytest.fixture +def swh_scheduler(swh_scheduler): + """Override default fixture of the scheduler to install some more task types.""" + for visit_type in ["git", "hg", "svn"]: + task_type = f"load-{visit_type}" + swh_scheduler.create_task_type( + { + "type": task_type, + "max_queue_length": TEST_MAX_QUEUE, + "description": "The {} testing task".format(task_type), + "backend_name": _compute_backend_name(visit_type), + "default_interval": timedelta(days=1), + "min_interval": timedelta(hours=6), + "max_interval": timedelta(days=12), + } + ) + return swh_scheduler + + +def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler): + """When passed an unknown visit type, the recurrent visit scheduler should refuse + to start.""" + + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + False, + ["schedule-recurrent", "--visit-type", "unknown", "--visit-type", "git"], + ) + + +def test_cli_schedule_recurrent_noop(swh_scheduler, mocker): + """When passing no visit types, the recurrent visit scheduler should start.""" + + spawn_visit_scheduler_thread = mocker.patch( + f"{MODULE_NAME}.spawn_visit_scheduler_thread" + ) + spawn_visit_scheduler_thread.side_effect = SystemExit + + # The orchestrator will just iterate over existing tasks from the scheduler and do + # noop. We are just checking it does not explode here. + result = invoke(swh_scheduler, False, ["schedule-recurrent"]) + + assert result.exit_code == 0, result.output + + +def test_recurrent_visit_scheduling( + swh_scheduler, caplog, listed_origins_by_type, mocker, +): + """Scheduling known tasks is ok.""" + + caplog.set_level(logging.DEBUG, MODULE_NAME) + nb_origins = 1000 + + mock_celery_app = MagicMock() + + mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots") + mock_available_slots.return_value = nb_origins # Slots available in queue + + # Make sure the scheduler is properly configured in terms of visit/task types + all_task_types = { + task_type_d["type"]: task_type_d + for task_type_d in swh_scheduler.get_task_types() + } + + visit_types = list(listed_origins_by_type.keys()) + assert len(visit_types) > 0 + + task_types = [] + origins = [] + for visit_type, _origins in listed_origins_by_type.items(): + origins.extend(swh_scheduler.record_listed_origins(_origins)) + task_type_name = f"load-{visit_type}" + assert task_type_name in all_task_types.keys() + task_type = all_task_types[task_type_name] + task_type["visit_type"] = visit_type + # we'll limit the orchestrator to the origins' type we know + task_types.append(task_type) + + for visit_type in ["git", "svn"]: + task_type = f"load-{visit_type}" + send_visits_for_visit_type( + swh_scheduler, mock_celery_app, visit_type, all_task_types[task_type] + ) + + assert mock_available_slots.called, "The available slots functions should be called" + + records = [record.message for record in caplog.records] + + # Mapping over the dict ratio/policies entries can change overall order so let's + # check the set of records + expected_records = set() + for task_type in task_types: + visit_type = task_type["visit_type"] + queue_name = task_type["backend_name"] + msg = ( + f"{nb_origins} available slots for visit type {visit_type} " + f"in queue {queue_name}" + ) + expected_records.add(msg) + + for expected_record in expected_records: + assert expected_record in set(records) + + +@pytest.fixture +def scheduler_config(swh_scheduler_config): + return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}} + + +def test_visit_scheduler_thread_unknown_task( + swh_scheduler, scheduler_config, caplog, listed_origins_by_type, mocker, +): + """Starting a thread with unknown task type reports the error""" + + unknown_visit_type = "unknown" + command_queue = Queue() + exc_queue = Queue() + + visit_scheduler_thread( + scheduler_config, unknown_visit_type, command_queue, exc_queue + ) + + assert command_queue.empty() is True + assert exc_queue.empty() is False + assert len(exc_queue.queue) == 1 + result = exc_queue.queue.pop() + assert result[0] == unknown_visit_type + assert isinstance(result[1], ValueError) + + +def test_spawn_visit_scheduler_thread(scheduler_config, visit_types): + """Spawning and terminating threads runs smoothly""" + + threads: VisitSchedulerThreads = {} + exc_queue = Queue() + + assert len(threads) == 0 + + for visit_type in visit_types: + spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type) + + assert len(threads) == len(visit_types) + + actual_threads = terminate_visit_scheduler_threads(threads) + + assert not len(actual_threads)