Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
Show First 20 Lines • Show All 224 Lines • ▼ Show 20 Lines | if stats: | ||||
return stats.get("messages") | return stats.get("messages") | ||||
MAX_NUM_TASKS = 10000 | MAX_NUM_TASKS = 10000 | ||||
def get_available_slots(app, queue_name: str, max_length: Optional[int]): | def get_available_slots(app, queue_name: str, max_length: Optional[int]): | ||||
"""Get the number of tasks that can be sent to `queue_name`, when | """Get the number of tasks that can be sent to `queue_name`, when | ||||
the queue is limited to `max_length`.""" | the queue is limited to `max_length`. | ||||
Returns: | |||||
The number of available slots in the queue. That result should be positive. | |||||
""" | |||||
if not max_length: | if not max_length: | ||||
return MAX_NUM_TASKS | return MAX_NUM_TASKS | ||||
try: | try: | ||||
queue_length = get_queue_length(app, queue_name) | queue_length = get_queue_length(app, queue_name) | ||||
# Clamp the return value to MAX_NUM_TASKS | # Clamp the return value to MAX_NUM_TASKS | ||||
max_val = min(max_length - queue_length, MAX_NUM_TASKS) | max_val = max(0, min(max_length - queue_length, MAX_NUM_TASKS)) | ||||
except (ValueError, TypeError): | except (ValueError, TypeError): | ||||
# Unknown queue length, just schedule all the tasks | # Unknown queue length, just schedule all the tasks | ||||
max_val = MAX_NUM_TASKS | max_val = MAX_NUM_TASKS | ||||
return max_val | return max_val | ||||
def register_task_class(app, name, cls): | def register_task_class(app, name, cls): | ||||
▲ Show 20 Lines • Show All 114 Lines • Show Last 20 Lines |