Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
Show First 20 Lines • Show All 184 Lines • ▼ Show 20 Lines | if name in app.tasks: | ||||
return | return | ||||
task_instance = cls() | task_instance = cls() | ||||
task_instance.name = name | task_instance.name = name | ||||
app.register_task(task_instance) | app.register_task(task_instance) | ||||
INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) | INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) | ||||
CONFIG_NAME = os.environ.get('SWH_CONFIG_FILENAME') | |||||
CONFIG = {} | |||||
if CONFIG_NAME: | |||||
# load the celery config from the main config file given as | |||||
# SWH_CONFIG_FILENAME environment variable. | |||||
# This is expected to have a [celery] section in which we have the | |||||
# celery specific configuration. | |||||
CONFIG = load_named_config(CONFIG_NAME).get('celery') | |||||
if not CONFIG: | |||||
# otherwise, back to compat config loading mechanism | |||||
if INSTANCE_NAME: | if INSTANCE_NAME: | ||||
CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME | CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME | ||||
else: | else: | ||||
CONFIG_NAME = DEFAULT_CONFIG_NAME | CONFIG_NAME = DEFAULT_CONFIG_NAME | ||||
# Load the Celery config | # Load the Celery config | ||||
CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) | CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) | ||||
# Celery Queues | # Celery Queues | ||||
CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] | CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] | ||||
CELERY_DEFAULT_CONFIG = dict( | CELERY_DEFAULT_CONFIG = dict( | ||||
# Timezone configuration: all in UTC | # Timezone configuration: all in UTC | ||||
enable_utc=True, | enable_utc=True, | ||||
timezone='UTC', | timezone='UTC', | ||||
# Imported modules | # Imported modules | ||||
imports=CONFIG['task_modules'], | imports=CONFIG.get('task_modules', []), | ||||
# Time (in seconds, or a timedelta object) for when after stored task | # Time (in seconds, or a timedelta object) for when after stored task | ||||
# tombstones will be deleted. None means to never expire results. | # tombstones will be deleted. None means to never expire results. | ||||
result_expires=None, | result_expires=None, | ||||
# A string identifying the default serialization method to use. Can | # A string identifying the default serialization method to use. Can | ||||
# be json (default), pickle, yaml, msgpack, or any custom | # be json (default), pickle, yaml, msgpack, or any custom | ||||
# serialization methods that have been registered with | # serialization methods that have been registered with | ||||
task_serializer='msgpack', | task_serializer='msgpack', | ||||
# Result serialization format | # Result serialization format | ||||
Show All 12 Lines | CELERY_DEFAULT_CONFIG = dict( | ||||
# (if available), or any custom compression schemes registered | # (if available), or any custom compression schemes registered | ||||
# in the Kombu compression registry. | # in the Kombu compression registry. | ||||
# result_compression='bzip2', | # result_compression='bzip2', | ||||
# task_compression='bzip2', | # task_compression='bzip2', | ||||
# Disable all rate limits, even if tasks has explicit rate limits set. | # Disable all rate limits, even if tasks has explicit rate limits set. | ||||
# (Disabling rate limits altogether is recommended if you don’t have any | # (Disabling rate limits altogether is recommended if you don’t have any | ||||
# tasks using them.) | # tasks using them.) | ||||
worker_disable_rate_limits=True, | worker_disable_rate_limits=True, | ||||
# Task hard time limit in seconds. The worker processing the task will be | |||||
# killed and replaced with a new one when this is exceeded. | |||||
# task_time_limit=3600, | |||||
# Task soft time limit in seconds. | |||||
# The SoftTimeLimitExceeded exception will be raised when this is exceeded. | |||||
# The task can catch this to e.g. clean up before the hard time limit | |||||
# comes. | |||||
task_soft_time_limit=CONFIG['task_soft_time_limit'], | |||||
ardumont: i guess we'll see what that changes ;) | |||||
Done Inline ActionsIn fact, with the recent modifications in config file handling, this is a noop, the 'task_soft_time_limit' from CONFIG is already passed to the celery app. douardda: In fact, with the recent modifications in config file handling, this is a noop, the… | |||||
# Task routing | # Task routing | ||||
task_routes=route_for_task, | task_routes=route_for_task, | ||||
# Task queues this worker will consume from | # Task queues this worker will consume from | ||||
task_queues=CELERY_QUEUES, | task_queues=CELERY_QUEUES, | ||||
# Allow pool restarts from remote | # Allow pool restarts from remote | ||||
worker_pool_restarts=True, | worker_pool_restarts=True, | ||||
# Do not prefetch tasks | # Do not prefetch tasks | ||||
worker_prefetch_multiplier=1, | worker_prefetch_multiplier=1, | ||||
Show All 29 Lines |
i guess we'll see what that changes ;)