diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -85,7 +85,9 @@ class TaskRouter: """Route tasks according to the task_queue attribute in the task class""" def route_for_task(self, task, args=None, kwargs=None): - task_class = app.tasks[task] + task_class = app.tasks.get(task) + if task_class is None: + return {'queue': task} if hasattr(task_class, 'task_queue'): return {'queue': task_class.task_queue} return None diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -4,6 +4,7 @@ # See top-level LICENSE file for more information import arrow +import celery from celery import group from swh.scheduler import get_scheduler, compute_nb_tasks_from @@ -48,7 +49,11 @@ max_queue_length = task_type['max_queue_length'] backend_name = task_type['backend_name'] if max_queue_length and backend_name in app.tasks: - queue_name = app.tasks[backend_name].task_queue + task = app.tasks.get(backend_name) + if task: + queue_name = task.task_queue + else: + queue_name = backend_name queue_length = app.get_queue_length(queue_name) if queue_length is None: # Running without RabbitMQ (probably a test env). @@ -77,9 +82,9 @@ args = task['arguments']['args'] kwargs = task['arguments']['kwargs'] - celery_task = app.tasks[ - task_types[task['type']]['backend_name'] - ].s(*args, **kwargs) + celery_task = celery.Signature( + task_types[task['type']]['backend_name'], + args=args, kwargs=kwargs) celery_tasks.append(celery_task)