diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -84,11 +84,9 @@ class TaskRouter: """Route tasks according to the task_queue attribute in the task class""" - def route_for_task(self, task, args=None, kwargs=None): - task_class = app.tasks[task] - if hasattr(task_class, 'task_queue'): - return {'queue': task_class.task_queue} - return None + def route_for_task(self, task, *args, **kwargs): + if task.startswith('swh.'): + return {'queue': task} class CustomCelery(Celery): diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -4,7 +4,6 @@ # See top-level LICENSE file for more information import arrow -from celery import group from swh.scheduler import get_scheduler, compute_nb_tasks_from from .config import app as main_app @@ -47,10 +46,9 @@ task_types[task_type_name] = task_type max_queue_length = task_type['max_queue_length'] backend_name = task_type['backend_name'] - if max_queue_length and backend_name in app.tasks: - queue_name = app.tasks[backend_name].task_queue + if max_queue_length: try: - queue_length = app.get_queue_length(queue_name) + queue_length = app.get_queue_length(backend_name) except ValueError: queue_length = None @@ -76,24 +74,23 @@ if not pending_tasks: return all_backend_tasks - celery_tasks = [] + backend_tasks = [] for task in pending_tasks: args = task['arguments']['args'] kwargs = task['arguments']['kwargs'] - celery_task = app.tasks[ - task_types[task['type']]['backend_name'] - ].s(*args, **kwargs) + backend_name = task_types[task['type']]['backend_name'] + celery_result = app.send_task( + backend_name, args=args, kwargs=kwargs, + ) - celery_tasks.append(celery_task) + data = { + 'task': task['id'], + 'backend_id': celery_result.id, + 'scheduled': arrow.utcnow(), + } - group_result = group(celery_tasks).delay() - - backend_tasks = [{ - 'task': task['id'], - 'backend_id': group_result.results[i].id, - 'scheduled': arrow.utcnow(), - } for i, task in enumerate(pending_tasks)] + backend_tasks.append(data) backend.mass_schedule_task_runs(backend_tasks, cursor=cursor) diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py --- a/swh/scheduler/task.py +++ b/swh/scheduler/task.py @@ -147,7 +147,6 @@ """ abstract = True - task_queue = 'celery' def run(self, *args, **kwargs): """This method is called by the celery worker when a task is received. diff --git a/swh/scheduler/tests/scheduler_testing.py b/swh/scheduler/tests/scheduler_testing.py --- a/swh/scheduler/tests/scheduler_testing.py +++ b/swh/scheduler/tests/scheduler_testing.py @@ -44,6 +44,11 @@ def run_ready_tasks(self): """Runs the scheduler and a Celery worker, then blocks until all tasks are completed.""" + + # Make sure the worker is listening to all task-specific queues + for task in self.scheduler.get_task_types(): + app.amqp.queues.select_add(task['backend_name']) + with start_worker(app): backend_tasks = run_ready_tasks(self.scheduler, app) for task in backend_tasks: