Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_recurrent_visits.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import timedelta | from datetime import timedelta | ||||
import logging | import logging | ||||
from queue import Queue | from queue import Queue | ||||
from unittest.mock import MagicMock | import time | ||||
import pytest | import pytest | ||||
import yaml | |||||
from swh.scheduler.celery_backend.recurrent_visits import ( | from swh.scheduler.celery_backend.recurrent_visits import ( | ||||
DEFAULT_DVCS_POLICY, | DEFAULT_DVCS_POLICY, | ||||
VisitSchedulerThreads, | VisitSchedulerThreads, | ||||
grab_next_visits_policy_weights, | grab_next_visits_policy_weights, | ||||
send_visits_for_visit_type, | send_visits_for_visit_type, | ||||
spawn_visit_scheduler_thread, | spawn_visit_scheduler_thread, | ||||
terminate_visit_scheduler_threads, | terminate_visit_scheduler_threads, | ||||
Show All 25 Lines | for visit_type in ["test-git", "test-hg", "test-svn"]: | ||||
"default_interval": timedelta(days=1), | "default_interval": timedelta(days=1), | ||||
"min_interval": timedelta(hours=6), | "min_interval": timedelta(hours=6), | ||||
"max_interval": timedelta(days=12), | "max_interval": timedelta(days=12), | ||||
} | } | ||||
) | ) | ||||
return swh_scheduler | return swh_scheduler | ||||
@pytest.fixture
def all_task_types(swh_scheduler):
    """Task types returned by the scheduler, indexed by their ``type`` entry."""
    known_task_types = swh_scheduler.get_task_types()
    return {entry["type"]: entry for entry in known_task_types}
def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler): | def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler): | ||||
"""When passed an unknown visit type, the recurrent visit scheduler should refuse | """When passed an unknown visit type, the recurrent visit scheduler should refuse | ||||
to start.""" | to start.""" | ||||
with pytest.raises(ValueError, match="Unknown"): | with pytest.raises(ValueError, match="Unknown"): | ||||
invoke( | invoke( | ||||
swh_scheduler, | swh_scheduler, | ||||
False, | False, | ||||
Show All 16 Lines | def test_cli_schedule_recurrent_noop(swh_scheduler, mocker): | ||||
spawn_visit_scheduler_thread.side_effect = SystemExit | spawn_visit_scheduler_thread.side_effect = SystemExit | ||||
# The actual scheduling threads won't spawn, they'll immediately terminate. This | # The actual scheduling threads won't spawn, they'll immediately terminate. This | ||||
# only exercises the logic to pull task types out of the database | # only exercises the logic to pull task types out of the database | ||||
result = invoke(swh_scheduler, False, ["schedule-recurrent"]) | result = invoke(swh_scheduler, False, ["schedule-recurrent"]) | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
def test_send_visits_for_type_no_origin_scheduled_backoff(
    swh_scheduler, all_task_types, mocker
):
    """When no origin gets scheduled, send_visits_for_visit_type should return the
    backoff delay it was given."""
    # Pin the monotonic clock to a constant value for the duration of the call.
    # NOTE(review): presumably keeps elapsed-time accounting out of the returned
    # delay -- confirm against send_visits_for_visit_type.
    mocker.patch.object(time, "monotonic", lambda: 0)

    backoff = 60
    visit_type = "test-git"
    task_type_d = all_task_types[f"load-{visit_type}"]

    delay = send_visits_for_visit_type(
        swh_scheduler,
        mocker.MagicMock(),  # stand-in for the celery app
        visit_type,
        task_type_d,
        DEFAULT_DVCS_POLICY,
        backoff,
    )

    assert delay == backoff
def test_cli_schedule_recurrent_no_origins_scheduled_backoff_in_config( | |||||
swh_scheduler, mocker | |||||
): | |||||
"""The loaded configuration, including no_origins_scheduled_backoff, should be passed on to spawn_visit_scheduler_thread.""" | |||||
config = """ | |||||
scheduler: | |||||
cls: foo | |||||
args: {} | |||||
no_origins_scheduled_backoff: 60 | |||||
""" | |||||
spawn_visit_scheduler_thread = mocker.patch( | |||||
f"{MODULE_NAME}.spawn_visit_scheduler_thread" | |||||
) | |||||
spawn_visit_scheduler_thread.side_effect = SystemExit | |||||
# The actual scheduling threads won't spawn, they'll immediately terminate. This | |||||
# only exercises the logic to pull task types out of the database | |||||
result = invoke( | |||||
swh_scheduler, | |||||
False, | |||||
["schedule-recurrent", "--visit-type", "test-git"], | |||||
config=config, | |||||
) | |||||
assert result.exit_code == 0, result.output | |||||
assert yaml.safe_load(config) in spawn_visit_scheduler_thread.call_args[0] | |||||
def test_recurrent_visit_scheduling( | def test_recurrent_visit_scheduling( | ||||
swh_scheduler, | swh_scheduler, | ||||
caplog, | caplog, | ||||
listed_origins_by_type, | listed_origins_by_type, | ||||
all_task_types, | |||||
mocker, | mocker, | ||||
): | ): | ||||
"""Scheduling known tasks is ok.""" | """Scheduling known tasks is ok.""" | ||||
caplog.set_level(logging.DEBUG, MODULE_NAME) | caplog.set_level(logging.DEBUG, MODULE_NAME) | ||||
nb_origins = 1000 | nb_origins = 1000 | ||||
mock_celery_app = MagicMock() | mock_celery_app = mocker.MagicMock() | ||||
mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots") | mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots") | ||||
mock_available_slots.return_value = nb_origins # Slots available in queue | mock_available_slots.return_value = nb_origins # Slots available in queue | ||||
# Make sure the scheduler is properly configured in terms of visit/task types | # Make sure the scheduler is properly configured in terms of visit/task types | ||||
all_task_types = { | |||||
task_type_d["type"]: task_type_d | |||||
for task_type_d in swh_scheduler.get_task_types() | |||||
} | |||||
visit_types = list(listed_origins_by_type.keys()) | visit_types = list(listed_origins_by_type.keys()) | ||||
assert len(visit_types) > 0 | assert len(visit_types) > 0 | ||||
task_types = [] | task_types = [] | ||||
origins = [] | origins = [] | ||||
for visit_type, _origins in listed_origins_by_type.items(): | for visit_type, _origins in listed_origins_by_type.items(): | ||||
origins.extend(swh_scheduler.record_listed_origins(_origins)) | origins.extend(swh_scheduler.record_listed_origins(_origins)) | ||||
task_type_name = f"load-{visit_type}" | task_type_name = f"load-{visit_type}" | ||||
▲ Show 20 Lines • Show All 81 Lines • ▼ Show 20 Lines | |||||
def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker):
    """Spawning registers one thread per visit type; terminating leaves none."""
    # Replace the celery app factory so no real application gets built.
    build_app = mocker.patch(
        "swh.scheduler.celery_backend.config.build_app",
        return_value=mocker.MagicMock(),
    )

    scheduler_threads: VisitSchedulerThreads = {}
    exceptions = Queue()
    assert len(scheduler_threads) == 0

    for visit_type in visit_types:
        spawn_visit_scheduler_thread(
            scheduler_threads, exceptions, scheduler_config, visit_type
        )

    # This actually only checks the spawning and terminating logic is sound
    assert len(scheduler_threads) == len(visit_types)

    remaining_threads = terminate_visit_scheduler_threads(scheduler_threads)
    assert len(remaining_threads) == 0

    assert build_app.called