Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | import functools | ||||
import logging | import logging | ||||
import os | import os | ||||
from time import monotonic as _monotonic | from time import monotonic as _monotonic | ||||
import traceback | import traceback | ||||
from typing import Any, Dict | from typing import Any, Dict, Optional | ||||
import urllib.parse | import urllib.parse | ||||
from celery import Celery | from celery import Celery | ||||
from celery.signals import celeryd_after_setup, setup_logging, worker_init | from celery.signals import celeryd_after_setup, setup_logging, worker_init | ||||
from celery.utils.log import ColorFormatter | from celery.utils.log import ColorFormatter | ||||
from celery.worker.control import Panel | from celery.worker.control import Panel | ||||
from kombu import Exchange, Queue | from kombu import Exchange, Queue | ||||
import pkg_resources | import pkg_resources | ||||
▲ Show 20 Lines • Show All 200 Lines • ▼ Show 20 Lines | |||||
def get_queue_length(app, queue_name): | def get_queue_length(app, queue_name): | ||||
"""Shortcut to get a queue's length""" | """Shortcut to get a queue's length""" | ||||
stats = get_queue_stats(app, queue_name) | stats = get_queue_stats(app, queue_name) | ||||
if stats: | if stats: | ||||
return stats.get("messages") | return stats.get("messages") | ||||
MAX_NUM_TASKS = 10000 | |||||
def get_available_slots(app, queue_name: str, max_length: Optional[int]): | |||||
"""Get the number of tasks that can be sent to `queue_name`, when | |||||
the queue is limited to `max_length`.""" | |||||
if not max_length: | |||||
return MAX_NUM_TASKS | |||||
try: | |||||
queue_length = get_queue_length(app, queue_name) | |||||
except ValueError: | |||||
# Unknown queue length, just schedule all the tasks | |||||
return MAX_NUM_TASKS | |||||
# Clamp the return value to MAX_NUM_TASKS | |||||
return min(max_length - queue_length, MAX_NUM_TASKS) | |||||
def register_task_class(app, name, cls): | def register_task_class(app, name, cls): | ||||
"""Register a class-based task under the given name""" | """Register a class-based task under the given name""" | ||||
if name in app.tasks: | if name in app.tasks: | ||||
return | return | ||||
task_instance = cls() | task_instance = cls() | ||||
task_instance.name = name | task_instance.name = name | ||||
app.register_task(task_instance) | app.register_task(task_instance) | ||||
▲ Show 20 Lines • Show All 107 Lines • Show Last 20 Lines |