Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
Show First 20 Lines • Show All 195 Lines • ▼ Show 20 Lines | else: | ||||
CONFIG_NAME = DEFAULT_CONFIG_NAME | CONFIG_NAME = DEFAULT_CONFIG_NAME | ||||
# Load the Celery config | # Load the Celery config | ||||
CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) | CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) | ||||
# Celery Queues | # Celery Queues | ||||
CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] | CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] | ||||
for queue in CONFIG['task_queues']: | |||||
CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue)) | |||||
CELERY_DEFAULT_CONFIG = dict( | CELERY_DEFAULT_CONFIG = dict( | ||||
# Timezone configuration: all in UTC | # Timezone configuration: all in UTC | ||||
enable_utc=True, | enable_utc=True, | ||||
timezone='UTC', | timezone='UTC', | ||||
# Imported modules | # Imported modules | ||||
imports=CONFIG['task_modules'], | imports=CONFIG['task_modules'], | ||||
# Time (in seconds, or a timedelta object) for when after stored task | # Time (in seconds, or a timedelta object) for when after stored task | ||||
# tombstones will be deleted. None means to never expire results. | # tombstones will be deleted. None means to never expire results. | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | CELERY_DEFAULT_CONFIG = dict( | ||||
) | ) | ||||
def build_app(config=None): | def build_app(config=None): | ||||
config = merge_configs( | config = merge_configs( | ||||
{k: v for (k, (_, v)) in DEFAULT_CONFIG.items()}, | {k: v for (k, (_, v)) in DEFAULT_CONFIG.items()}, | ||||
config or {}) | config or {}) | ||||
config['task_queues'] = [Queue(queue, Exchange(queue), routing_key=queue) | |||||
for queue in config.get('task_queues', ())] | |||||
logger.debug('Creating a Celery app with %s', config) | logger.debug('Creating a Celery app with %s', config) | ||||
# Instantiate the Celery app | # Instantiate the Celery app | ||||
app = Celery(broker=config['task_broker'], | app = Celery(broker=config['task_broker'], | ||||
backend=config['result_backend'], | backend=config['result_backend'], | ||||
task_cls='swh.scheduler.task:SWHTask') | task_cls='swh.scheduler.task:SWHTask') | ||||
app.add_defaults(CELERY_DEFAULT_CONFIG) | app.add_defaults(CELERY_DEFAULT_CONFIG) | ||||
app.add_defaults(config) | app.add_defaults(config) | ||||
return app | return app | ||||
app = build_app(CONFIG) | app = build_app(CONFIG) | ||||
# XXX for BW compat | # XXX for BW compat | ||||
Celery.get_queue_length = get_queue_length | Celery.get_queue_length = get_queue_length |