Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/runner.py
# Copyright (C) 2015-2018 The Software Heritage developers | # Copyright (C) 2015-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import arrow | import arrow | ||||
from celery import group | |||||
from swh.scheduler import get_scheduler, compute_nb_tasks_from | from swh.scheduler import get_scheduler, compute_nb_tasks_from | ||||
from .config import app as main_app | from .config import app as main_app | ||||
# Max batch size for tasks | # Max batch size for tasks | ||||
MAX_NUM_TASKS = 10000 | MAX_NUM_TASKS = 10000 | ||||
Show All 26 Lines | while True: | ||||
cursor = backend.cursor() | cursor = backend.cursor() | ||||
task_types = {} | task_types = {} | ||||
pending_tasks = [] | pending_tasks = [] | ||||
for task_type in backend.get_task_types(cursor=cursor): | for task_type in backend.get_task_types(cursor=cursor): | ||||
task_type_name = task_type['type'] | task_type_name = task_type['type'] | ||||
task_types[task_type_name] = task_type | task_types[task_type_name] = task_type | ||||
max_queue_length = task_type['max_queue_length'] | max_queue_length = task_type['max_queue_length'] | ||||
backend_name = task_type['backend_name'] | backend_name = task_type['backend_name'] | ||||
if max_queue_length and backend_name in app.tasks: | if max_queue_length: | ||||
queue_name = app.tasks[backend_name].task_queue | |||||
try: | try: | ||||
queue_length = app.get_queue_length(queue_name) | queue_length = app.get_queue_length(backend_name) | ||||
except ValueError: | except ValueError: | ||||
queue_length = None | queue_length = None | ||||
if queue_length is None: | if queue_length is None: | ||||
# Running without RabbitMQ (probably a test env). | # Running without RabbitMQ (probably a test env). | ||||
num_tasks = MAX_NUM_TASKS | num_tasks = MAX_NUM_TASKS | ||||
else: | else: | ||||
num_tasks = min(max_queue_length - queue_length, | num_tasks = min(max_queue_length - queue_length, | ||||
Show All 9 Lines | while True: | ||||
task_type_name, | task_type_name, | ||||
num_tasks=num_tasks, | num_tasks=num_tasks, | ||||
num_tasks_priority=num_tasks_priority, | num_tasks_priority=num_tasks_priority, | ||||
cursor=cursor)) | cursor=cursor)) | ||||
if not pending_tasks: | if not pending_tasks: | ||||
return all_backend_tasks | return all_backend_tasks | ||||
celery_tasks = [] | backend_tasks = [] | ||||
for task in pending_tasks: | for task in pending_tasks: | ||||
args = task['arguments']['args'] | args = task['arguments']['args'] | ||||
kwargs = task['arguments']['kwargs'] | kwargs = task['arguments']['kwargs'] | ||||
celery_task = app.tasks[ | backend_name = task_types[task['type']]['backend_name'] | ||||
task_types[task['type']]['backend_name'] | celery_result = app.send_task( | ||||
].s(*args, **kwargs) | backend_name, args=args, kwargs=kwargs, | ||||
) | |||||
celery_tasks.append(celery_task) | data = { | ||||
group_result = group(celery_tasks).delay() | |||||
backend_tasks = [{ | |||||
'task': task['id'], | 'task': task['id'], | ||||
'backend_id': group_result.results[i].id, | 'backend_id': celery_result.id, | ||||
'scheduled': arrow.utcnow(), | 'scheduled': arrow.utcnow(), | ||||
} for i, task in enumerate(pending_tasks)] | } | ||||
backend_tasks.append(data) | |||||
backend.mass_schedule_task_runs(backend_tasks, cursor=cursor) | backend.mass_schedule_task_runs(backend_tasks, cursor=cursor) | ||||
backend.commit() | backend.commit() | ||||
all_backend_tasks.extend(backend_tasks) | all_backend_tasks.extend(backend_tasks) | ||||
Show All 10 Lines |