diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py --- a/swh/scheduler/celery_backend/recurrent_visits.py +++ b/swh/scheduler/celery_backend/recurrent_visits.py @@ -68,7 +68,7 @@ """Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue.""" -NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 +DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 """Backoff time (in seconds) if no origins have been scheduled in the current iteration""" @@ -175,6 +175,7 @@ visit_type: str, task_type: Dict, policy_cfg: List[Dict[str, Any]], + no_origins_scheduled_backoff: int = DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF, ) -> float: """Schedule the next batch of visits for the given ``visit_type``. @@ -190,8 +191,8 @@ :py:func:`grab_next_visits_policy_weights` to retrieve the next set of origin visits to schedule, and we send them to celery. - If the last scheduling attempt didn't return any origins, we sleep for - :py:data:`NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive + If the last scheduling attempt didn't return any origins, we sleep by default for + :py:data:`DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too often if there's nothing left to schedule. 
@@ -227,7 +228,7 @@ if not origins: logger.debug("No origins to visit for type %s", visit_type) - return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF + return current_iteration_start + no_origins_scheduled_backoff # Try to smooth the ingestion load, origins pulled by different # scheduling policies have different resource usage patterns @@ -307,6 +308,9 @@ visit_type, task_type, policy_cfg.get(visit_type, policy_cfg["default"]), + config.get( + "no_origins_scheduled_backoff", DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF + ), ) except BaseException as e: diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py --- a/swh/scheduler/tests/test_cli.py +++ b/swh/scheduler/tests/test_cli.py @@ -26,14 +26,14 @@ """ -def invoke(scheduler, catch_exceptions, args): +def invoke(scheduler, catch_exceptions, args, config=CLI_CONFIG): runner = CliRunner() with patch( "swh.scheduler.get_scheduler" ) as get_scheduler_mock, tempfile.NamedTemporaryFile( "a", suffix=".yml" ) as config_fd: - config_fd.write(CLI_CONFIG) + config_fd.write(config) config_fd.seek(0) get_scheduler_mock.return_value = scheduler args = [ diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py --- a/swh/scheduler/tests/test_recurrent_visits.py +++ b/swh/scheduler/tests/test_recurrent_visits.py @@ -6,9 +6,10 @@ from datetime import timedelta import logging from queue import Queue -from unittest.mock import MagicMock +import time import pytest +import yaml from swh.scheduler.celery_backend.recurrent_visits import ( DEFAULT_DVCS_POLICY, @@ -50,6 +51,14 @@ return swh_scheduler +@pytest.fixture +def all_task_types(swh_scheduler): + return { + task_type_d["type"]: task_type_d + for task_type_d in swh_scheduler.get_task_types() + } + + def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler): """When passed an unknown visit type, the recurrent visit scheduler should refuse to start.""" @@ -82,10 +91,61 @@ assert result.exit_code == 0, 
result.output +def test_send_visits_for_type_no_origin_scheduled_backoff( + swh_scheduler, all_task_types, mocker +): + visit_type = "test-git" + task_type = f"load-{visit_type}" + mocker.patch.object(time, "monotonic", lambda: 0) + no_origins_scheduled_backoff = 60 + assert ( + send_visits_for_visit_type( + swh_scheduler, + mocker.MagicMock(), + visit_type, + all_task_types[task_type], + DEFAULT_DVCS_POLICY, + no_origins_scheduled_backoff, + ) + == no_origins_scheduled_backoff + ) + + +def test_cli_schedule_recurrent_no_origins_scheduled_backoff_in_config( + swh_scheduler, mocker +): + """The config (with no_origins_scheduled_backoff) is passed to the threads.""" + + config = """ + scheduler: + cls: foo + args: {} + no_origins_scheduled_backoff: 60 + """ + + spawn_visit_scheduler_thread = mocker.patch( + f"{MODULE_NAME}.spawn_visit_scheduler_thread" + ) + spawn_visit_scheduler_thread.side_effect = SystemExit + # The actual scheduling threads won't spawn, they'll immediately terminate. This + # only checks that the loaded configuration reaches the thread spawner + + result = invoke( + swh_scheduler, + False, + ["schedule-recurrent", "--visit-type", "test-git"], + config=config, + ) + assert result.exit_code == 0, result.output + + assert yaml.safe_load(config) in spawn_visit_scheduler_thread.call_args[0] + + def test_recurrent_visit_scheduling( swh_scheduler, caplog, listed_origins_by_type, + all_task_types, mocker, ): """Scheduling known tasks is ok.""" @@ -93,16 +153,11 @@ caplog.set_level(logging.DEBUG, MODULE_NAME) nb_origins = 1000 - mock_celery_app = MagicMock() + mock_celery_app = mocker.MagicMock() mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots") mock_available_slots.return_value = nb_origins # Slots available in queue # Make sure the scheduler is properly configured in terms of visit/task types - all_task_types = { - task_type_d["type"]: task_type_d - for task_type_d in swh_scheduler.get_task_types() - } - visit_types =
list(listed_origins_by_type.keys()) assert len(visit_types) > 0 @@ -200,7 +255,7 @@ threads: VisitSchedulerThreads = {} exc_queue = Queue() mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app") - mock_build_app.return_value = MagicMock() + mock_build_app.return_value = mocker.MagicMock() assert len(threads) == 0 for visit_type in visit_types: