diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py
--- a/swh/scheduler/celery_backend/config.py
+++ b/swh/scheduler/celery_backend/config.py
@@ -21,7 +21,7 @@
 
 from swh.scheduler.task import Task
 
-from swh.core.config import load_named_config
+from swh.core.config import load_named_config, merge_configs
 from swh.core.logger import JournalHandler
 
 DEFAULT_CONFIG_NAME = 'worker'
@@ -254,11 +254,43 @@
     task_send_sent_event=False,
 )
 
-# Instantiate the Celery app
-app = Celery(broker=CONFIG['task_broker'],
-             backend=CONFIG['result_backend'],
-             task_cls='swh.scheduler.task:SWHTask')
-app.add_defaults(CELERY_DEFAULT_CONFIG)
+
+def build_app(config=None):
+    """Build a Celery app from the default settings and optional overrides.
+
+    Args:
+        config (dict): settings merged over the default values taken from
+            DEFAULT_CONFIG; None or an empty dict keeps the defaults only.
+
+    Returns:
+        a Celery app using the 'task_broker' and 'result_backend' settings,
+        with CELERY_DEFAULT_CONFIG and the merged settings as defaults.
+
+    """
+    import re
+
+    # DEFAULT_CONFIG entries are pairs; only their second item is used here
+    config = merge_configs(
+        {k: v for (k, (_, v)) in DEFAULT_CONFIG.items()},
+        config or {})
+    # URLs such as the broker's may embed credentials: mask the password of
+    # top-level string values so it does not end up in the logs
+    logging.getLogger(__name__).info(
+        'Creating a Celery app with %s',
+        {k: (re.sub(r'(?<=://)([^/:@]*):[^/@]*@', r'\1:***@', v)
+             if isinstance(v, str) else v)
+         for (k, v) in config.items()})
+
+    # Instantiate the Celery app
+    app = Celery(broker=config['task_broker'],
+                 backend=config['result_backend'],
+                 task_cls='swh.scheduler.task:SWHTask')
+    app.add_defaults(CELERY_DEFAULT_CONFIG)
+    app.add_defaults(config)
+    return app
+
+
+app = build_app(CONFIG)
 
 # XXX for BW compat
 Celery.get_queue_length = get_queue_length
diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -13,9 +13,9 @@
 
 from arrow import utcnow
 from kombu import Queue
-from celery.events import EventReceiver
 
-from .config import app as main_app
+import celery
+from celery.events import EventReceiver
 
 
 class ReliableEventReceiver(EventReceiver):
@@ -166,8 +166,8 @@
         })
 
     recv = ReliableEventReceiver(
-        main_app.connection(),
-        app=main_app,
+        celery.current_app.connection(),
+        app=celery.current_app,
         handlers={
             'task-started': task_started,
             'task-result': task_succeeded,
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -497,7 +497,10 @@
     This process is responsible for checking for ready-to-run tasks and
     schedule them."""
     from swh.scheduler.celery_backend.runner import run_ready_tasks
-    from swh.scheduler.celery_backend.config import app
+    from swh.scheduler.celery_backend.config import build_app
+
+    app = build_app(ctx.obj['config'].get('celery'))
+    app.set_current()
 
     logger = logging.getLogger(__name__ + '.runner')
     scheduler = ctx.obj['scheduler']
@@ -529,9 +532,12 @@
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
 
-    from swh.scheduler.celery_backend.listener import (
-        event_monitor, main_app)
-    event_monitor(main_app, backend=scheduler)
+    from swh.scheduler.celery_backend.config import build_app
+    app = build_app(ctx.obj['config'].get('celery'))
+    app.set_current()
+
+    from swh.scheduler.celery_backend.listener import event_monitor
+    event_monitor(app, backend=scheduler)
 
 
 @cli.command('api-server')