Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
Show First 20 Lines • Show All 114 Lines • ▼ Show 20 Lines | |||||
@Panel.register | @Panel.register | ||||
def monotonic(state): | def monotonic(state): | ||||
"""Get the current value for the monotonic clock""" | """Get the current value for the monotonic clock""" | ||||
return {'monotonic': _monotonic()} | return {'monotonic': _monotonic()} | ||||
class TaskRouter: | def route_for_task(name, args, kwargs, options, task=None, **kw): | ||||
"""Route tasks according to the task_queue attribute in the task class""" | """Route tasks according to the task_queue attribute in the task class""" | ||||
def route_for_task(self, task, *args, **kwargs): | if name is not None and name.startswith('swh.'): | ||||
if task.startswith('swh.'): | return {'queue': name} | ||||
return {'queue': task} | |||||
class CustomCelery(Celery): | class CustomCelery(Celery): | ||||
def get_queue_stats(self, queue_name): | def get_queue_stats(self, queue_name): | ||||
"""Get the statistics regarding a queue on the broker. | """Get the statistics regarding a queue on the broker. | ||||
Arguments: | Arguments: | ||||
queue_name: name of the queue to check | queue_name: name of the queue to check | ||||
▲ Show 20 Lines • Show All 102 Lines • ▼ Show 20 Lines | app.conf.update( | ||||
# killed and replaced with a new one when this is exceeded. | # killed and replaced with a new one when this is exceeded. | ||||
# task_time_limit=3600, | # task_time_limit=3600, | ||||
# Task soft time limit in seconds. | # Task soft time limit in seconds. | ||||
# The SoftTimeLimitExceeded exception will be raised when this is exceeded. | # The SoftTimeLimitExceeded exception will be raised when this is exceeded. | ||||
# The task can catch this to e.g. clean up before the hard time limit | # The task can catch this to e.g. clean up before the hard time limit | ||||
# comes. | # comes. | ||||
task_soft_time_limit=CONFIG['task_soft_time_limit'], | task_soft_time_limit=CONFIG['task_soft_time_limit'], | ||||
# Task routing | # Task routing | ||||
task_routes=TaskRouter(), | task_routes=route_for_task, | ||||
# Task queues this worker will consume from | # Task queues this worker will consume from | ||||
task_queues=CELERY_QUEUES, | task_queues=CELERY_QUEUES, | ||||
# Allow pool restarts from remote | # Allow pool restarts from remote | ||||
worker_pool_restarts=True, | worker_pool_restarts=True, | ||||
# Do not prefetch tasks | # Do not prefetch tasks | ||||
worker_prefetch_multiplier=1, | worker_prefetch_multiplier=1, | ||||
# Send events | # Send events | ||||
worker_send_task_events=True, | worker_send_task_events=True, | ||||
# Do not send useless task_sent events | # Do not send useless task_sent events | ||||
task_send_sent_event=False, | task_send_sent_event=False, | ||||
) | ) |