Page MenuHomeSoftware Heritage

D6520.diff
No OneTemporary

D6520.diff

diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/celery_backend/recurrent_visits.py
@@ -0,0 +1,301 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This schedules the recurrent visits, for listed origins, in Celery.
+
+For "oneshot" (save code now, lister) tasks, check the
+:mod:`swh.scheduler.celery_backend.runner` and
+:mod:`swh.scheduler.celery_backend.pika_listener` modules.
+
+"""
+
+from __future__ import annotations
+
+from itertools import chain
+import logging
+from queue import Empty, Queue
+import random
+from threading import Thread
+import time
+from typing import TYPE_CHECKING, Any, Dict, List, Tuple
+
+from kombu.utils.uuid import uuid
+
+from swh.scheduler.celery_backend.config import get_available_slots
+
+if TYPE_CHECKING:
+ from ..interface import SchedulerInterface
+ from ..model import ListedOrigin
+
+logger = logging.getLogger(__name__)
+
+
+_VCS_POLICY_RATIOS = {
+ "already_visited_order_by_lag": 0.49,
+ "never_visited_oldest_update_first": 0.49,
+ "origins_without_last_update": 0.02,
+}
+
+# Default policy ratio, let's start that configuration in the module first
+POLICY_RATIO: Dict[str, Dict[str, float]] = {
+ "default": {
+ "already_visited_order_by_lag": 0.5,
+ "never_visited_oldest_update_first": 0.5,
+ },
+ "git": _VCS_POLICY_RATIOS,
+ "hg": _VCS_POLICY_RATIOS,
+ "svn": _VCS_POLICY_RATIOS,
+ "cvs": _VCS_POLICY_RATIOS,
+ "bzr": _VCS_POLICY_RATIOS,
+}
+
+
+MIN_SLOTS_RATIO = 0.05
+"""Minimum fraction of `max_queue_length` slots that must be free in the queue for
+`grab_next_visits` to trigger"""
+
+QUEUE_FULL_BACKOFF = 60
+"""Backoff time (in seconds) if fewer than a `MIN_SLOTS_RATIO` fraction of the slots
+is available in the queue."""
+
+NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60
+"""Backoff time (in seconds) if no origins have been scheduled in the current
+iteration"""
+
+BACKOFF_SPLAY = 5.0
+"""Amplitude of the fuzziness between backoffs"""
+
+TERMINATE = object()
+"""Termination request received from command queue (singleton used for identity
+comparison)"""
+
+
+def grab_next_visits_policy_ratio(
+    scheduler: SchedulerInterface, visit_type: str, num_visits: int
+) -> List[ListedOrigin]:
+    """Get the next `num_visits` for the given `visit_type` using the corresponding
+    set of scheduling policies.
+
+    The `POLICY_RATIO` dict sets, for each visit type, the scheduling policies
+    used to pull the next tasks, and what proportion of the available num_visits
+    they take. Visit types without a dedicated entry use the ``"default"`` one.
+
+    The number of origins requested for each policy is truncated to an integer.
+
+    This function logs an informational message for each policy whose share of
+    the retrieved origins is off of the requested ratio by more than 5%
+    (relative to that ratio).
+
+    Args:
+        scheduler: scheduler backend whose `grab_next_visits` method is called
+          once per policy
+        visit_type: visit type to grab origins for
+        num_visits: overall number of visits to split between the policies
+
+    Returns:
+        at most `num_visits` `ListedOrigin` objects
+    """
+    policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"])
+
+    fetched_origins: Dict[str, List[ListedOrigin]] = {}
+
+    for policy, ratio in policy_ratio.items():
+        num_tasks_to_send = int(num_visits * ratio)
+        fetched_origins[policy] = scheduler.grab_next_visits(
+            visit_type, num_tasks_to_send, policy=policy
+        )
+
+    all_origins: List[ListedOrigin] = list(
+        chain.from_iterable(fetched_origins.values())
+    )
+    if not all_origins:
+        return []
+
+    # Check whether the ratios of origins fetched are skewed with respect to the
+    # ones we requested
+    fetched_origin_ratios = {
+        policy: len(origins) / len(all_origins)
+        for policy, origins in fetched_origins.items()
+    }
+
+    for policy, expected_ratio in policy_ratio.items():
+        fetched_ratio = fetched_origin_ratios[policy]
+        if expected_ratio:
+            # 5% of skew with respect to request
+            skewed = abs(fetched_ratio - expected_ratio) / expected_ratio > 0.05
+        else:
+            # A relative skew is undefined for a zero ratio (it would divide by
+            # zero); such a policy is skewed as soon as it returned anything.
+            skewed = fetched_ratio > 0
+
+        if skewed:
+            logger.info(
+                "Skewed fetch for visit type %s with policy %s: fetched %s, "
+                "requested %s",
+                visit_type,
+                policy,
+                fetched_ratio,
+                expected_ratio,
+            )
+
+    return all_origins
+
+
+def splay():
+    """Draw a random value, uniformly distributed between 0 and `BACKOFF_SPLAY`,
+    used to vary the backoffs of the visit scheduling threads"""
+    jitter = random.uniform(0, BACKOFF_SPLAY)
+    return jitter
+
+
+def send_visits_for_visit_type(
+    scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict,
+) -> float:
+    """Send the next batch of visits of type `visit_type` to celery.
+
+    The number of available slots is first obtained from `get_available_slots`
+    for the queue named by ``task_type["backend_name"]``.
+
+    When fewer than `MIN_SLOTS_RATIO` times the task type's `max_queue_length`
+    slots are available, nothing is scheduled and the next iteration is delayed
+    by `QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive
+    `grab_next_visits` queries when there's not many jobs to queue.
+
+    Otherwise, `grab_next_visits_policy_ratio` retrieves the next set of origins
+    to visit; they are shuffled and one celery task is sent for each of them.
+
+    When no origins are returned, the next iteration is delayed by
+    `NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive
+    `grab_next_visits` queries too often if there's nothing left to schedule.
+
+    Returns:
+        the earliest `time.monotonic` value at which to run the next iteration of
+        the loop.
+
+    """
+    backend_name = task_type["backend_name"]
+    queue_capacity = task_type.get("max_queue_length") or 0
+    slots_threshold = queue_capacity * MIN_SLOTS_RATIO
+
+    started_at = time.monotonic()
+
+    # Check queue level
+    free_slots = get_available_slots(app, backend_name, queue_capacity)
+    logger.debug(
+        "%s available slots for visit type %s in queue %s",
+        free_slots,
+        visit_type,
+        backend_name,
+    )
+    if free_slots < slots_threshold:
+        return started_at + QUEUE_FULL_BACKOFF
+
+    to_visit = grab_next_visits_policy_ratio(scheduler, visit_type, free_slots)
+
+    if not to_visit:
+        logger.debug("No origins to visit for type %s", visit_type)
+        return started_at + NO_ORIGINS_SCHEDULED_BACKOFF
+
+    # Origins pulled by different scheduling policies have different resource
+    # usage patterns: mix them to try and smooth the ingestion load
+    random.shuffle(to_visit)
+
+    for listed_origin in to_visit:
+        arguments = listed_origin.as_task_dict()["arguments"]
+        app.send_task(
+            backend_name,
+            task_id=uuid(),
+            args=arguments["args"],
+            kwargs=arguments["kwargs"],
+            queue=backend_name,
+        )
+
+    logger.info(
+        "%s: %s visits scheduled in queue %s", visit_type, len(to_visit), backend_name,
+    )
+
+    # Everything worked: origins can be scheduled again ASAP.
+    return time.monotonic()
+
+
+def visit_scheduler_thread(
+    config: Dict,
+    visit_type: str,
+    command_queue: Queue[object],
+    exc_queue: Queue[Tuple[str, BaseException]],
+):
+    """Target function for the visit sending thread, which initializes local
+    connections and handles exceptions by sending them back to the main thread.
+
+    Args:
+        config: configuration dict; its ``celery`` entry (if any) is passed to
+          `build_app`, and its ``scheduler`` entry to `get_scheduler`
+        visit_type: visit type handled by this thread; the corresponding task
+          type is ``load-<visit_type>``
+        command_queue: queue polled while waiting for the next iteration; the
+          thread returns when it receives `TERMINATE`
+        exc_queue: queue on which ``(visit_type, exception)`` is put whenever
+          anything is raised, including the `ValueError` for an unknown task type
+    """
+
+    from swh.scheduler import get_scheduler
+    from swh.scheduler.celery_backend.config import build_app
+
+    try:
+        # We need to reinitialize these connections because they're not generally
+        # thread-safe
+        app = build_app(config.get("celery"))
+        scheduler = get_scheduler(**config["scheduler"])
+        task_type = scheduler.get_task_type(f"load-{visit_type}")
+
+        if task_type is None:
+            raise ValueError(f"Unknown task type: load-{visit_type}")
+
+        next_iteration = time.monotonic()
+
+        while True:
+            # vary the next iteration time a little bit
+            next_iteration = next_iteration + splay()
+            while time.monotonic() < next_iteration:
+                # Wait for next iteration to start. Listen for termination message.
+                # The 1 second timeout bounds how long the deadline can be overshot.
+                try:
+                    msg = command_queue.get(block=True, timeout=1)
+                except Empty:
+                    continue
+
+                if msg is TERMINATE:
+                    return
+                else:
+                    # `Logger.warn` is a deprecated alias of `Logger.warning`
+                    logger.warning(
+                        "Received unexpected message %s in command queue", msg
+                    )
+
+            next_iteration = send_visits_for_visit_type(
+                scheduler, app, visit_type, task_type
+            )
+
+    except BaseException as e:
+        # Deliberately broad: every failure is forwarded to the main thread
+        exc_queue.put((visit_type, e))
+
+
+VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]]
+"""Dict storing the visit scheduler threads and their command queues"""
+
+
+def spawn_visit_scheduler_thread(
+    threads: VisitSchedulerThreads,
+    exc_queue: Queue[Tuple[str, BaseException]],
+    config: Dict[str, Any],
+    visit_type: str,
+):
+    """Start a thread running `visit_scheduler_thread` for `visit_type`.
+
+    The thread and its newly created command queue are recorded in `threads`
+    under the `visit_type` key, replacing any previous entry, before the thread
+    is started."""
+    commands: Queue[object] = Queue()
+    thread_kwargs = dict(
+        config=config,
+        visit_type=visit_type,
+        command_queue=commands,
+        exc_queue=exc_queue,
+    )
+    worker = Thread(target=visit_scheduler_thread, kwargs=thread_kwargs)
+    threads[visit_type] = (worker, commands)
+    worker.start()
+
+
+def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]:
+    """Terminate all visit scheduler threads
+
+    A `TERMINATE` message is put on every command queue, then the threads are
+    joined (with a 1 second timeout each) for at most 10 rounds. Threads which
+    have exited are removed from `threads`, which is modified in place.
+
+    Returns:
+        the sorted visit types of the threads that could not be reaped
+    """
+    logger.info("Termination requested...")
+    for _, command_queue in threads.values():
+        command_queue.put(TERMINATE)
+
+    loops = 0
+    while threads and loops < 10:
+        logger.info(
+            "Terminating visit scheduling threads: %s", ", ".join(sorted(threads))
+        )
+        loops += 1
+        # Iterate over a copy, as reaped threads are deleted from the dict
+        for visit_type, (thread, _) in list(threads.items()):
+            thread.join(timeout=1)
+            if not thread.is_alive():
+                logger.debug("Thread %s terminated", visit_type)
+                del threads[visit_type]
+
+    if threads:
+        # `Logger.warn` is a deprecated alias of `Logger.warning`
+        logger.warning(
+            "Could not reap the following threads after 10 attempts: %s",
+            ", ".join(sorted(threads)),
+        )
+
+    # `sorted` already returns a new list
+    return sorted(threads)
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -3,10 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from __future__ import annotations
+
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import logging
import time
+from typing import List, Tuple
import click
@@ -57,7 +60,7 @@
logger = logging.getLogger(__name__ + ".runner")
scheduler = ctx.obj["scheduler"]
- logger.debug("Scheduler %s" % scheduler)
+ logger.debug("Scheduler %s", scheduler)
task_types = []
for task_type_name in task_type_names:
task_type = scheduler.get_task_type(task_type_name)
@@ -107,6 +110,92 @@
listener.stop_consuming()
+@cli.command("schedule-recurrent")
+@click.option(
+    "--visit-type",
+    "visit_types",
+    multiple=True,
+    default=[],
+    help=(
+        "Visit types to schedule. If not provided, this iterates over every "
+        "corresponding load task types referenced in the scheduler backend."
+    ),
+)
+@click.pass_context
+def schedule_recurrent(ctx, visit_types: List[str]):
+    """Starts the scheduler for recurrent visits.
+
+    This runs one thread for each visit type, which regularly sends new visits
+    to celery.
+
+    """
+    # NOTE: the docstring above is displayed by click as the command help, keep
+    # implementation details in comments.
+    from queue import Queue
+
+    from swh.scheduler.celery_backend.recurrent_visits import (
+        VisitSchedulerThreads,
+        logger,
+        spawn_visit_scheduler_thread,
+        terminate_visit_scheduler_threads,
+    )
+
+    config = ctx.obj["config"]
+    scheduler = ctx.obj["scheduler"]
+
+    task_type_prefix = "load-"
+
+    if not visit_types:
+        # Figure out which visit types exist in the scheduler: only loading tasks
+        # are considered as recurring ones, the rest is dismissed. The visit type
+        # name is the task type name stripped of its prefix.
+        visit_types = [
+            task_type["type"][len(task_type_prefix) :]
+            for task_type in scheduler.get_task_types()
+            if task_type["type"].startswith(task_type_prefix)
+        ]
+    else:
+        # Check that the passed visit types exist in the scheduler
+        for visit_type in visit_types:
+            task_type_name = f"{task_type_prefix}{visit_type}"
+            task_type = scheduler.get_task_type(task_type_name)
+            if not task_type:
+                raise ValueError(f"Unknown task type: {task_type_name}")
+
+    exc_queue: Queue[Tuple[str, BaseException]] = Queue()
+    threads: VisitSchedulerThreads = {}
+
+    try:
+        # Spawn initial threads
+        for visit_type in visit_types:
+            spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type)
+
+        # Handle exceptions from child threads: block until one of them reports
+        # a failure, then replace it with a new thread for the same visit type.
+        while True:
+            visit_type, exc_info = exc_queue.get(block=True)
+
+            logger.exception(
+                "Thread %s died with exception; respawning",
+                visit_type,
+                exc_info=exc_info,
+            )
+
+            dead_thread = threads[visit_type][0]
+            dead_thread.join(timeout=1)
+
+            if dead_thread.is_alive():
+                # `Logger.warn` is a deprecated alias of `Logger.warning`
+                logger.warning(
+                    "The thread for %s is still alive after sending an exception?! "
+                    "Respawning anyway.",
+                    visit_type,
+                )
+
+            spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type)
+
+    except SystemExit:
+        # Exit with a non-zero status if some threads could not be terminated
+        remaining_threads = terminate_visit_scheduler_threads(threads)
+        if remaining_threads:
+            ctx.exit(1)
+        ctx.exit(0)
+
+
@cli.command("rpc-serve")
@click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api")
@click.option("--port", default=5008, type=click.INT, help="Binding port of the server")
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2020 The Software Heritage developers
+# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -6,6 +6,7 @@
from datetime import datetime, timezone
import os
from typing import Dict, List
+from unittest.mock import patch
import pytest
@@ -60,3 +61,12 @@
def listed_origins(listed_origins_by_type) -> List[ListedOrigin]:
"""Return a (fixed) set of listed origins"""
return sum(listed_origins_by_type.values(), [])
+
+
+@pytest.fixture
+def storage(swh_storage):
+    """Patch `swh.storage.get_storage` so that it returns the `swh_storage`
+    instance, which gets injected into the CLI functions."""
+    with patch("swh.storage.get_storage", return_value=swh_storage):
+        yield swh_storage
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -595,15 +595,6 @@
return origins
-@pytest.fixture
-def storage(swh_storage):
- """An instance of in-memory storage that gets injected
- into the CLI functions."""
- with patch("swh.storage.get_storage") as get_storage_mock:
- get_storage_mock.return_value = swh_storage
- yield swh_storage
-
-
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
def test_task_schedule_origins_dry_run(swh_scheduler, storage):
"""Tests the scheduling when origin_batch_size*task_batch_size is a
diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_recurrent_visits.py
@@ -0,0 +1,180 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import timedelta
+import logging
+from queue import Queue
+from unittest.mock import MagicMock
+
+import pytest
+
+from swh.scheduler.celery_backend.recurrent_visits import (
+ VisitSchedulerThreads,
+ send_visits_for_visit_type,
+ spawn_visit_scheduler_thread,
+ terminate_visit_scheduler_threads,
+ visit_scheduler_thread,
+)
+
+from .test_cli import invoke
+
+TEST_MAX_QUEUE = 10000
+MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits"
+
+
+def _compute_backend_name(visit_type: str) -> str:
+    """Derive a dummy, reproducible backend name from `visit_type`"""
+    backend_name = f"swh.loader.{visit_type}.tasks"
+    return backend_name
+
+
+@pytest.fixture
+def swh_scheduler(swh_scheduler):
+    """Extend the default scheduler fixture with the git, hg and svn load task
+    types."""
+    for vcs in ("git", "hg", "svn"):
+        load_task_name = f"load-{vcs}"
+        task_type_definition = {
+            "type": load_task_name,
+            "max_queue_length": TEST_MAX_QUEUE,
+            "description": "The {} testing task".format(load_task_name),
+            "backend_name": _compute_backend_name(vcs),
+            "default_interval": timedelta(days=1),
+            "min_interval": timedelta(hours=6),
+            "max_interval": timedelta(days=12),
+        }
+        swh_scheduler.create_task_type(task_type_definition)
+    return swh_scheduler
+
+
+def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler):
+    """The recurrent visit scheduler raises a `ValueError` instead of starting
+    when one of the requested visit types is unknown."""
+
+    cli_args = ["schedule-recurrent"]
+    for requested_type in ("unknown", "git"):
+        cli_args += ["--visit-type", requested_type]
+
+    with pytest.raises(ValueError, match="Unknown"):
+        invoke(swh_scheduler, False, cli_args)
+
+
+def test_cli_schedule_recurrent_noop(swh_scheduler, mocker):
+    """When passing no visit types, the recurrent visit scheduler should start."""
+
+    # The actual scheduling threads won't spawn: the patched spawning function
+    # raises SystemExit right away. This only exercises the logic to pull task
+    # types out of the database
+    mocker.patch(
+        f"{MODULE_NAME}.spawn_visit_scheduler_thread", side_effect=SystemExit
+    )
+
+    cli_result = invoke(swh_scheduler, False, ["schedule-recurrent"])
+    assert cli_result.exit_code == 0, cli_result.output
+
+
+def test_recurrent_visit_scheduling(
+    swh_scheduler, caplog, listed_origins_by_type, mocker,
+):
+    """Scheduling known tasks is ok."""
+
+    caplog.set_level(logging.DEBUG, MODULE_NAME)
+    nb_origins = 1000
+
+    mock_celery_app = MagicMock()
+    # Report `nb_origins` slots as available in the queue
+    mock_available_slots = mocker.patch(
+        f"{MODULE_NAME}.get_available_slots", return_value=nb_origins
+    )
+
+    # Make sure the scheduler is properly configured in terms of visit/task types
+    all_task_types = {}
+    for task_type_d in swh_scheduler.get_task_types():
+        all_task_types[task_type_d["type"]] = task_type_d
+
+    visit_types = list(listed_origins_by_type)
+    assert len(visit_types) > 0
+
+    task_types = []
+    origins = []
+    for visit_type, _origins in listed_origins_by_type.items():
+        origins += swh_scheduler.record_listed_origins(_origins)
+        task_type_name = f"load-{visit_type}"
+        assert task_type_name in all_task_types
+        task_type = all_task_types[task_type_name]
+        task_type["visit_type"] = visit_type
+        # we'll limit the orchestrator to the origins' type we know
+        task_types.append(task_type)
+
+    for visit_type in ("git", "svn"):
+        send_visits_for_visit_type(
+            swh_scheduler,
+            mock_celery_app,
+            visit_type,
+            all_task_types[f"load-{visit_type}"],
+        )
+
+    assert mock_available_slots.called, "The available slots functions should be called"
+
+    # Mapping over the dict ratio/policies entries can change overall order so let's
+    # check the set of records
+    logged_messages = {record.message for record in caplog.records}
+
+    expected_records = set()
+    for task_type in task_types:
+        visit_type = task_type["visit_type"]
+        queue_name = task_type["backend_name"]
+        expected_records.add(
+            f"{nb_origins} available slots for visit type {visit_type} "
+            f"in queue {queue_name}"
+        )
+
+    for expected_record in expected_records:
+        assert expected_record in logged_messages
+
+
+@pytest.fixture
+def scheduler_config(swh_scheduler_config):
+    """Configuration dict with a ``scheduler`` entry (``cls`` set to ``local``
+    unless `swh_scheduler_config` overrides it) and an empty ``celery`` entry"""
+    scheduler_section = {"cls": "local"}
+    scheduler_section.update(swh_scheduler_config)
+    return {"scheduler": scheduler_section, "celery": {}}
+
+
+def test_visit_scheduler_thread_unknown_task(
+    swh_scheduler, scheduler_config,
+):
+    """Running the thread function with an unknown task type reports a
+    `ValueError` on the exception queue"""
+
+    unknown_visit_type = "unknown"
+    command_queue, exc_queue = Queue(), Queue()
+
+    visit_scheduler_thread(
+        scheduler_config, unknown_visit_type, command_queue, exc_queue
+    )
+
+    assert command_queue.empty() is True
+    assert exc_queue.empty() is False
+    assert len(exc_queue.queue) == 1
+    reported_visit_type, reported_exc = exc_queue.queue.pop()
+    assert reported_visit_type == unknown_visit_type
+    assert isinstance(reported_exc, ValueError)
+
+
+def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker):
+    """Spawning and terminating threads runs smoothly"""
+
+    threads: VisitSchedulerThreads = {}
+    exc_queue = Queue()
+    mock_build_app = mocker.patch(
+        "swh.scheduler.celery_backend.config.build_app", return_value=MagicMock()
+    )
+
+    assert len(threads) == 0
+    for visit_type in visit_types:
+        spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type)
+
+    # This actually only checks the spawning and terminating logic is sound
+
+    assert len(threads) == len(visit_types)
+
+    unreaped_threads = terminate_visit_scheduler_threads(threads)
+    assert len(unreaped_threads) == 0
+
+    assert mock_build_app.called

File Metadata

Mime Type
text/plain
Expires
Dec 21 2024, 4:49 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3228864

Event Timeline