diff --git a/swh/scheduler/celery_backend/orchestrator.py b/swh/scheduler/celery_backend/orchestrator.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/celery_backend/orchestrator.py
@@ -0,0 +1,109 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This is the scheduler orchestrator. It is in charge of scheduling recurring origin visits.
+
+For "oneshot" tasks, check the :mod:`swh.scheduler.celery_backend.runner` and
+:mod:`swh.scheduler.celery_backend.pika_listener` modules.
+
+"""
+
+from __future__ import annotations
+
+import logging
+import random
+from typing import TYPE_CHECKING, Dict, List
+
+from kombu.utils.uuid import uuid
+
+from swh.scheduler.celery_backend.config import get_available_slots
+
+if TYPE_CHECKING:
+    from ..interface import SchedulerInterface
+    from ..model import ListedOrigin
+
+logger = logging.getLogger(__name__)
+
+
+# A visit type maps to either the "dvcs" or the "package" ratio key
+MAPPING_TYPE_TO_RATIO_KEY: Dict[str, str] = {
+    "git": "dvcs",
+    "hg": "dvcs",
+    "svn": "dvcs",
+    "cvs": "dvcs",
+    "bzr": "dvcs",
+}
+
+# Default policy ratios; keep this configuration in the module for now
+POLICY_RATIO: Dict[str, Dict[str, float]] = {
+    "package": {
+        "already_visited_order_by_lag": 0.5,
+        "never_visited_oldest_update_first": 0.5,
+    },
+    "dvcs": {
+        "already_visited_order_by_lag": 0.49,
+        "never_visited_oldest_update_first": 0.49,
+        "origins_without_last_update": 0.02,
+    },
+}
+
+
+def orchestrate(scheduler: SchedulerInterface, app, task_types: List[Dict]) -> None:
+    """Orchestrate the scheduling of recurring origin visits for the given task types.
+
+    """
+    logger.debug(
+        "Orchestrate recurring origin visits for %s task types",
+        ", ".join(t["type"] for t in task_types),
+    )
+    for task_type in task_types:
+        queue_name = task_type["backend_name"]
+        # FIXME: compute the visit type out of the task's type...
+        visit_type = task_type["type"][5:]  # crop "load-"
+        # Determine the available slots to fill in the queue
+        num_tasks = get_available_slots(
+            app, queue_name, task_type.get("max_queue_length")
+        )
+        if not num_tasks:
+            # queue is full, move on to the next visit type
+            continue
+
+        logger.info(
+            "%s: %s slots available in queue %s", visit_type, num_tasks, queue_name,
+        )
+
+        policy_ratio_key = MAPPING_TYPE_TO_RATIO_KEY.get(visit_type, "package")
+        policy_ratio = POLICY_RATIO[policy_ratio_key]
+
+        all_origins: List[ListedOrigin] = []
+        policies = list(policy_ratio.keys())
+        random.shuffle(policies)
+        for policy in policies:
+            ratio = policy_ratio[policy]
+            num_tasks_ratio: int = int(num_tasks * ratio)
+            logger.info(
+                "%s: %s tasks to send with policy %s",
+                visit_type,
+                num_tasks_ratio,
+                policy,
+            )
+            origins = scheduler.grab_next_visits(
+                visit_type, num_tasks_ratio, policy=policy
+            )
+            all_origins.extend(origins)
+
+        # Actually send messages to the queue
+        for origin in all_origins:
+            task_dict = origin.as_task_dict()
+            app.send_task(
+                queue_name,
+                task_id=uuid(),
+                args=task_dict["arguments"]["args"],
+                kwargs=task_dict["arguments"]["kwargs"],
+                queue=queue_name,
+            )
+        logger.info(
+            "%s: %s visits sent to queue %s", visit_type, len(all_origins), queue_name,
+        )
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -3,10 +3,13 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from __future__ import annotations
+
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import logging
 import time
+from typing import List
 
 import click
 
@@ -57,7 +60,7 @@
     logger = logging.getLogger(__name__ + ".runner")
     scheduler = ctx.obj["scheduler"]
-    logger.debug("Scheduler %s" % scheduler)
+    logger.debug("Scheduler %s", scheduler)
     task_types = []
     for task_type_name in task_type_names:
         task_type = scheduler.get_task_type(task_type_name)
@@ -107,6 +110,72 @@
     listener.stop_consuming()
 
 
+@cli.command("start-orchestrator")
+@click.option(
+    "--period",
+    "-p",
+    default=0,
+    help=(
+        "Period (in seconds) at which pending tasks are checked and "
+        "executed. Set to 0 (default) for a one-shot run."
+    ),
+)
+@click.option(
+    "--visit-type",
+    "visit_types",
+    multiple=True,
+    default=[],
+    help=(
+        "Visit types to schedule. If not provided, this iterates over every "
+        "corresponding load task type referenced in the scheduler backend."
+    ),
+)
+@click.pass_context
+def orchestrator(ctx, period: int, visit_types: List[str]):
+    """Starts an orchestrator service. This process is responsible for scheduling
+    recurring loading tasks for a given set of visit types.
+ + """ + from swh.scheduler.celery_backend.config import build_app + from swh.scheduler.celery_backend.orchestrator import orchestrate + + config = ctx.obj["config"] + app = build_app(config.get("celery")) + app.set_current() + + logger = logging.getLogger(f"{__name__}.orchestrator") + scheduler = ctx.obj["scheduler"] + + task_types = [] + # Bootstrap the runner tasks we want to deal with + if not visit_types: + all_task_types = scheduler.get_task_types() + for task_type in all_task_types: + if not task_type["type"].startswith("load-"): + # only consider loading tasks as recurring ones, the rest is dismissed + continue + task_types.append(task_type) + else: + for visit_type in visit_types: + task_type_name = f"load-{visit_type}" + task_type = scheduler.get_task_type(task_type_name) + if not task_type: + raise ValueError(f"Unknown {task_type_name}") + task_types.append(task_type) + + try: + while True: + try: + orchestrate(scheduler, app, task_types) + except Exception: + logger.exception("Unexpected error in orchestrate call") + if not period: + break + time.sleep(period) + except KeyboardInterrupt: + ctx.exit(0) + + @cli.command("rpc-serve") @click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api") @click.option("--port", default=5008, type=click.INT, help="Binding port of the server") diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2020 The Software Heritage developers +# Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -6,6 +6,7 @@ from datetime import datetime, timezone import os from typing import Dict, List +from unittest.mock import patch import pytest @@ -60,3 +61,12 @@ def listed_origins(listed_origins_by_type) -> List[ListedOrigin]: """Return a (fixed) set of listed origins""" return sum(listed_origins_by_type.values(), []) + + +@pytest.fixture +def storage(swh_storage): + """An instance of in-memory storage that gets injected + into the CLI functions.""" + with patch("swh.storage.get_storage") as get_storage_mock: + get_storage_mock.return_value = swh_storage + yield swh_storage diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py --- a/swh/scheduler/tests/test_cli.py +++ b/swh/scheduler/tests/test_cli.py @@ -595,15 +595,6 @@ return origins -@pytest.fixture -def storage(swh_storage): - """An instance of in-memory storage that gets injected - into the CLI functions.""" - with patch("swh.storage.get_storage") as get_storage_mock: - get_storage_mock.return_value = swh_storage - yield swh_storage - - @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) def test_task_schedule_origins_dry_run(swh_scheduler, storage): """Tests the scheduling when origin_batch_size*task_batch_size is a diff --git a/swh/scheduler/tests/test_orchestrator.py b/swh/scheduler/tests/test_orchestrator.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/tests/test_orchestrator.py @@ -0,0 +1,114 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import timedelta +from unittest.mock import 
MagicMock + +import pytest + +from swh.model.model import Origin +from swh.scheduler.celery_backend.orchestrator import orchestrate + +from .test_cli import invoke + +TEST_MAX_QUEUE = 10000 + + +def _compute_backend_name(visit_type: str) -> str: + "Build a dummy reproducible backend name" + return f"swh.loader.{visit_type}.tasks" + + +@pytest.fixture +def swh_scheduler(swh_scheduler): + """Override default fixture of the scheduler to install some more task types. + + """ + for visit_type in ["git", "hg", "svn"]: + task_type = f"load-{visit_type}" + swh_scheduler.create_task_type( + { + "type": task_type, + "max_queue_length": TEST_MAX_QUEUE, + "description": "The {} testing task".format(task_type), + "backend_name": _compute_backend_name(visit_type), + "default_interval": timedelta(days=1), + "min_interval": timedelta(hours=6), + "max_interval": timedelta(days=12), + } + ) + return swh_scheduler + + +def test_cli_orchestrator_unknown_visit_type(swh_scheduler): + """When passing unknown visit type, orchestrator should refuse to start.""" + + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + False, + ["start-orchestrator", "--visit-type", "unknown", "--visit-type", "git",], + ) + + +def test_cli_orchestrator_noop(swh_scheduler): + """Trigger orchestrator without any parameter nor anything to do should noop.""" + + # The orchestrator will just iterate over existing tasks from the scheduler and do + # noop. We are just checking it does not explode here. + result = invoke(swh_scheduler, False, ["start-orchestrator",],) + + assert result.exit_code == 0, result.output + + +def test_orchestrator_scheduling( + swh_scheduler, caplog, listed_origins_by_type, mocker, +): + """Orchestrator schedules known tasks.""" + nb_origins = 1000 + + mock_celery_app = MagicMock() + + mock = mocker.patch("swh.scheduler.celery_backend.orchestrator.get_available_slots") + mock.return_value = nb_origins # Slots available in queue + + # Make sure the scheduler is properly configured in terms of visit/task types + all_task_types = { + task_type_d["type"]: task_type_d + for task_type_d in swh_scheduler.get_task_types() + } + + visit_types = list(listed_origins_by_type.keys()) + assert len(visit_types) > 0 + + task_types = [] + origins = [] + for visit_type, _origins in listed_origins_by_type.items(): + origins.extend(swh_scheduler.record_listed_origins(_origins)) + task_type_name = f"load-{visit_type}" + assert task_type_name in all_task_types.keys() + task_type = all_task_types[task_type_name] + task_type["visit_type"] = visit_type + # we'll limit the orchestrator to the origins' type we know + task_types.append(task_type) + + orchestrate(swh_scheduler, mock_celery_app, task_types) + + records = [ + record.message for record in caplog.records if record.levelname == "INFO" + ] + + # Mapping over the dict ratio/policies entries can change overall order so let's + # check the set of records + expected_records = set() + for task_type in task_types: + visit_type = task_type["visit_type"] + queue_name = task_type["backend_name"] + msg = f"{visit_type}: {nb_origins} slots available in queue {queue_name}" + expected_records.add(msg) + + assert len(records) > 0 + for expected_record in expected_records: + assert expected_record in set(records)
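
A quick illustration, not part of the patch, of how the POLICY_RATIO table splits the available queue slots across the grab_next_visits policies: each policy receives its ratio of the slots, truncated to an integer, so a few slots may stay unfilled until the next orchestrator pass. The helper name split_slots and the 999-slot figure below are made up for this sketch.

import random
from typing import Dict

# Same ratios as the "dvcs" entry of POLICY_RATIO in orchestrator.py
DVCS_RATIOS: Dict[str, float] = {
    "already_visited_order_by_lag": 0.49,
    "never_visited_oldest_update_first": 0.49,
    "origins_without_last_update": 0.02,
}


def split_slots(available_slots: int, ratios: Dict[str, float]) -> Dict[str, int]:
    """Compute how many origins each policy may grab; each share is truncated."""
    policies = list(ratios)
    random.shuffle(policies)  # the orchestrator also shuffles the policy order
    return {policy: int(available_slots * ratios[policy]) for policy in policies}


print(split_slots(999, DVCS_RATIOS))
# e.g. {'origins_without_last_update': 19,
#       'already_visited_order_by_lag': 489,
#       'never_visited_oldest_update_first': 489}
# (key order varies because of the shuffle)
# 489 + 489 + 19 = 997: truncation leaves 2 of the 999 slots for a later pass.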