Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/orchestrator.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
"""This is the scheduler orchestrator. It's in charge of scheduling recurring origins. | |||||
For "oneshot" tasks, check the :mod:`swh.scheduler.celery_backend.runner` and | |||||
:mod:`swh.scheduler.celery_backend.pika_listener` modules. | |||||
""" | |||||
from __future__ import annotations | |||||
import logging | |||||
import random | |||||
from typing import TYPE_CHECKING, Dict, List | |||||
from kombu.utils.uuid import uuid | |||||
from swh.scheduler.celery_backend.config import get_available_slots | |||||
if TYPE_CHECKING: | |||||
from ..interface import SchedulerInterface | |||||
from ..model import ListedOrigin | |||||
logger = logging.getLogger(__name__) | |||||
# either a dvcs or a package | |||||
MAPPING_TYPE_TO_RATIO_KEY: Dict[str, str] = { | |||||
"git": "dvcs", | |||||
"hg": "dvcs", | |||||
"svn": "dvcs", | |||||
"cvs": "dvcs", | |||||
"bzr": "dvcs", | |||||
} | |||||
# Default policy ratio, let's start that configuration in the module first | |||||
POLICY_RATIO: Dict[str, Dict[str, float]] = { | |||||
"package": { | |||||
"already_visited_order_by_lag": 0.5, | |||||
"never_visited_oldest_update_first": 0.5, | |||||
}, | |||||
"dvcs": { | |||||
"already_visited_order_by_lag": 0.49, | |||||
"never_visited_oldest_update_first": 0.49, | |||||
"origins_without_last_update": 0.02, | |||||
}, | |||||
} | |||||
def orchestrate(scheduler: SchedulerInterface, app, task_types: List[Dict]) -> None: | |||||
"""Orchestrate recurring origins scheduling for given visit types. | |||||
""" | |||||
logger.debug( | |||||
"Orchestrate recurring origin visits for %s task types", | |||||
", ".join(t["type"] for t in task_types), | |||||
) | |||||
for task_type in task_types: | |||||
queue_name = task_type["backend_name"] | |||||
# FIXME: compute the visit type out of the task's type... | |||||
visit_type = task_type["type"][5:] # crop "load-" | |||||
# Determine the available slots to fill in the queue | |||||
num_tasks = get_available_slots( | |||||
app, queue_name, task_type.get("max_queue_length") | |||||
) | |||||
if not num_tasks: | |||||
# queue filled in, next visit type | |||||
continue | |||||
logging.info( | |||||
"%s: %s slots available in queue %s", visit_type, num_tasks, queue_name, | |||||
) | |||||
policy_ratio_key = MAPPING_TYPE_TO_RATIO_KEY.get(visit_type, "package") | |||||
policy_ratio = POLICY_RATIO[policy_ratio_key] | |||||
all_origins: List[ListedOrigin] = [] | |||||
policies = list(policy_ratio.keys()) | |||||
random.shuffle(policies) | |||||
for policy in policies: | |||||
ratio = policy_ratio[policy] | |||||
num_tasks_ratio: int = num_tasks * ratio | |||||
logger.info( | |||||
"%s: %s tasks to send with policy %s", | |||||
visit_type, | |||||
num_tasks_ratio, | |||||
policy, | |||||
) | |||||
origins = scheduler.grab_next_visits( | |||||
visit_type, num_tasks_ratio, policy=policy | |||||
) | |||||
all_origins.extend(origins) | |||||
# Actually send messages to queue | |||||
for origin in all_origins: | |||||
task_dict = origin.as_task_dict() | |||||
app.send_task( | |||||
queue_name, | |||||
task_id=uuid(), | |||||
args=task_dict["arguments"]["args"], | |||||
kwargs=task_dict["arguments"]["kwargs"], | |||||
queue=queue_name, | |||||
) | |||||
logger.info( | |||||
"%s: %s sent visits to queue %s", visit_type, len(all_origins), queue_name, | |||||
) |