diff --git a/swh/scheduler/celery_backend/orchestrator.py b/swh/scheduler/celery_backend/orchestrator.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/celery_backend/orchestrator.py
@@ -0,0 +1,111 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This is the scheduler orchestrator. It's in charge of scheduling recurring origins.
+
+For "oneshot" tasks, check the :mod:`swh.scheduler.celery_backend.runner` and
+:mod:`swh.scheduler.celery_backend.pika_listener` modules.
+
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING, Dict, List
+
+from kombu.utils.uuid import uuid
+
+from swh.scheduler.celery_backend.config import get_available_slots
+
+if TYPE_CHECKING:
+    from ..interface import SchedulerInterface
+    from ..model import ListedOrigin
+
+logger = logging.getLogger(__name__)
+
+
+# either a dvcs or a package
+MAPPING_TYPE_TO_RATIO_KEY: Dict[str, str] = {
+    "git": "dvcs",
+    "hg": "dvcs",
+    "svn": "dvcs",
+    "cvs": "dvcs",
+    "bzr": "dvcs",
+}
+
+# Default policy ratio, let's start that configuration in the module first
+POLICY_RATIO: Dict[str, Dict[str, float]] = {
+    "package": {
+        "already_visited_order_by_lag": 0.5,
+        "never_visited_oldest_update_first": 0.5,
+    },
+    "dvcs": {
+        "already_visited_order_by_lag": 0.49,
+        "never_visited_oldest_update_first": 0.49,
+        "origins_without_last_update": 0.02,
+    },
+}
+
+
+def orchestrate(scheduler: SchedulerInterface, app, task_types: List[Dict]) -> None:
+    """Orchestrate recurring origins scheduling for given visit types.
+
+    For each task type, compute the number of available slots in its queue, grab
+    origins to visit according to the policy ratios of the visit type, and send one
+    loading task per origin to the queue.
+
+    Args:
+        scheduler: scheduler backend used to grab the next origins to visit
+        app: celery application used to inspect queues and send tasks
+        task_types: task type dicts; the keys "backend_name", "type" and
+            "max_queue_length" are read
+
+    """
+    logger.debug(
+        "Orchestrate recurring origin visits for %s task types", len(task_types)
+    )
+    for task_type in task_types:
+        task_name = task_type["backend_name"]
+        # FIXME: compute the visit type out of the task's type...
+        visit_type = task_type["type"][5:]  # crop "load-"
+        queue_name = task_name
+        # Determine the available slots to fill in the queue
+        num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
+        if not num_tasks:
+            # queue filled in, next visit type
+            continue
+
+        logger.info(
+            "%s: %s slots available in queue %s", visit_type, num_tasks, queue_name,
+        )
+
+        policy_ratio_key = MAPPING_TYPE_TO_RATIO_KEY.get(visit_type, "package")
+        policy_ratio = POLICY_RATIO[policy_ratio_key]
+
+        all_origins: List[ListedOrigin] = []
+        for policy, ratio in policy_ratio.items():
+            # num_tasks * ratio is a float: truncate it so the count matches its int
+            # annotation
+            num_tasks_ratio: int = int(num_tasks * ratio)
+            origins = scheduler.grab_next_visits(
+                visit_type, num_tasks_ratio, policy=policy
+            )
+            all_origins.extend(origins)
+
+        logger.info(
+            "%s: %s visits to send to queue %s",
+            visit_type,
+            len(all_origins),
+            queue_name,
+        )
+        for origin in all_origins:
+            task_dict = origin.as_task_dict()
+            app.send_task(
+                task_name,
+                task_id=uuid(),
+                args=task_dict["arguments"]["args"],
+                kwargs=task_dict["arguments"]["kwargs"],
+                queue=queue_name,
+            )
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -3,10 +3,13 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from __future__ import annotations
+
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import logging
 import time
+from typing import List
 
 import click
 
@@ -57,7 +60,7 @@
 
     logger = logging.getLogger(__name__ + ".runner")
     scheduler = ctx.obj["scheduler"]
-    logger.debug("Scheduler %s" % scheduler)
+    logger.debug("Scheduler %s", scheduler)
     task_types = []
     for task_type_name in task_type_names:
         task_type = scheduler.get_task_type(task_type_name)
