Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
Show First 20 Lines • Show All 79 Lines • ▼ Show 20 Lines | |||||
def monotonic(state): | def monotonic(state): | ||||
"""Get the current value for the monotonic clock""" | """Get the current value for the monotonic clock""" | ||||
return {'monotonic': _monotonic()} | return {'monotonic': _monotonic()} | ||||
class TaskRouter: | class TaskRouter: | ||||
"""Route tasks according to the task_queue attribute in the task class""" | """Route tasks according to the task_queue attribute in the task class""" | ||||
def route_for_task(self, task, args=None, kwargs=None): | def route_for_task(self, task, args=None, kwargs=None): | ||||
task_class = app.tasks[task] | task_class = app.tasks.get(task) | ||||
if task_class is None: | |||||
return {'queue': 'auto_queue_for_%s' % task} | |||||
if hasattr(task_class, 'task_queue'): | if hasattr(task_class, 'task_queue'): | ||||
return {'queue': task_class.task_queue} | return {'queue': task_class.task_queue} | ||||
return None | return None | ||||
class CustomCelery(Celery): | class CustomCelery(Celery): | ||||
def get_queue_stats(self, queue_name): | def get_queue_stats(self, queue_name): | ||||
"""Get the statistics regarding a queue on the broker. | """Get the statistics regarding a queue on the broker. | ||||
▲ Show 20 Lines • Show All 113 Lines • Show Last 20 Lines |