Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
Show All 15 Lines | |||||
from kombu import Exchange, Queue | from kombu import Exchange, Queue | ||||
from kombu.five import monotonic as _monotonic | from kombu.five import monotonic as _monotonic | ||||
import requests | import requests | ||||
from swh.scheduler.task import Task | from swh.scheduler.task import Task | ||||
from swh.core.config import load_named_config | from swh.core.config import load_named_config, merge_configs | ||||
from swh.core.logger import JournalHandler | from swh.core.logger import JournalHandler | ||||
DEFAULT_CONFIG_NAME = 'worker' | DEFAULT_CONFIG_NAME = 'worker' | ||||
CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' | CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' | ||||
CONFIG_NAME_TEMPLATE = 'worker/%s' | CONFIG_NAME_TEMPLATE = 'worker/%s' | ||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
'task_broker': ('str', 'amqp://guest@localhost//'), | 'task_broker': ('str', 'amqp://guest@localhost//'), | ||||
▲ Show 20 Lines • Show All 216 Lines • ▼ Show 20 Lines | CELERY_DEFAULT_CONFIG = dict( | ||||
# Do not prefetch tasks | # Do not prefetch tasks | ||||
worker_prefetch_multiplier=1, | worker_prefetch_multiplier=1, | ||||
# Send events | # Send events | ||||
worker_send_task_events=True, | worker_send_task_events=True, | ||||
# Do not send useless task_sent events | # Do not send useless task_sent events | ||||
task_send_sent_event=False, | task_send_sent_event=False, | ||||
) | ) | ||||
def build_app(config=None): | |||||
config = merge_configs( | |||||
{k: v for (k, (_, v)) in DEFAULT_CONFIG.items()}, | |||||
config or {}) | |||||
logging.getLogger(__name__).info( | |||||
'Creating a Celery app with %s', config) | |||||
# Instantiate the Celery app | # Instantiate the Celery app | ||||
app = Celery(broker=CONFIG['task_broker'], | app = Celery(broker=config['task_broker'], | ||||
backend=CONFIG['result_backend'], | backend=config['result_backend'], | ||||
task_cls='swh.scheduler.task:SWHTask') | task_cls='swh.scheduler.task:SWHTask') | ||||
app.add_defaults(CELERY_DEFAULT_CONFIG) | app.add_defaults(CELERY_DEFAULT_CONFIG) | ||||
app.add_defaults(config) | |||||
return app | |||||
app = build_app(CONFIG) | |||||
# XXX for BW compat | # XXX for BW compat | ||||
Celery.get_queue_length = get_queue_length | Celery.get_queue_length = get_queue_length |