# Patch summary for swh/scheduler/celery_backend/config.py:
#   * CONFIG_NAME is now read from the SWH_CONFIG_FILENAME environment
#     variable; when set, CONFIG is taken from the 'celery' entry of the
#     configuration returned by load_named_config(CONFIG_NAME).
#   * When SWH_CONFIG_FILENAME is unset, or that lookup yields a falsy value
#     (missing/empty 'celery' entry), the previous mechanism is kept:
#     CONFIG_NAME is CONFIG_NAME_TEMPLATE % INSTANCE_NAME if INSTANCE_NAME is
#     set, else DEFAULT_CONFIG_NAME, and CONFIG is loaded with
#     load_named_config(CONFIG_NAME, DEFAULT_CONFIG).
#   * 'task_modules' becomes optional: imports=CONFIG.get('task_modules', []).
#   * The task_soft_time_limit=CONFIG['task_soft_time_limit'] setting and the
#     hard/soft time limit comments above it are removed.
# NOTE(review): the new SWH_CONFIG_FILENAME path calls load_named_config
# without DEFAULT_CONFIG; whether defaults still get applied depends on that
# helper, which is not visible here -- verify.
# NOTE(review): the line breaks of this diff appear to have been collapsed
# (header and hunks share physical lines); presumably the original newlines
# must be restored before `git apply` / `patch` can consume it -- confirm
# against the upstream commit.
diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -190,13 +190,24 @@ INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) -if INSTANCE_NAME: - CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME -else: - CONFIG_NAME = DEFAULT_CONFIG_NAME - -# Load the Celery config -CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) +CONFIG_NAME = os.environ.get('SWH_CONFIG_FILENAME') +CONFIG = {} +if CONFIG_NAME: + # load the celery config from the main config file given as + # SWH_CONFIG_FILENAME environment variable. + # This is expected to have a [celery] section in which we have the + # celery specific configuration. + CONFIG = load_named_config(CONFIG_NAME).get('celery') + +if not CONFIG: + # otherwise, back to compat config loading mechanism + if INSTANCE_NAME: + CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME + else: + CONFIG_NAME = DEFAULT_CONFIG_NAME + + # Load the Celery config + CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] @@ -206,7 +217,7 @@ enable_utc=True, timezone='UTC', # Imported modules - imports=CONFIG['task_modules'], + imports=CONFIG.get('task_modules', []), # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. None means to never expire results. result_expires=None, @@ -235,14 +246,6 @@ # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) worker_disable_rate_limits=True, - # Task hard time limit in seconds. The worker processing the task will be - # killed and replaced with a new one when this is exceeded. - # task_time_limit=3600, - # Task soft time limit in seconds. - # The SoftTimeLimitExceeded exception will be raised when this is exceeded. - # The task can catch this to e.g. clean up before the hard time limit - # comes. 
- task_soft_time_limit=CONFIG['task_soft_time_limit'], # Task routing task_routes=route_for_task, # Task queues this worker will consume from