Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
Show First 20 Lines • Show All 85 Lines • ▼ Show 20 Lines | def setup_queues_and_tasks(sender, instance, **kwargs): | ||||
This automatically registers swh.scheduler.task.Task subclasses as | This automatically registers swh.scheduler.task.Task subclasses as | ||||
available celery tasks. | available celery tasks. | ||||
This also subscribes the worker to the "implicit" per-task queues defined | This also subscribes the worker to the "implicit" per-task queues defined | ||||
for these task classes. | for these task classes. | ||||
""" | """ | ||||
for module_name in itertools.chain( | for module_name in itertools.chain( | ||||
# celery worker -I flag | # celery worker -I flag | ||||
instance.app.conf['include'], | instance.app.conf['include'], | ||||
# set from the celery / swh worker instance configuration file | # set from the celery / swh worker instance configuration file | ||||
instance.app.conf['imports'], | instance.app.conf['imports'], | ||||
): | ): | ||||
module = importlib.import_module(module_name) | module = importlib.import_module(module_name) | ||||
for name in dir(module): | for name in dir(module): | ||||
obj = getattr(module, name) | obj = getattr(module, name) | ||||
if ( | if ( | ||||
isinstance(obj, type) | isinstance(obj, type) | ||||
and issubclass(obj, Task) | and issubclass(obj, Task) | ||||
and obj != Task # Don't register the abstract class itself | and obj != Task # Don't register the abstract class itself | ||||
and not name.endswith('Base') | |||||
and not name.startswith('_') | |||||
and obj.__module__ == module.__name__ | |||||
): | ): | ||||
class_name = '%s.%s' % (module_name, name) | class_name = '%s.%s' % (module_name, name) | ||||
instance.app.register_task_class(class_name, obj) | instance.app.register_task_class(class_name, obj) | ||||
for task_name in instance.app.tasks: | for task_name in instance.app.tasks: | ||||
if task_name.startswith('swh.'): | if task_name.startswith('swh.'): | ||||
instance.app.amqp.queues.select_add(task_name) | instance.app.amqp.queues.select_add(task_name) | ||||
▲ Show 20 Lines • Show All 142 Lines • Show Last 20 Lines |