Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_orchestrator.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta | |||||
import pytest | |||||
from swh.scheduler.celery_backend.orchestrator import orchestrate | |||||
from .test_cli import _fill_storage_with_origins, invoke | |||||
# Value passed as "max_queue_length" for every task type registered by the
# swh_scheduler fixture override in this module.
TEST_MAX_QUEUE = 10000
def _compute_backend_name(visit_type: str) -> str: | |||||
"Build a dummy reproducible backend name" | |||||
return f"swh.loader.{visit_type}.tasks" | |||||
@pytest.fixture
def swh_scheduler(swh_scheduler):
    """Override the default scheduler fixture to install some more task types.

    A ``load-<visit_type>`` task type is created on the incoming scheduler for
    each of the ``git``, ``hg`` and ``svn`` visit types, then that same
    scheduler object is returned.
    """
    for visit_type in ("git", "hg", "svn"):
        task_type = f"load-{visit_type}"
        swh_scheduler.create_task_type(
            {
                "type": task_type,
                "max_queue_length": TEST_MAX_QUEUE,
                "description": f"The {task_type} testing task",
                "backend_name": _compute_backend_name(visit_type),
                "default_interval": timedelta(days=1),
                "min_interval": timedelta(hours=6),
                "max_interval": timedelta(days=12),
            }
        )

    return swh_scheduler
def test_cli_orchestrator_unknown_visit_type(swh_scheduler):
    """The orchestrator must refuse to start when given an unknown visit type."""
    # One bogus visit type next to a valid one: the bogus one alone must be
    # enough to make the command fail.
    cli_args = [
        "start-orchestrator",
        "--visit-type",
        "unknown",
        "--visit-type",
        "git",
    ]

    # NOTE(review): the second positional argument is presumably invoke()'s
    # "catch exceptions" switch, letting the ValueError propagate here --
    # confirm against test_cli.invoke.
    with pytest.raises(ValueError, match="Unknown"):
        invoke(swh_scheduler, False, cli_args)
def test_cli_orchestrator_noop(swh_scheduler):
    """Starting the orchestrator with no parameter and nothing to do is a noop."""
    # With no extra parameter, the orchestrator only iterates over the task
    # types already known to the scheduler and has nothing to send. This is a
    # smoke test: we only check that the command exits cleanly.
    cli_args = ["start-orchestrator"]
    outcome = invoke(swh_scheduler, False, cli_args)

    assert outcome.exit_code == 0, outcome.output
def test_orchestrator_scheduling(
    swh_scheduler,
    swh_scheduler_celery_app,
    storage,
    caplog,
    listed_origins_by_type,
    mocker,
):
    """Orchestrator schedules known tasks.

    The number of available queue slots is mocked, listed origins are recorded
    in the scheduler for each visit type, and the INFO messages logged by
    ``orchestrate`` are compared (as a set) with the expected ones.
    """
    # Slots reported as available in each queue; kept distinct from the number
    # of origins below, which is an unrelated quantity.
    nb_available_slots = 1000
    mocker.patch(
        "swh.scheduler.celery_backend.orchestrator.get_available_slots",
        return_value=nb_available_slots,
    )

    nb_total_origins = 50
    # The return value is not needed by this test; the helper is presumably
    # called to populate `storage` (see test_cli._fill_storage_with_origins).
    _fill_storage_with_origins(storage, nb_total_origins)

    # Make sure the scheduler is properly configured in terms of visit/task types
    all_task_types = {
        task_type_d["type"]: task_type_d
        for task_type_d in swh_scheduler.get_task_types()
    }

    assert len(listed_origins_by_type) > 0

    task_types = []
    for visit_type, listed_origins in listed_origins_by_type.items():
        # Recording is done for its effect on the scheduler; the returned
        # records are not used afterwards.
        swh_scheduler.record_listed_origins(listed_origins)
        task_type_name = f"load-{visit_type}"
        assert task_type_name in all_task_types
        task_types.append(all_task_types[task_type_name])

    orchestrate(swh_scheduler, swh_scheduler_celery_app, task_types)

    # Mapping over the dict ratio/policies entries can change overall order so
    # let's compare sets of messages rather than sequences
    info_messages = {
        record.message for record in caplog.records if record.levelname == "INFO"
    }

    assert info_messages == {
        "git: 1000 slots available in queue swh.loader.git.tasks",
        "git: 490.0 tasks to send with policy already_visited_order_by_lag",
        "git: 490.0 tasks to send with policy never_visited_oldest_update_first",
        "git: 20.0 tasks to send with policy origins_without_last_update",
        "git: 490 sent visits to queue swh.loader.git.tasks",
        "svn: 1000 slots available in queue swh.loader.svn.tasks",
        "svn: 490.0 tasks to send with policy already_visited_order_by_lag",
        "svn: 490.0 tasks to send with policy never_visited_oldest_update_first",
        "svn: 20.0 tasks to send with policy origins_without_last_update",
        "svn: 490 sent visits to queue swh.loader.svn.tasks",
    }