Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/runner.py
Show First 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | while True: | ||||
num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) | num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) | ||||
else: | else: | ||||
num_tasks = MAX_NUM_TASKS | num_tasks = MAX_NUM_TASKS | ||||
# only pull tasks if the buffer is at least 1/5th empty (= 80% | # only pull tasks if the buffer is at least 1/5th empty (= 80% | ||||
# full), to help postgresql use properly indexed queries. | # full), to help postgresql use properly indexed queries. | ||||
if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: | if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: | ||||
# Only grab num_tasks tasks with no priority | # Only grab num_tasks tasks with no priority | ||||
grabbed_tasks = backend.grab_ready_tasks( | grabbed_tasks = backend.grab_ready_tasks( | ||||
task_type_name, num_tasks=num_tasks, num_tasks_priority=0 | task_type_name, num_tasks=num_tasks | ||||
) | ) | ||||
if grabbed_tasks: | if grabbed_tasks: | ||||
pending_tasks.extend(grabbed_tasks) | pending_tasks.extend(grabbed_tasks) | ||||
logger.info( | logger.info( | ||||
"Grabbed %s tasks %s", len(grabbed_tasks), task_type_name | "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name | ||||
) | ) | ||||
statsd.increment( | statsd.increment( | ||||
"swh_scheduler_runner_scheduled_task_total", | "swh_scheduler_runner_scheduled_task_total", | ||||
▲ Show 20 Lines • Show All 76 Lines • Show Last 20 Lines |