Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/runner.py
# Copyright (C) 2015-2021 The Software Heritage developers | # Copyright (C) 2015-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from typing import Dict, List, Tuple | from typing import Dict, List, Tuple | ||||
from kombu.utils.uuid import uuid | from kombu.utils.uuid import uuid | ||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from swh.scheduler.celery_backend.config import MAX_NUM_TASKS, get_available_slots | |||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.utils import utcnow | from swh.scheduler.utils import utcnow | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
# Max batch size for tasks | |||||
MAX_NUM_TASKS = 10000 | |||||
def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]: | def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]: | ||||
"""Schedule tasks ready to be scheduled. | """Schedule tasks ready to be scheduled. | ||||
This looks up any tasks per task type and mass-schedules them accordingly (sends | This looks up any tasks per task type and mass-schedules them accordingly (sends | ||||
messages to rabbitmq and marks the equivalent tasks as scheduled in the scheduler | messages to rabbitmq and marks the equivalent tasks as scheduled in the scheduler | ||||
backend). | backend). | ||||
Show All 22 Lines | def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]: | ||||
""" | """ | ||||
all_backend_tasks: List[Dict] = [] | all_backend_tasks: List[Dict] = [] | ||||
while True: | while True: | ||||
task_types = {} | task_types = {} | ||||
pending_tasks = [] | pending_tasks = [] | ||||
for task_type in backend.get_task_types(): | for task_type in backend.get_task_types(): | ||||
task_type_name = task_type["type"] | task_type_name = task_type["type"] | ||||
task_types[task_type_name] = task_type | task_types[task_type_name] = task_type | ||||
max_queue_length = task_type["max_queue_length"] | |||||
if max_queue_length is None: | max_queue_length = task_type["max_queue_length"] or 0 | ||||
max_queue_length = 0 | |||||
backend_name = task_type["backend_name"] | backend_name = task_type["backend_name"] | ||||
if max_queue_length: | num_tasks = get_available_slots(app, backend_name, max_queue_length) | ||||
try: | |||||
queue_length = app.get_queue_length(backend_name) | |||||
except ValueError: | |||||
queue_length = None | |||||
if queue_length is None: | |||||
# Running without RabbitMQ (probably a test env). | |||||
num_tasks = MAX_NUM_TASKS | |||||
else: | |||||
num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) | |||||
else: | |||||
num_tasks = MAX_NUM_TASKS | |||||
# only pull tasks if more than 1/5th of the buffer is free (i.e. it is | # only pull tasks if more than 1/5th of the buffer is free (i.e. it is | ||||
# less than 80% full), to help postgresql use properly indexed queries. | # less than 80% full), to help postgresql use properly indexed queries. | ||||
if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: | if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: | ||||
# Only grab num_tasks tasks with no priority | # Only grab num_tasks tasks with no priority | ||||
grabbed_tasks = backend.grab_ready_tasks( | grabbed_tasks = backend.grab_ready_tasks( | ||||
task_type_name, num_tasks=num_tasks | task_type_name, num_tasks=num_tasks | ||||
) | ) | ||||
if grabbed_tasks: | if grabbed_tasks: | ||||
▲ Show 20 Lines • Show All 82 Lines • Show Last 20 Lines |