D6520.diff: Add a new cli endpoint to schedule recurrent visits in Celery
diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/celery_backend/recurrent_visits.py
@@ -0,0 +1,301 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This schedules the recurrent visits, for listed origins, in Celery.
+
+For "oneshot" (save code now, lister) tasks, check the
+:mod:`swh.scheduler.celery_backend.runner` and
+:mod:`swh.scheduler.celery_backend.pika_listener` modules.
+
+"""
+
+from __future__ import annotations
+
+from itertools import chain
+import logging
+from queue import Empty, Queue
+import random
+from threading import Thread
+import time
+from typing import TYPE_CHECKING, Any, Dict, List, Tuple
+
+from kombu.utils.uuid import uuid
+
+from swh.scheduler.celery_backend.config import get_available_slots
+
+if TYPE_CHECKING:
+ from ..interface import SchedulerInterface
+ from ..model import ListedOrigin
+
+logger = logging.getLogger(__name__)
+
+
+_VCS_POLICY_RATIOS = {
+ "already_visited_order_by_lag": 0.49,
+ "never_visited_oldest_update_first": 0.49,
+ "origins_without_last_update": 0.02,
+}
+
+# Default policy ratios; for now, this configuration lives in the module
+POLICY_RATIO: Dict[str, Dict[str, float]] = {
+ "default": {
+ "already_visited_order_by_lag": 0.5,
+ "never_visited_oldest_update_first": 0.5,
+ },
+ "git": _VCS_POLICY_RATIOS,
+ "hg": _VCS_POLICY_RATIOS,
+ "svn": _VCS_POLICY_RATIOS,
+ "cvs": _VCS_POLICY_RATIOS,
+ "bzr": _VCS_POLICY_RATIOS,
+}
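+# For example, with 100 available slots for the "git" visit type, the ratios
+# above yield int(100 * 0.49) = 49 visits for each of the two main policies
+# and int(100 * 0.02) = 2 visits for origins without a last update.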
+
+
+MIN_SLOTS_RATIO = 0.05
+"""Quantity of slots that need to be available (with respect to max_queue_length) for
+`grab_next_visits` to trigger"""
+
+QUEUE_FULL_BACKOFF = 60
+"""Backoff time (in seconds) if there's fewer than `MIN_SLOTS_RATIO` slots available in
+the queue."""
+
+NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60
+"""Backoff time (in seconds) if no origins have been scheduled in the current
+iteration"""
+
+BACKOFF_SPLAY = 5.0
+"""Amplitude of the fuzziness between backoffs"""
+
+TERMINATE = object()
+"""Termination request received from command queue (singleton used for identity
+comparison)"""
+
+
+def grab_next_visits_policy_ratio(
+ scheduler: SchedulerInterface, visit_type: str, num_visits: int
+) -> List[ListedOrigin]:
+ """Get the next `num_visits` for the given `visit_type` using the corresponding
+ set of scheduling policies.
+
+ The `POLICY_RATIO` dict sets, for each visit type, the scheduling policies
+ used to pull the next tasks, and what proportion of the available num_visits
+ they take.
+
+ This function emits a warning if the ratio of retrieved origins is off of
+ the requested ratio by more than 5%.
+
+ Returns:
+ at most `num_visits` `ListedOrigin` objects
+ """
+ policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"])
+
+ fetched_origins: Dict[str, List[ListedOrigin]] = {}
+
+ for policy, ratio in policy_ratio.items():
+ num_tasks_to_send = int(num_visits * ratio)
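+ # int() truncates, so the per-policy counts may sum to slightly fewer
+ # than num_visits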
+ fetched_origins[policy] = scheduler.grab_next_visits(
+ visit_type, num_tasks_to_send, policy=policy
+ )
+
+ all_origins: List[ListedOrigin] = list(
+ chain.from_iterable(fetched_origins.values())
+ )
+ if not all_origins:
+ return []
+
+ # Check whether the ratios of origins fetched are skewed with respect to the
+ # ones we requested
+ fetched_origin_ratios = {
+ policy: len(origins) / len(all_origins)
+ for policy, origins in fetched_origins.items()
+ }
+
+ for policy, expected_ratio in policy_ratio.items():
+ # 5% of skew with respect to request
+ if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05:
+ logger.info(
+ "Skewed fetch for visit type %s with policy %s: fetched %s, "
+ "requested %s",
+ visit_type,
+ policy,
+ fetched_origin_ratios[policy],
+ expected_ratio,
+ )
+
+ return all_origins
+
+
+def splay():
+ """Return a random short interval by which to vary the backoffs for the visit
+ scheduling threads"""
+ return random.uniform(0, BACKOFF_SPLAY)
+
+
+def send_visits_for_visit_type(
+ scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict,
+) -> float:
+ """Schedule the next batch of visits for the given `visit_type`.
+
+ First, we determine the number of available slots by introspecting the
+ RabbitMQ queue.
+
+ If fewer than `MIN_SLOTS_RATIO * max_queue_length` slots are available in the
+ queue, we wait for `QUEUE_FULL_BACKOFF` seconds. This avoids running the
+ expensive `grab_next_visits` queries when only a few jobs could be queued.
+
+ Once enough slots are available, we run `grab_next_visits_policy_ratio` to
+ retrieve the next set of origin visits to schedule, and we send them to
+ celery.
+
+ If the last scheduling attempt didn't return any origins, we sleep for
+ `NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive
+ `grab_next_visits` queries too often if there's nothing left to schedule.
+
+ Returns:
+ the earliest `time.monotonic` value at which to run the next iteration of
+ the loop.
+
+ """
+ queue_name = task_type["backend_name"]
+ max_queue_length = task_type.get("max_queue_length") or 0
+ min_available_slots = max_queue_length * MIN_SLOTS_RATIO
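+ # e.g. with max_queue_length=10000 and MIN_SLOTS_RATIO=0.05, at least
+ # 500 slots must be free before scheduling proceeds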
+
+ current_iteration_start = time.monotonic()
+
+ # Check queue level
+ available_slots = get_available_slots(app, queue_name, max_queue_length)
+ logger.debug(
+ "%s available slots for visit type %s in queue %s",
+ available_slots,
+ visit_type,
+ queue_name,
+ )
+ if available_slots < min_available_slots:
+ return current_iteration_start + QUEUE_FULL_BACKOFF
+
+ origins = grab_next_visits_policy_ratio(scheduler, visit_type, available_slots)
+
+ if not origins:
+ logger.debug("No origins to visit for type %s", visit_type)
+ return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF
+
+ # Try to smooth the ingestion load: origins pulled by different
+ # scheduling policies have different resource usage patterns
+ random.shuffle(origins)
+
+ for origin in origins:
+ task_dict = origin.as_task_dict()
+ app.send_task(
+ queue_name,
+ task_id=uuid(),
+ args=task_dict["arguments"]["args"],
+ kwargs=task_dict["arguments"]["kwargs"],
+ queue=queue_name,
+ )
+
+ logger.info(
+ "%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name,
+ )
+
+ # When everything worked, we can try to schedule origins again ASAP.
+ return time.monotonic()
+
+
+def visit_scheduler_thread(
+ config: Dict,
+ visit_type: str,
+ command_queue: Queue[object],
+ exc_queue: Queue[Tuple[str, BaseException]],
+):
+ """Target function for the visit sending thread, which initializes local
+ connections and handles exceptions by sending them back to the main thread."""
+
+ from swh.scheduler import get_scheduler
+ from swh.scheduler.celery_backend.config import build_app
+
+ try:
+ # We need to reinitialize these connections because they're not generally
+ # thread-safe
+ app = build_app(config.get("celery"))
+ scheduler = get_scheduler(**config["scheduler"])
+ task_type = scheduler.get_task_type(f"load-{visit_type}")
+
+ if task_type is None:
+ raise ValueError(f"Unknown task type: load-{visit_type}")
+
+ next_iteration = time.monotonic()
+
+ while True:
+ # vary the next iteration time a little bit
+ next_iteration = next_iteration + splay()
+ while time.monotonic() < next_iteration:
+ # Wait for next iteration to start. Listen for termination message.
+ try:
+ msg = command_queue.get(block=True, timeout=1)
+ except Empty:
+ continue
+
+ if msg is TERMINATE:
+ return
+ else:
+ logger.warn("Received unexpected message %s in command queue", msg)
+
+ next_iteration = send_visits_for_visit_type(
+ scheduler, app, visit_type, task_type
+ )
+
+ except BaseException as e:
+ exc_queue.put((visit_type, e))
+
+
+VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]]
+"""Dict storing the visit scheduler threads and their command queues"""
+
+
+def spawn_visit_scheduler_thread(
+ threads: VisitSchedulerThreads,
+ exc_queue: Queue[Tuple[str, BaseException]],
+ config: Dict[str, Any],
+ visit_type: str,
+):
+ """Spawn a new thread to schedule the visits of type `visit_type`."""
+ command_queue: Queue[object] = Queue()
+ thread = Thread(
+ target=visit_scheduler_thread,
+ kwargs={
+ "config": config,
+ "visit_type": visit_type,
+ "command_queue": command_queue,
+ "exc_queue": exc_queue,
+ },
+ )
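+ # The thread is non-daemonic: a clean shutdown relies on TERMINATE being
+ # sent through its command queue (see terminate_visit_scheduler_threads)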
+ threads[visit_type] = (thread, command_queue)
+ thread.start()
+
+
+def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]:
+ """Terminate all visit scheduler threads"""
+ logger.info("Termination requested...")
+ for _, command_queue in threads.values():
+ command_queue.put(TERMINATE)
+
+ loops = 0
+ while threads and loops < 10:
+ logger.info(
+ "Terminating visit scheduling threads: %s", ", ".join(sorted(threads))
+ )
+ loops += 1
+ for visit_type, (thread, _) in list(threads.items()):
+ thread.join(timeout=1)
+ if not thread.is_alive():
+ logger.debug("Thread %s terminated", visit_type)
+ del threads[visit_type]
+
+ if threads:
+ logger.warning(
+ "Could not reap the following threads after 10 attempts: %s",
+ ", ".join(sorted(threads)),
+ )
+
+ return sorted(threads)
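
As a rough illustration of the thread lifecycle implemented above, a minimal
driver might look like the following sketch. This is not part of the diff: the
config dict shape mirrors the `scheduler_config` test fixture further down, the
database connection string is a placeholder, and the visit types are assumed
to be registered as `load-*` task types in the scheduler backend.

    from queue import Queue

    from swh.scheduler.celery_backend.recurrent_visits import (
        spawn_visit_scheduler_thread,
        terminate_visit_scheduler_threads,
    )

    # placeholder connection string; adapt to the actual deployment
    config = {"scheduler": {"cls": "local", "db": "dbname=..."}, "celery": {}}

    exc_queue = Queue()  # receives (visit_type, exception) pairs from dead threads
    threads = {}         # visit_type -> (Thread, command queue)

    for visit_type in ["git", "hg"]:
        spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type)

    # ... later, ask every thread to stop, then reap them:
    remaining = terminate_visit_scheduler_threads(threads)
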
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -3,10 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from __future__ import annotations
+
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import logging
import time
+from typing import List, Tuple
import click
@@ -57,7 +60,7 @@
logger = logging.getLogger(__name__ + ".runner")
scheduler = ctx.obj["scheduler"]
- logger.debug("Scheduler %s" % scheduler)
+ logger.debug("Scheduler %s", scheduler)
task_types = []
for task_type_name in task_type_names:
task_type = scheduler.get_task_type(task_type_name)
@@ -107,6 +110,92 @@
listener.stop_consuming()
+@cli.command("schedule-recurrent")
+@click.option(
+ "--visit-type",
+ "visit_types",
+ multiple=True,
+ default=[],
+ help=(
+ "Visit types to schedule. If not provided, this iterates over every "
+ "corresponding load task types referenced in the scheduler backend."
+ ),
+)
+@click.pass_context
+def schedule_recurrent(ctx, visit_types: List[str]):
+ """Starts the scheduler for recurrent visits.
+
+ This runs one thread for each visit type, which regularly sends new visits
+ to celery.
+
+ """
+ from queue import Queue
+
+ from swh.scheduler.celery_backend.recurrent_visits import (
+ VisitSchedulerThreads,
+ logger,
+ spawn_visit_scheduler_thread,
+ terminate_visit_scheduler_threads,
+ )
+
+ config = ctx.obj["config"]
+ scheduler = ctx.obj["scheduler"]
+
+ if not visit_types:
+ visit_types = []
+ # Figure out which visit types exist in the scheduler
+ all_task_types = scheduler.get_task_types()
+ for task_type in all_task_types:
+ if not task_type["type"].startswith("load-"):
+ # only consider load task types as recurrent ones; skip the rest
+ continue
+ # get visit type name from task type
+ visit_types.append(task_type["type"][5:])
+ else:
+ # Check that the passed visit types exist in the scheduler
+ for visit_type in visit_types:
+ task_type_name = f"load-{visit_type}"
+ task_type = scheduler.get_task_type(task_type_name)
+ if not task_type:
+ raise ValueError(f"Unknown task type: {task_type_name}")
+
+ exc_queue: Queue[Tuple[str, BaseException]] = Queue()
+ threads: VisitSchedulerThreads = {}
+
+ try:
+ # Spawn initial threads
+ for visit_type in visit_types:
+ spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type)
+
+ # Handle exceptions from child threads
+ while True:
+ visit_type, exc_info = exc_queue.get(block=True)
+
+ logger.exception(
+ "Thread %s died with exception; respawning",
+ visit_type,
+ exc_info=exc_info,
+ )
+
+ dead_thread = threads[visit_type][0]
+ dead_thread.join(timeout=1)
+
+ if dead_thread.is_alive():
+ logger.warning(
+ "The thread for %s is still alive after sending an exception?! "
+ "Respawning anyway.",
+ visit_type,
+ )
+
+ spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type)
+
+ except SystemExit:
+ remaining_threads = terminate_visit_scheduler_threads(threads)
+ if remaining_threads:
+ ctx.exit(1)
+ ctx.exit(0)
+
+
@cli.command("rpc-serve")
@click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api")
@click.option("--port", default=5008, type=click.INT, help="Binding port of the server")
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2020 The Software Heritage developers
+# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -6,6 +6,7 @@
from datetime import datetime, timezone
import os
from typing import Dict, List
+from unittest.mock import patch
import pytest
@@ -60,3 +61,12 @@
def listed_origins(listed_origins_by_type) -> List[ListedOrigin]:
"""Return a (fixed) set of listed origins"""
return sum(listed_origins_by_type.values(), [])
+
+
+@pytest.fixture
+def storage(swh_storage):
+ """An instance of in-memory storage that gets injected
+ into the CLI functions."""
+ with patch("swh.storage.get_storage") as get_storage_mock:
+ get_storage_mock.return_value = swh_storage
+ yield swh_storage
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -595,15 +595,6 @@
return origins
-@pytest.fixture
-def storage(swh_storage):
- """An instance of in-memory storage that gets injected
- into the CLI functions."""
- with patch("swh.storage.get_storage") as get_storage_mock:
- get_storage_mock.return_value = swh_storage
- yield swh_storage
-
-
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
def test_task_schedule_origins_dry_run(swh_scheduler, storage):
"""Tests the scheduling when origin_batch_size*task_batch_size is a
diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_recurrent_visits.py
@@ -0,0 +1,180 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import timedelta
+import logging
+from queue import Queue
+from unittest.mock import MagicMock
+
+import pytest
+
+from swh.scheduler.celery_backend.recurrent_visits import (
+ VisitSchedulerThreads,
+ send_visits_for_visit_type,
+ spawn_visit_scheduler_thread,
+ terminate_visit_scheduler_threads,
+ visit_scheduler_thread,
+)
+
+from .test_cli import invoke
+
+TEST_MAX_QUEUE = 10000
+MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits"
+
+
+def _compute_backend_name(visit_type: str) -> str:
+ "Build a dummy reproducible backend name"
+ return f"swh.loader.{visit_type}.tasks"
+
+
+@pytest.fixture
+def swh_scheduler(swh_scheduler):
+ """Override default fixture of the scheduler to install some more task types."""
+ for visit_type in ["git", "hg", "svn"]:
+ task_type = f"load-{visit_type}"
+ swh_scheduler.create_task_type(
+ {
+ "type": task_type,
+ "max_queue_length": TEST_MAX_QUEUE,
+ "description": "The {} testing task".format(task_type),
+ "backend_name": _compute_backend_name(visit_type),
+ "default_interval": timedelta(days=1),
+ "min_interval": timedelta(hours=6),
+ "max_interval": timedelta(days=12),
+ }
+ )
+ return swh_scheduler
+
+
+def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler):
+ """When passed an unknown visit type, the recurrent visit scheduler should refuse
+ to start."""
+
+ with pytest.raises(ValueError, match="Unknown"):
+ invoke(
+ swh_scheduler,
+ False,
+ ["schedule-recurrent", "--visit-type", "unknown", "--visit-type", "git"],
+ )
+
+
+def test_cli_schedule_recurrent_noop(swh_scheduler, mocker):
+ """When passing no visit types, the recurrent visit scheduler should start."""
+
+ spawn_visit_scheduler_thread = mocker.patch(
+ f"{MODULE_NAME}.spawn_visit_scheduler_thread"
+ )
+ spawn_visit_scheduler_thread.side_effect = SystemExit
+ # The actual scheduling threads won't spawn; they'll terminate immediately.
+ # This only exercises the logic that pulls task types out of the database.
+
+ result = invoke(swh_scheduler, False, ["schedule-recurrent"])
+ assert result.exit_code == 0, result.output
+
+
+def test_recurrent_visit_scheduling(
+ swh_scheduler, caplog, listed_origins_by_type, mocker,
+):
+ """Scheduling known tasks is ok."""
+
+ caplog.set_level(logging.DEBUG, MODULE_NAME)
+ nb_origins = 1000
+
+ mock_celery_app = MagicMock()
+ mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots")
+ mock_available_slots.return_value = nb_origins # Slots available in queue
+
+ # Make sure the scheduler is properly configured in terms of visit/task types
+ all_task_types = {
+ task_type_d["type"]: task_type_d
+ for task_type_d in swh_scheduler.get_task_types()
+ }
+
+ visit_types = list(listed_origins_by_type.keys())
+ assert len(visit_types) > 0
+
+ task_types = []
+ origins = []
+ for visit_type, _origins in listed_origins_by_type.items():
+ origins.extend(swh_scheduler.record_listed_origins(_origins))
+ task_type_name = f"load-{visit_type}"
+ assert task_type_name in all_task_types.keys()
+ task_type = all_task_types[task_type_name]
+ task_type["visit_type"] = visit_type
+ # we'll limit the orchestrator to the origin types we know
+ task_types.append(task_type)
+
+ for visit_type in ["git", "svn"]:
+ task_type = f"load-{visit_type}"
+ send_visits_for_visit_type(
+ swh_scheduler, mock_celery_app, visit_type, all_task_types[task_type]
+ )
+
+ assert mock_available_slots.called, "The available slots function should be called"
+
+ records = [record.message for record in caplog.records]
+
+ # Iterating over the policy/ratio dict entries can change the overall
+ # order, so check against the set of records
+ expected_records = set()
+ for task_type in task_types:
+ visit_type = task_type["visit_type"]
+ queue_name = task_type["backend_name"]
+ msg = (
+ f"{nb_origins} available slots for visit type {visit_type} "
+ f"in queue {queue_name}"
+ )
+ expected_records.add(msg)
+
+ for expected_record in expected_records:
+ assert expected_record in set(records)
+
+
+@pytest.fixture
+def scheduler_config(swh_scheduler_config):
+ return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}}
+
+
+def test_visit_scheduler_thread_unknown_task(
+ swh_scheduler, scheduler_config,
+):
+ """Starting a thread with unknown task type reports the error"""
+
+ unknown_visit_type = "unknown"
+ command_queue = Queue()
+ exc_queue = Queue()
+
+ visit_scheduler_thread(
+ scheduler_config, unknown_visit_type, command_queue, exc_queue
+ )
+
+ assert command_queue.empty() is True
+ assert exc_queue.empty() is False
+ assert len(exc_queue.queue) == 1
+ result = exc_queue.queue.pop()
+ assert result[0] == unknown_visit_type
+ assert isinstance(result[1], ValueError)
+
+
+def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker):
+ """Spawning and terminating threads runs smoothly"""
+
+ threads: VisitSchedulerThreads = {}
+ exc_queue = Queue()
+ mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app")
+ mock_build_app.return_value = MagicMock()
+
+ assert len(threads) == 0
+ for visit_type in visit_types:
+ spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type)
+
+ # This actually only checks that the spawning and terminating logic is sound
+
+ assert len(threads) == len(visit_types)
+
+ actual_threads = terminate_visit_scheduler_threads(threads)
+ assert not len(actual_threads)
+
+ assert mock_build_app.called
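
The exception-propagation contract exercised by
`test_visit_scheduler_thread_unknown_task` above can be summarized in a short,
hedged sketch (the config is a placeholder mirroring the `scheduler_config`
fixture): `visit_scheduler_thread` never raises; any failure is reported as a
`(visit_type, exception)` pair on `exc_queue`. With a reachable scheduler
backend, the exception below would be the ValueError("Unknown task type:
load-unknown") raised in the thread body.

    from queue import Queue

    from swh.scheduler.celery_backend.recurrent_visits import visit_scheduler_thread

    command_queue, exc_queue = Queue(), Queue()
    # placeholder config; mirrors the scheduler_config fixture above
    config = {"scheduler": {"cls": "local", "db": "dbname=..."}, "celery": {}}

    visit_scheduler_thread(config, "unknown", command_queue, exc_queue)
    # the thread function returned instead of raising; the failure shows up here:
    visit_type, exc = exc_queue.get_nowait()
    assert visit_type == "unknown"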