Page MenuHomeSoftware Heritage

D8475.diff
No OneTemporary

D8475.diff

diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py
--- a/swh/scheduler/celery_backend/recurrent_visits.py
+++ b/swh/scheduler/celery_backend/recurrent_visits.py
@@ -68,7 +68,7 @@
"""Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots
available in the queue."""
-NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60
+DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60
"""Backoff time (in seconds) if no origins have been scheduled in the current
iteration"""
@@ -175,6 +175,7 @@
visit_type: str,
task_type: Dict,
policy_cfg: List[Dict[str, Any]],
+ no_origins_scheduled_backoff: int = DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF,
) -> float:
"""Schedule the next batch of visits for the given ``visit_type``.
@@ -190,8 +191,8 @@
:py:func:`grab_next_visits_policy_weights` to retrieve the next set of origin visits
to schedule, and we send them to celery.
- If the last scheduling attempt didn't return any origins, we sleep for
- :py:data:`NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive
+ If the last scheduling attempt didn't return any origins, we sleep by default for
+ :py:data:`DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive
:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too
often if there's nothing left to schedule.
@@ -227,7 +228,7 @@
if not origins:
logger.debug("No origins to visit for type %s", visit_type)
- return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF
+ return current_iteration_start + no_origins_scheduled_backoff
# Try to smooth the ingestion load, origins pulled by different
# scheduling policies have different resource usage patterns
@@ -307,6 +308,9 @@
visit_type,
task_type,
policy_cfg.get(visit_type, policy_cfg["default"]),
+ config.get(
+ "no_origins_scheduled_backoff", DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF
+ ),
)
except BaseException as e:
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -26,14 +26,14 @@
"""
-def invoke(scheduler, catch_exceptions, args):
+def invoke(scheduler, catch_exceptions, args, config=CLI_CONFIG):
runner = CliRunner()
with patch(
"swh.scheduler.get_scheduler"
) as get_scheduler_mock, tempfile.NamedTemporaryFile(
"a", suffix=".yml"
) as config_fd:
- config_fd.write(CLI_CONFIG)
+ config_fd.write(config)
config_fd.seek(0)
get_scheduler_mock.return_value = scheduler
args = [
diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py
--- a/swh/scheduler/tests/test_recurrent_visits.py
+++ b/swh/scheduler/tests/test_recurrent_visits.py
@@ -6,9 +6,10 @@
from datetime import timedelta
import logging
from queue import Queue
-from unittest.mock import MagicMock
+import time
import pytest
+import yaml
from swh.scheduler.celery_backend.recurrent_visits import (
DEFAULT_DVCS_POLICY,
@@ -50,6 +51,14 @@
return swh_scheduler
+@pytest.fixture
+def all_task_types(swh_scheduler):
+ return {
+ task_type_d["type"]: task_type_d
+ for task_type_d in swh_scheduler.get_task_types()
+ }
+
+
def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler):
"""When passed an unknown visit type, the recurrent visit scheduler should refuse
to start."""
@@ -82,10 +91,61 @@
assert result.exit_code == 0, result.output
+def test_send_visits_for_type_no_origin_scheduled_backoff(
+ swh_scheduler, all_task_types, mocker
+):
+ visit_type = "test-git"
+ task_type = f"load-{visit_type}"
+ mocker.patch.object(time, "monotonic", lambda: 0)
+ no_origins_scheduled_backoff = 60
+ assert (
+ send_visits_for_visit_type(
+ swh_scheduler,
+ mocker.MagicMock(),
+ visit_type,
+ all_task_types[task_type],
+ DEFAULT_DVCS_POLICY,
+ no_origins_scheduled_backoff,
+ )
+ == no_origins_scheduled_backoff
+ )
+
+
+def test_cli_schedule_recurrent_no_origins_scheduled_backoff_in_config(
+ swh_scheduler, mocker
+):
+ """The config's no_origins_scheduled_backoff should reach the scheduler threads."""
+
+ config = """
+ scheduler:
+ cls: foo
+ args: {}
+ no_origins_scheduled_backoff: 60
+ """
+
+ spawn_visit_scheduler_thread = mocker.patch(
+ f"{MODULE_NAME}.spawn_visit_scheduler_thread"
+ )
+ spawn_visit_scheduler_thread.side_effect = SystemExit
+ # The actual scheduling threads won't spawn, they'll immediately terminate. This
+ # only exercises the logic to pull task types out of the database
+
+ result = invoke(
+ swh_scheduler,
+ False,
+ ["schedule-recurrent", "--visit-type", "test-git"],
+ config=config,
+ )
+ assert result.exit_code == 0, result.output
+
+ assert yaml.safe_load(config) in spawn_visit_scheduler_thread.call_args[0]
+
+
def test_recurrent_visit_scheduling(
swh_scheduler,
caplog,
listed_origins_by_type,
+ all_task_types,
mocker,
):
"""Scheduling known tasks is ok."""
@@ -93,16 +153,11 @@
caplog.set_level(logging.DEBUG, MODULE_NAME)
nb_origins = 1000
- mock_celery_app = MagicMock()
+ mock_celery_app = mocker.MagicMock()
mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots")
mock_available_slots.return_value = nb_origins # Slots available in queue
# Make sure the scheduler is properly configured in terms of visit/task types
- all_task_types = {
- task_type_d["type"]: task_type_d
- for task_type_d in swh_scheduler.get_task_types()
- }
-
visit_types = list(listed_origins_by_type.keys())
assert len(visit_types) > 0
@@ -200,7 +255,7 @@
threads: VisitSchedulerThreads = {}
exc_queue = Queue()
mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app")
- mock_build_app.return_value = MagicMock()
+ mock_build_app.return_value = mocker.MagicMock()
assert len(threads) == 0
for visit_type in visit_types:

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 19, 11:53 AM (14 h, 25 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220031

Event Timeline