Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
import os | import os | ||||
import pkg_resources | |||||
import urllib.parse | import urllib.parse | ||||
from celery import Celery | from celery import Celery | ||||
from celery.signals import setup_logging, celeryd_after_setup | from celery.signals import setup_logging, celeryd_after_setup | ||||
from celery.utils.log import ColorFormatter | from celery.utils.log import ColorFormatter | ||||
from celery.worker.control import Panel | from celery.worker.control import Panel | ||||
from kombu import Exchange, Queue | from kombu import Exchange, Queue | ||||
from kombu.five import monotonic as _monotonic | from kombu.five import monotonic as _monotonic | ||||
import requests | import requests | ||||
from typing import Any, Dict | from typing import Any, Dict | ||||
from swh.scheduler import CONFIG as SWH_CONFIG | from swh.scheduler import CONFIG as SWH_CONFIG | ||||
from swh.scheduler import load_worker_modules | |||||
from swh.core.config import load_named_config, merge_configs | from swh.core.config import load_named_config, merge_configs | ||||
try: | try: | ||||
from swh.core.logger import JournalHandler | from swh.core.logger import JournalHandler | ||||
except ImportError: | except ImportError: | ||||
JournalHandler = None # type: ignore | JournalHandler = None # type: ignore | ||||
▲ Show 20 Lines • Show All 174 Lines • ▼ Show 20 Lines | if not CONFIG: | ||||
else: | else: | ||||
CONFIG_NAME = DEFAULT_CONFIG_NAME | CONFIG_NAME = DEFAULT_CONFIG_NAME | ||||
# Load the Celery config | # Load the Celery config | ||||
CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) | CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) | ||||
CONFIG.setdefault('task_modules', []) | CONFIG.setdefault('task_modules', []) | ||||
# load tasks modules declared as plugin entry points | # load tasks modules declared as plugin entry points | ||||
for entrypoint in pkg_resources.iter_entry_points('swh.workers'): | for entrypoint in load_worker_modules().values(): | ||||
worker_registrer_fn = entrypoint.load() | worker_registrer_fn = entrypoint.load() | ||||
# The registry function is expected to return a dict which the 'tasks' key | # The registry function is expected to return a dict which the 'tasks' key | ||||
# is a string (or a list of strings) with the name of the python module in | # is a string (or a list of strings) with the name of the python module in | ||||
# which celery tasks are defined. | # which celery tasks are defined. | ||||
task_modules = worker_registrer_fn().get('task_modules', []) | task_modules = worker_registrer_fn().get('task_modules', []) | ||||
CONFIG['task_modules'].extend(task_modules) | CONFIG['task_modules'].extend(task_modules) | ||||
# Celery Queues | # Celery Queues | ||||
▲ Show 20 Lines • Show All 71 Lines • Show Last 20 Lines |