Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/runner.py
Show First 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | while True: | ||||
for task_type in backend.get_task_types(cursor=cursor): | for task_type in backend.get_task_types(cursor=cursor): | ||||
task_type_name = task_type['type'] | task_type_name = task_type['type'] | ||||
task_types[task_type_name] = task_type | task_types[task_type_name] = task_type | ||||
max_queue_length = task_type['max_queue_length'] | max_queue_length = task_type['max_queue_length'] | ||||
backend_name = task_type['backend_name'] | backend_name = task_type['backend_name'] | ||||
if max_queue_length and backend_name in app.tasks: | if max_queue_length and backend_name in app.tasks: | ||||
queue_name = app.tasks[backend_name].task_queue | queue_name = app.tasks[backend_name].task_queue | ||||
queue_length = app.get_queue_length(queue_name) | queue_length = app.get_queue_length(queue_name) | ||||
if queue_length is None: | |||||
# Running without RabbitMQ (probably a test env). | |||||
num_tasks = MAX_NUM_TASKS | |||||
else: | |||||
num_tasks = min(max_queue_length - queue_length, | num_tasks = min(max_queue_length - queue_length, | ||||
MAX_NUM_TASKS) | MAX_NUM_TASKS) | ||||
else: | else: | ||||
num_tasks = MAX_NUM_TASKS | num_tasks = MAX_NUM_TASKS | ||||
if num_tasks > 0: | if num_tasks > 0: | ||||
num_tasks, num_tasks_priority = compute_nb_tasks_from( | num_tasks, num_tasks_priority = compute_nb_tasks_from( | ||||
num_tasks) | num_tasks) | ||||
pending_tasks.extend( | pending_tasks.extend( | ||||
backend.grab_ready_tasks( | backend.grab_ready_tasks( | ||||
▲ Show 20 Lines • Show All 44 Lines • Show Last 20 Lines |