Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/runner.py
Show First 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | def run_ready_tasks(backend, app): | ||||
all_backend_tasks = [] | all_backend_tasks = [] | ||||
while True: | while True: | ||||
task_types = {} | task_types = {} | ||||
pending_tasks = [] | pending_tasks = [] | ||||
for task_type in backend.get_task_types(): | for task_type in backend.get_task_types(): | ||||
task_type_name = task_type["type"] | task_type_name = task_type["type"] | ||||
task_types[task_type_name] = task_type | task_types[task_type_name] = task_type | ||||
max_queue_length = task_type["max_queue_length"] | max_queue_length = task_type["max_queue_length"] | ||||
if max_queue_length is None: | |||||
max_queue_length = 0 | |||||
backend_name = task_type["backend_name"] | backend_name = task_type["backend_name"] | ||||
if max_queue_length: | if max_queue_length: | ||||
try: | try: | ||||
queue_length = app.get_queue_length(backend_name) | queue_length = app.get_queue_length(backend_name) | ||||
except ValueError: | except ValueError: | ||||
queue_length = None | queue_length = None | ||||
if queue_length is None: | if queue_length is None: | ||||
# Running without RabbitMQ (probably a test env). | # Running without RabbitMQ (probably a test env). | ||||
num_tasks = MAX_NUM_TASKS | num_tasks = MAX_NUM_TASKS | ||||
else: | else: | ||||
num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) | num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) | ||||
else: | else: | ||||
num_tasks = MAX_NUM_TASKS | num_tasks = MAX_NUM_TASKS | ||||
if num_tasks > 0: | # only pull tasks if the buffer is at least 1/5th empty (= 80% | ||||
# full), to help postgresql use properly indexed queries. | |||||
if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: | |||||
num_tasks, num_tasks_priority = compute_nb_tasks_from(num_tasks) | num_tasks, num_tasks_priority = compute_nb_tasks_from(num_tasks) | ||||
grabbed_tasks = backend.grab_ready_tasks( | grabbed_tasks = backend.grab_ready_tasks( | ||||
task_type_name, | task_type_name, | ||||
num_tasks=num_tasks, | num_tasks=num_tasks, | ||||
num_tasks_priority=num_tasks_priority, | num_tasks_priority=num_tasks_priority, | ||||
) | ) | ||||
if grabbed_tasks: | if grabbed_tasks: | ||||
▲ Show 20 Lines • Show All 55 Lines • Show Last 20 Lines |