Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py
index 8f43d8e..2a9ebe6 100644
--- a/swh/scheduler/celery_backend/recurrent_visits.py
+++ b/swh/scheduler/celery_backend/recurrent_visits.py
@@ -1,311 +1,323 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""This schedules the recurrent visits, for listed origins, in Celery.
For "oneshot" (save code now, lister) tasks, check the
:mod:`swh.scheduler.celery_backend.runner` and
:mod:`swh.scheduler.celery_backend.pika_listener` modules.
"""
from __future__ import annotations
from itertools import chain
import logging
from queue import Empty, Queue
import random
from threading import Thread
import time
from typing import TYPE_CHECKING, Any, Dict, List, Tuple
from kombu.utils.uuid import uuid
from swh.scheduler.celery_backend.config import get_available_slots
if TYPE_CHECKING:
from ..interface import SchedulerInterface
from ..model import ListedOrigin
logger = logging.getLogger(__name__)
# Weights shared by all the version-control visit types listed in POLICY_WEIGHTS.
_VCS_POLICY_WEIGHTS: Dict[str, float] = {
    "already_visited_order_by_lag": 49,
    "never_visited_oldest_update_first": 49,
    "origins_without_last_update": 2,
}

POLICY_WEIGHTS: Dict[str, Dict[str, float]] = {
    "default": {
        "already_visited_order_by_lag": 50,
        "never_visited_oldest_update_first": 50,
    },
    "git": _VCS_POLICY_WEIGHTS,
    "hg": _VCS_POLICY_WEIGHTS,
    "svn": _VCS_POLICY_WEIGHTS,
    "cvs": _VCS_POLICY_WEIGHTS,
    "bzr": _VCS_POLICY_WEIGHTS,
}
"""Scheduling policies to use to retrieve visits for the given visit types, with their
relative weights"""

POLICY_ADDITIONAL_PARAMETERS: Dict[str, Dict[str, Any]] = {
    "git": {
        "already_visited_order_by_lag": {"tablesample": 0.1},
        "never_visited_oldest_update_first": {"tablesample": 0.1},
        "origins_without_last_update": {"tablesample": 0.1},
    }
}
"""Extra keyword arguments forwarded to
:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits`, keyed by visit
type, then by scheduling policy. Visit types or policies without an entry get no extra
arguments."""

MIN_SLOTS_RATIO = 0.05
"""Quantity of slots that need to be available (with respect to max_queue_length) for
:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger"""

QUEUE_FULL_BACKOFF = 60
"""Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots
available in the queue."""

NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60
"""Backoff time (in seconds) if no origins have been scheduled in the current
iteration"""

BACKOFF_SPLAY = 5.0
"""Amplitude of the fuzziness between backoffs"""

TERMINATE = object()
"""Termination request received from command queue (singleton used for identity
comparison)"""
def grab_next_visits_policy_weights(
    scheduler: SchedulerInterface, visit_type: str, num_visits: int
) -> List[ListedOrigin]:
    """Get the next ``num_visits`` for the given ``visit_type`` using the corresponding
    set of scheduling policies.

    The :py:data:`POLICY_WEIGHTS` dict sets, for each visit type, the scheduling
    policies used to pull the next tasks, and what proportion of the available
    num_visits they take. Visit types without an entry use the ``"default"`` weights.

    Any per-policy entry found in :py:data:`POLICY_ADDITIONAL_PARAMETERS` for this
    visit type is forwarded as extra keyword arguments to
    :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits`.

    This function logs an informational message if the ratio of retrieved origins is
    off of the requested ratio by more than 5% (relative to the requested ratio).

    Raises:
        ValueError: if the weights configured for ``visit_type`` sum to zero.

    Returns:
        at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects
    """
    policy_weights = POLICY_WEIGHTS.get(visit_type, POLICY_WEIGHTS["default"])
    total_weight = sum(policy_weights.values())
    if not total_weight:
        raise ValueError(f"No policy weights set for visit type {visit_type}")

    policy_ratio = {
        policy: weight / total_weight for policy, weight in policy_weights.items()
    }
    additional_parameters = POLICY_ADDITIONAL_PARAMETERS.get(visit_type, {})

    fetched_origins: Dict[str, List[ListedOrigin]] = {}
    for policy, ratio in policy_ratio.items():
        # Truncation means the per-policy counts may sum to less than num_visits
        num_tasks_to_send = int(num_visits * ratio)
        fetched_origins[policy] = scheduler.grab_next_visits(
            visit_type,
            num_tasks_to_send,
            policy=policy,
            **additional_parameters.get(policy, {}),
        )

    all_origins: List[ListedOrigin] = list(
        chain.from_iterable(fetched_origins.values())
    )
    if not all_origins:
        return []

    # Check whether the ratios of origins fetched are skewed with respect to the
    # ones we requested
    fetched_origin_ratios = {
        policy: len(origins) / len(all_origins)
        for policy, origins in fetched_origins.items()
    }
    for policy, expected_ratio in policy_ratio.items():
        if not expected_ratio:
            # A zero-weight policy has no meaningful relative skew; skipping it
            # avoids a ZeroDivisionError below.
            continue
        # 5% of skew with respect to request
        if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05:
            logger.info(
                "Skewed fetch for visit type %s with policy %s: fetched %s, "
                "requested %s",
                visit_type,
                policy,
                fetched_origin_ratios[policy],
                expected_ratio,
            )

    return all_origins
def splay():
    """Pick a small random delay, between 0 and :py:data:`BACKOFF_SPLAY`, used to
    jitter the backoffs of the visit scheduling threads."""
    lower_bound, upper_bound = 0, BACKOFF_SPLAY
    return random.uniform(lower_bound, upper_bound)
def send_visits_for_visit_type(
    scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict,
) -> float:
    """Send the next batch of visits of type ``visit_type`` to celery.

    The number of free slots is read from the RabbitMQ queue named by the task
    type's ``backend_name``.

    When fewer than :py:data:`MIN_SLOTS_RATIO` of ``max_queue_length`` slots are free,
    nothing is fetched and the next iteration is delayed by
    :py:data:`QUEUE_FULL_BACKOFF` seconds, which avoids running the expensive
    :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries
    when there is little room in the queue.

    Otherwise :py:func:`grab_next_visits_policy_weights` provides the origins to
    visit, which are shuffled and sent to celery. If it returns nothing, the next
    iteration is delayed by :py:data:`NO_ORIGINS_SCHEDULED_BACKOFF` seconds so the
    expensive queries are not repeated too often.

    Returns:
        the earliest :py:func:`time.monotonic` value at which to run the next
        iteration of the loop.
    """
    backend = task_type["backend_name"]
    queue_capacity = task_type.get("max_queue_length") or 0
    slots_threshold = queue_capacity * MIN_SLOTS_RATIO
    started_at = time.monotonic()

    # Check queue level
    free_slots = get_available_slots(app, backend, queue_capacity)
    logger.debug(
        "%s available slots for visit type %s in queue %s",
        free_slots,
        visit_type,
        backend,
    )
    if free_slots < slots_threshold:
        return started_at + QUEUE_FULL_BACKOFF

    to_visit = grab_next_visits_policy_weights(scheduler, visit_type, free_slots)
    if to_visit:
        # Origins pulled by different scheduling policies have different resource
        # usage patterns: mix them up to smooth the ingestion load
        random.shuffle(to_visit)
        for listed_origin in to_visit:
            task_dict = listed_origin.as_task_dict()
            app.send_task(
                backend,
                task_id=uuid(),
                args=task_dict["arguments"]["args"],
                kwargs=task_dict["arguments"]["kwargs"],
                queue=backend,
            )

        logger.info(
            "%s: %s visits scheduled in queue %s", visit_type, len(to_visit), backend,
        )
        # Everything worked: another batch can be scheduled right away.
        return time.monotonic()

    logger.debug("No origins to visit for type %s", visit_type)
    return started_at + NO_ORIGINS_SCHEDULED_BACKOFF
def visit_scheduler_thread(
    config: Dict,
    visit_type: str,
    command_queue: Queue[object],
    exc_queue: Queue[Tuple[str, BaseException]],
):
    """Target function for the visit sending thread, which initializes local connections
    and handles exceptions by sending them back to the main thread.

    Args:
        config: configuration dict; its ``celery`` entry (if any) is passed to
            ``build_app`` and its ``scheduler`` entry to ``get_scheduler``
        visit_type: visit type to schedule; the ``load-<visit_type>`` task type must
            exist in the scheduler
        command_queue: queue polled between iterations; receiving
            :py:data:`TERMINATE` makes the thread return
        exc_queue: queue on which a ``(visit_type, exception)`` tuple is put if
            anything raises in this thread
    """
    from swh.scheduler import get_scheduler
    from swh.scheduler.celery_backend.config import build_app

    try:
        # We need to reinitialize these connections because they're not generally
        # thread-safe
        app = build_app(config.get("celery"))
        scheduler = get_scheduler(**config["scheduler"])
        task_type = scheduler.get_task_type(f"load-{visit_type}")
        if task_type is None:
            raise ValueError(f"Unknown task type: load-{visit_type}")

        next_iteration = time.monotonic()

        while True:
            # vary the next iteration time a little bit
            next_iteration = next_iteration + splay()
            while time.monotonic() < next_iteration:
                # Wait for next iteration to start. Listen for termination message.
                try:
                    msg = command_queue.get(block=True, timeout=1)
                except Empty:
                    continue

                if msg is TERMINATE:
                    return
                else:
                    # Logger.warn is a deprecated alias of Logger.warning
                    logger.warning(
                        "Received unexpected message %s in command queue", msg
                    )

            next_iteration = send_visits_for_visit_type(
                scheduler, app, visit_type, task_type
            )

    except BaseException as e:
        # Deliberately broad: every failure is reported to the main thread
        exc_queue.put((visit_type, e))
# Keyed by visit type; each value is the (thread, command queue) pair stored by
# spawn_visit_scheduler_thread.
VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]]
"""Dict storing the visit scheduler threads and their command queues"""
def spawn_visit_scheduler_thread(
    threads: VisitSchedulerThreads,
    exc_queue: Queue[Tuple[str, BaseException]],
    config: Dict[str, Any],
    visit_type: str,
):
    """Start a thread scheduling the visits of type ``visit_type``, and register it
    (with its command queue) in ``threads`` under that visit type."""
    commands: Queue[object] = Queue()
    worker_kwargs = {
        "config": config,
        "visit_type": visit_type,
        "command_queue": commands,
        "exc_queue": exc_queue,
    }
    worker = Thread(target=visit_scheduler_thread, kwargs=worker_kwargs)
    # Register before starting, so the entry exists as soon as the thread runs
    threads[visit_type] = (worker, commands)
    worker.start()
def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]:
    """Terminate all visit scheduler threads.

    A :py:data:`TERMINATE` message is put on every command queue, then the threads
    are joined (1 second timeout each) for at most 10 rounds. Threads that have
    stopped are removed from ``threads``, which is therefore mutated in place.

    Returns:
        the sorted visit types of the threads that could not be reaped
    """
    logger.info("Termination requested...")
    for _, command_queue in threads.values():
        command_queue.put(TERMINATE)

    loops = 0
    while threads and loops < 10:
        logger.info(
            "Terminating visit scheduling threads: %s", ", ".join(sorted(threads))
        )
        loops += 1
        # Iterate over a snapshot, as entries are deleted during the loop
        for visit_type, (thread, _) in list(threads.items()):
            thread.join(timeout=1)
            if not thread.is_alive():
                logger.debug("Thread %s terminated", visit_type)
                del threads[visit_type]

    if threads:
        # Logger.warn is a deprecated alias of Logger.warning
        logger.warning(
            "Could not reap the following threads after 10 attempts: %s",
            ", ".join(sorted(threads)),
        )

    return sorted(threads)
diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py
index d118fbf..efa8d67 100644
--- a/swh/scheduler/tests/test_recurrent_visits.py
+++ b/swh/scheduler/tests/test_recurrent_visits.py
@@ -1,180 +1,202 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
import logging
from queue import Queue
from unittest.mock import MagicMock
import pytest
from swh.scheduler.celery_backend.recurrent_visits import (
+ POLICY_ADDITIONAL_PARAMETERS,
VisitSchedulerThreads,
+ grab_next_visits_policy_weights,
send_visits_for_visit_type,
spawn_visit_scheduler_thread,
terminate_visit_scheduler_threads,
visit_scheduler_thread,
)
from .test_cli import invoke
TEST_MAX_QUEUE = 10000
MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits"
def _compute_backend_name(visit_type: str) -> str:
    """Return a dummy, reproducible backend name for ``visit_type``."""
    return "swh.loader.{}.tasks".format(visit_type)
@pytest.fixture
def swh_scheduler(swh_scheduler):
    """Extend the default scheduler fixture with the ``load-git``, ``load-hg`` and
    ``load-svn`` task types."""
    for vcs in ("git", "hg", "svn"):
        task_type_name = f"load-{vcs}"
        definition = {
            "type": task_type_name,
            "max_queue_length": TEST_MAX_QUEUE,
            "description": "The {} testing task".format(task_type_name),
            "backend_name": _compute_backend_name(vcs),
            "default_interval": timedelta(days=1),
            "min_interval": timedelta(hours=6),
            "max_interval": timedelta(days=12),
        }
        swh_scheduler.create_task_type(definition)

    return swh_scheduler
def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler):
    """The recurrent visit scheduler must refuse to start when one of the requested
    visit types is unknown."""
    cli_args = [
        "schedule-recurrent",
        "--visit-type",
        "unknown",
        "--visit-type",
        "git",
    ]
    with pytest.raises(ValueError, match="Unknown"):
        invoke(swh_scheduler, False, cli_args)
def test_cli_schedule_recurrent_noop(swh_scheduler, mocker):
    """Without any visit type on the command line, the recurrent visit scheduler
    should start."""
    # Spawning is patched to raise SystemExit, so no scheduling thread actually
    # runs: this only exercises the logic pulling task types out of the database
    mocker.patch(
        MODULE_NAME + ".spawn_visit_scheduler_thread", side_effect=SystemExit
    )

    result = invoke(swh_scheduler, False, ["schedule-recurrent"])
    assert result.exit_code == 0, result.output
def test_recurrent_visit_scheduling(
    swh_scheduler, caplog, listed_origins_by_type, mocker,
):
    """Scheduling known tasks is ok."""
    caplog.set_level(logging.DEBUG, MODULE_NAME)
    nb_origins = 1000

    mock_celery_app = MagicMock()
    mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots")
    mock_available_slots.return_value = nb_origins  # Slots available in queue

    # Make sure the scheduler is properly configured in terms of visit/task types
    all_task_types = {
        task_type_d["type"]: task_type_d
        for task_type_d in swh_scheduler.get_task_types()
    }

    visit_types = list(listed_origins_by_type.keys())
    assert len(visit_types) > 0

    task_types = []
    for visit_type, _origins in listed_origins_by_type.items():
        # Only the recording side effect matters here; the returned origins are
        # not needed by the assertions below
        swh_scheduler.record_listed_origins(_origins)
        task_type_name = f"load-{visit_type}"
        assert task_type_name in all_task_types
        task_type = all_task_types[task_type_name]
        task_type["visit_type"] = visit_type
        # we'll limit the orchestrator to the origins' type we know
        task_types.append(task_type)

    for visit_type in ["git", "svn"]:
        task_type = f"load-{visit_type}"
        send_visits_for_visit_type(
            swh_scheduler, mock_celery_app, visit_type, all_task_types[task_type]
        )

    assert mock_available_slots.called, "The available slots functions should be called"

    # Mapping over the dict ratio/policies entries can change overall order so let's
    # check the set of records
    records = {record.message for record in caplog.records}
    expected_records = {
        f"{nb_origins} available slots for visit type {task_type['visit_type']} "
        f"in queue {task_type['backend_name']}"
        for task_type in task_types
    }

    missing_records = expected_records - records
    assert not missing_records, f"Missing log records: {sorted(missing_records)}"
@pytest.mark.parametrize(
    "visit_type, tablesamples",
    [("hg", {}), ("git", POLICY_ADDITIONAL_PARAMETERS["git"])],
)
def test_recurrent_visit_additional_parameters(
    swh_scheduler, mocker, visit_type, tablesamples
):
    """Check that the additional policy parameters configured for a visit type are
    forwarded to ``grab_next_visits``, and that nothing extra is passed for visit
    types without any."""

    mock_grab_next_visits = mocker.patch.object(swh_scheduler, "grab_next_visits")
    mock_grab_next_visits.return_value = []

    grab_next_visits_policy_weights(swh_scheduler, visit_type, 10)

    # Without this, the loop below would pass vacuously if nothing was called
    assert mock_grab_next_visits.call_args_list, "grab_next_visits was never called"

    for _args, kwargs in mock_grab_next_visits.call_args_list:
        expected_tablesample = tablesamples.get(kwargs["policy"], {}).get(
            "tablesample"
        )
        assert kwargs.get("tablesample") == expected_tablesample
+
+
@pytest.fixture
def scheduler_config(swh_scheduler_config):
    """Configuration dict with a ``local`` scheduler built from
    ``swh_scheduler_config`` and an empty ``celery`` section."""
    scheduler_settings = {"cls": "local"}
    scheduler_settings.update(swh_scheduler_config)
    return {"scheduler": scheduler_settings, "celery": {}}
def test_visit_scheduler_thread_unknown_task(
    swh_scheduler, scheduler_config,
):
    """A thread started for an unknown task type reports the error through the
    exception queue."""
    unknown_visit_type = "unknown"
    commands, errors = Queue(), Queue()

    visit_scheduler_thread(scheduler_config, unknown_visit_type, commands, errors)

    assert commands.empty() is True
    assert errors.empty() is False
    assert len(errors.queue) == 1

    failed_visit_type, raised = errors.queue.pop()
    assert failed_visit_type == unknown_visit_type
    assert isinstance(raised, ValueError)
def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker):
    """Threads can be spawned and then terminated without trouble."""
    threads: VisitSchedulerThreads = {}
    exc_queue = Queue()
    mock_build_app = mocker.patch(
        "swh.scheduler.celery_backend.config.build_app", return_value=MagicMock()
    )

    assert len(threads) == 0
    for one_visit_type in visit_types:
        spawn_visit_scheduler_thread(
            threads, exc_queue, scheduler_config, one_visit_type
        )

    # Only the soundness of the spawning and terminating logic is checked here
    assert len(threads) == len(visit_types)

    unreaped = terminate_visit_scheduler_threads(threads)
    assert not len(unreaped)

    assert mock_build_app.called

File Metadata

Mime Type
text/x-diff
Expires
Jul 4 2025, 8:13 AM (9 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3293141

Event Timeline