Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/runner.py
Show All 33 Lines | Returns: | ||||
backend_tasks = run_ready_tasks(self.scheduler, app) | backend_tasks = run_ready_tasks(self.scheduler, app) | ||||
for task in backend_tasks: | for task in backend_tasks: | ||||
AsyncResult(id=task['backend_id']).get() | AsyncResult(id=task['backend_id']).get() | ||||
""" | """ | ||||
all_backend_tasks = [] | all_backend_tasks = [] | ||||
while True: | while True: | ||||
cursor = backend.cursor() | |||||
task_types = {} | task_types = {} | ||||
pending_tasks = [] | pending_tasks = [] | ||||
for task_type in backend.get_task_types(cursor=cursor): | for task_type in backend.get_task_types(): | ||||
task_type_name = task_type['type'] | task_type_name = task_type['type'] | ||||
task_types[task_type_name] = task_type | task_types[task_type_name] = task_type | ||||
max_queue_length = task_type['max_queue_length'] | max_queue_length = task_type['max_queue_length'] | ||||
backend_name = task_type['backend_name'] | backend_name = task_type['backend_name'] | ||||
if max_queue_length: | if max_queue_length: | ||||
try: | try: | ||||
queue_length = app.get_queue_length(backend_name) | queue_length = app.get_queue_length(backend_name) | ||||
except ValueError: | except ValueError: | ||||
Show All 9 Lines | while True: | ||||
num_tasks = MAX_NUM_TASKS | num_tasks = MAX_NUM_TASKS | ||||
if num_tasks > 0: | if num_tasks > 0: | ||||
num_tasks, num_tasks_priority = compute_nb_tasks_from( | num_tasks, num_tasks_priority = compute_nb_tasks_from( | ||||
num_tasks) | num_tasks) | ||||
grabbed_tasks = backend.grab_ready_tasks( | grabbed_tasks = backend.grab_ready_tasks( | ||||
task_type_name, | task_type_name, | ||||
num_tasks=num_tasks, | num_tasks=num_tasks, | ||||
num_tasks_priority=num_tasks_priority, | num_tasks_priority=num_tasks_priority) | ||||
cursor=cursor) | |||||
if grabbed_tasks: | if grabbed_tasks: | ||||
pending_tasks.extend(grabbed_tasks) | pending_tasks.extend(grabbed_tasks) | ||||
logger.info('Grabbed %s tasks %s', | logger.info('Grabbed %s tasks %s', | ||||
len(grabbed_tasks), task_type_name) | len(grabbed_tasks), task_type_name) | ||||
if not pending_tasks: | if not pending_tasks: | ||||
return all_backend_tasks | return all_backend_tasks | ||||
Show All 10 Lines | while True: | ||||
'task': task['id'], | 'task': task['id'], | ||||
'backend_id': celery_result.id, | 'backend_id': celery_result.id, | ||||
'scheduled': arrow.utcnow(), | 'scheduled': arrow.utcnow(), | ||||
} | } | ||||
backend_tasks.append(data) | backend_tasks.append(data) | ||||
logger.debug('Sent %s celery tasks', len(backend_tasks)) | logger.debug('Sent %s celery tasks', len(backend_tasks)) | ||||
backend.mass_schedule_task_runs(backend_tasks, cursor=cursor) | backend.mass_schedule_task_runs(backend_tasks) | ||||
backend.commit() | |||||
all_backend_tasks.extend(backend_tasks) | all_backend_tasks.extend(backend_tasks) | ||||
def main(): | def main(): | ||||
from .config import app as main_app | from .config import app as main_app | ||||
for module in main_app.conf.CELERY_IMPORTS: | for module in main_app.conf.CELERY_IMPORTS: | ||||
__import__(module) | __import__(module) | ||||
Show All 10 Lines |