@@ -107,6 +110,76 @@
         listener.stop_consuming()
 
 
+@cli.command("start-orchestrator")
+@click.option(
+    "--period",
+    "-p",
+    default=0,
+    help=(
+        "Period (in s) at which pending tasks are checked and "
+        "executed. Set to 0 (default) for a one shot."
+    ),
+)
+@click.option(
+    "--visit-type",
+    "visit_types",
+    multiple=True,
+    default=[],
+    help=(
+        "Visit types to schedule. If not provided, this iterates over every "
+        "corresponding load task types referenced in the scheduler backend."
+    ),
+)
+@click.pass_context
+def orchestrator(ctx, period: int, visit_types: List[str]):
+    """Starts an orchestrator service.
+
+    This process is responsible for scheduling recurring loading tasks of visit types.
+
+    """
+    from swh.scheduler.celery_backend.config import build_app
+    from swh.scheduler.celery_backend.orchestrator import orchestrate
+
+    config = ctx.obj["config"]
+    app = build_app(config.get("celery"))
+    app.set_current()
+
+    logger = logging.getLogger(f"{__name__}.orchestrator")
+    scheduler = ctx.obj["scheduler"]
+    logger.debug("Scheduler %s", scheduler)
+
+    logger.debug("visit_types: %s", visit_types)
+    task_types = []
+    # Bootstrap the runner tasks we want to deal with
+    if not visit_types:
+        all_task_types = scheduler.get_task_types()
+        for task_type in all_task_types:
+            if not task_type["type"].startswith("load-"):
+                # only load tasks are recurring, the rest we dismiss
+                continue
+            task_types.append(task_type)
+    else:
+        for visit_type in visit_types:
+            task_type_name = f"load-{visit_type}"
+            task_type = scheduler.get_task_type(task_type_name)
+
+            if not task_type:
+                raise ValueError(f"Unknown {task_type_name}")
+            task_types.append(task_type)
+
+    try:
+        while True:
+            try:
+                orchestrate(scheduler, app, task_types)
+            except Exception:
+                logger.exception("Unexpected error in orchestrate()")
+            if not period:
+                break
+            time.sleep(period)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+
+
 @cli.command("rpc-serve")
 @click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api")
 @click.option("--port", default=5008, type=click.INT, help="Binding port of the server")
diff --git a/swh/scheduler/tests/test_orchestrator.py b/swh/scheduler/tests/test_orchestrator.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_orchestrator.py
@@ -0,0 +1,58 @@
+# Copyright (C) 2019-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import tempfile
+from unittest.mock import patch
+
+from click.testing import CliRunner
+import pytest
+
+from swh.scheduler.cli import cli
+
+CLI_CONFIG = """
+scheduler:
+    cls: foo
+    args: {}
+"""
+
+
+def invoke(scheduler, catch_exceptions, args):
+    runner = CliRunner()
+    with patch(
+        "swh.scheduler.get_scheduler"
+    ) as get_scheduler_mock, tempfile.NamedTemporaryFile(
+        "a", suffix=".yml"
+    ) as config_fd:
+        config_fd.write(CLI_CONFIG)
+        config_fd.seek(0)
+        get_scheduler_mock.return_value = scheduler
+        args = ["-C" + config_fd.name,] + args
+        result = runner.invoke(cli, args, obj={"log_level": logging.WARNING})
+    if not catch_exceptions and result.exception:
+        print(result.output)
+        raise result.exception
+    return result
+
+
+def test_cli_orchestrator_unknown_visit_type(swh_scheduler):
+    """When passing unknown visit type, orchestrator should refuse to start."""
+
+    with pytest.raises(ValueError, match="Unknown"):
+        invoke(
+            swh_scheduler,
+            False,
+            ["start-orchestrator", "--visit-type", "unknown", "--visit-type", "git",],
+        )
+
+
+def test_cli_orchestrator_noop(swh_scheduler):
+    """Trigger orchestrator with no parameter should run."""
+
+    # The orchestrator will just iterate over the existing tasks from the scheduler and
+    # do noop. We are just checking it does not explode here.
+    result = invoke(swh_scheduler, False, ["start-orchestrator",],)
+
+    assert result.exit_code == 0, result.output