diff --git a/swh/scheduler/celery_backend/orchestrator.py b/swh/scheduler/celery_backend/orchestrator.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/celery_backend/orchestrator.py
@@ -0,0 +1,109 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This is the scheduler orchestrator. It's in charge of scheduling recurring origins.
+
+For "oneshot" tasks, check the :mod:`swh.scheduler.celery_backend.runner` and
+:mod:`swh.scheduler.celery_backend.pika_listener` modules.
+
+"""
+
+import logging
+from typing import TYPE_CHECKING, Dict, List
+
+from kombu.utils.uuid import uuid
+
+from swh.scheduler.celery_backend.config import get_available_slots
+
+if TYPE_CHECKING:
+    # Only imported for type checking: signature annotations must be quoted
+    from ..interface import SchedulerInterface
+    from ..model import ListedOrigin
+
+logger = logging.getLogger(__name__)
+
+
+# either a dvcs or a package
+MAPPING_TYPE_TO_RATIO_KEY: Dict[str, str] = {
+    "git": "dvcs",
+    "hg": "dvcs",
+    "svn": "dvcs",
+    "cvs": "dvcs",
+    "bzr": "dvcs",
+}
+
+# Default policy ratio, let's start that configuration in the module first
+POLICY_RATIO: Dict[str, Dict[str, float]] = {
+    "package": {
+        "already_visited_order_by_lag": 0.5,
+        "never_visited_oldest_update_first": 0.5,
+    },
+    "dvcs": {
+        "already_visited_order_by_lag": 0.49,
+        "never_visited_oldest_update_first": 0.49,
+        "origins_without_last_update": 0.02,
+    },
+}
+
+
+def orchestrate(scheduler: "SchedulerInterface", app, task_types: List[Dict]) -> None:
+    """Orchestrate recurring origins scheduling for given visit types.
+
+    For each task type, the free slots of its queue are split between the
+    scheduling policies of :data:`POLICY_RATIO`; the origins grabbed from the
+    scheduler for each policy are then sent as tasks to that queue.
+
+    Args:
+        scheduler: scheduler backend used to grab the next origins to visit
+        app: celery application used to inspect the queues and send the tasks
+        task_types: task types to schedule; the "type", "backend_name" and
+          "max_queue_length" keys of each are read
+
+    """
+    logger.debug(
+        "Orchestrate recurring origin visits for %s task types", len(task_types)
+    )
+    for task_type in task_types:
+        task_name = task_type["backend_name"]
+        # FIXME: compute the visit type out of the task's type...
+        visit_type = task_type["type"][5:]  # crop "load-"
+        queue_name = task_name
+        # Determine the available slots to fill in the queue
+        num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
+        if not num_tasks:
+            # queue filled in, next visit type
+            continue
+
+        logger.info(
+            "%s: %s slots available in queue %s", visit_type, num_tasks, queue_name,
+        )
+
+        policy_ratio_key = MAPPING_TYPE_TO_RATIO_KEY.get(visit_type, "package")
+        policy_ratio = POLICY_RATIO[policy_ratio_key]
+
+        all_origins: List[ListedOrigin] = []
+        for policy, ratio in policy_ratio.items():
+            # the share of the slots is a float: truncate it to a whole count
+            num_tasks_ratio = int(num_tasks * ratio)
+            origins = scheduler.grab_next_visits(
+                visit_type, num_tasks_ratio, policy=policy
+            )
+            all_origins.extend(origins)
+
+        logger.info(
+            "%s: %s visits to send to queue %s",
+            visit_type,
+            len(all_origins),
+            queue_name,
+        )
+        for origin in all_origins:
+            task_dict = origin.as_task_dict()
+            app.send_task(
+                task_name,
+                task_id=uuid(),
+                args=task_dict["arguments"]["args"],
+                kwargs=task_dict["arguments"]["kwargs"],
+                queue=queue_name,
+            )
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -3,16 +3,36 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from functools import lru_cache
+
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import logging
 import time
+from typing import Dict, List
 
 import click
 
+from swh.scheduler.interface import SchedulerInterface
+
 from . import cli
 
 
+@lru_cache()
+def get_task_type(scheduler: SchedulerInterface, task_type: str) -> Dict:
+    """Retrieve the task type in the scheduler. The result is cached.
+
+    Args:
+        scheduler: the scheduler backend
+        task_type: The name of the task type to look up in the scheduler
+
+    Returns:
+        The resolved task type
+
+    """
+    return scheduler.get_task_type(task_type)
+
+
 @cli.command("start-runner")
 @click.option(
     "--period",
@@ -60,7 +80,7 @@
     logger.debug("Scheduler %s" % scheduler)
     task_types = []
     for task_type_name in task_type_names:
-        task_type = scheduler.get_task_type(task_type_name)
+        task_type = get_task_type(scheduler, task_type_name)
         if not task_type:
             raise ValueError(f"Unknown {task_type_name}")
         task_types.append(task_type)
@@ -107,6 +127,76 @@
         listener.stop_consuming()
 
 
+@cli.command("start-orchestrator")
+@click.option(
+    "--period",
+    "-p",
+    default=0,
+    help=(
+        "Period (in s) at which pending tasks are checked and "
+        "executed. Set to 0 (default) for a one shot."
+    ),
+)
+@click.option(
+    "--visit-type",
+    "visit_types",
+    multiple=True,
+    default=[],
+    help=(
+        "Visit types to schedule. If not provided, this iterates over every "
+        "corresponding load task types referenced in the scheduler backend."
+    ),
+)
+@click.pass_context
+def orchestrator(ctx, period: int, visit_types: List[str]):
+    """Starts an orchestrator service.
+
+    This process is responsible for scheduling recurring loading tasks of visit types.
+
+    """
+    from swh.scheduler.celery_backend.config import build_app
+    from swh.scheduler.celery_backend.orchestrator import orchestrate
+
+    config = ctx.obj["config"]
+    app = build_app(config.get("celery"))
+    app.set_current()
+
+    logger = logging.getLogger(f"{__name__}.orchestrator")
+    scheduler = ctx.obj["scheduler"]
+    logger.debug("Scheduler %s", scheduler)
+
+    task_types = []
+    # Bootstrap the runner tasks we want to deal with
+    if not visit_types:
+        all_task_types = scheduler.get_task_types()
+        for task_type in all_task_types:
+            if not task_type["type"].startswith("load-"):
+                # only load tasks are recurring, the rest we dismiss
+                continue
+            task_types.append(task_type)
+    else:
+        for visit_type in visit_types:
+            task_type_name = f"load-{visit_type}"
+            task_type = get_task_type(scheduler, task_type_name)
+            if not task_type:
+                raise ValueError(f"Unknown {task_type_name}")
+            task_types.append(task_type)
+
+    # One pass when no period is set, otherwise loop until interrupted. A failing
+    # pass is logged and does not stop the service.
+    try:
+        while True:
+            try:
+                orchestrate(scheduler, app, task_types)
+            except Exception:
+                logger.exception("Unexpected error in orchestrate()")
+            if not period:
+                break
+            time.sleep(period)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+
+
 @cli.command("rpc-serve")
 @click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api")
 @click.option("--port", default=5008, type=click.INT, help="Binding port of the server")