Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
# Copyright (C) 2015 The Software Heritage developers | # Copyright (C) 2015 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from itertools import chain | |||||
vlorentz: Could you just `import itertools`? `chain` is also used by celery (which recommends using `from… | |||||
import importlib | |||||
import logging | import logging | ||||
import os | import os | ||||
import urllib.parse | import urllib.parse | ||||
from celery import Celery | from celery import Celery | ||||
from celery.signals import setup_logging, celeryd_after_setup | from celery.signals import setup_logging, celeryd_after_setup | ||||
from celery.utils.log import ColorFormatter | from celery.utils.log import ColorFormatter | ||||
from celery.worker.control import Panel | from celery.worker.control import Panel | ||||
from kombu import Exchange, Queue | from kombu import Exchange, Queue | ||||
from kombu.five import monotonic as _monotonic | from kombu.five import monotonic as _monotonic | ||||
import requests | import requests | ||||
from swh.scheduler.task import Task | |||||
from swh.core.config import load_named_config | from swh.core.config import load_named_config | ||||
from swh.core.logger import JournalHandler | from swh.core.logger import JournalHandler | ||||
DEFAULT_CONFIG_NAME = 'worker' | DEFAULT_CONFIG_NAME = 'worker' | ||||
CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' | CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' | ||||
CONFIG_NAME_TEMPLATE = 'worker/%s' | CONFIG_NAME_TEMPLATE = 'worker/%s' | ||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | def setup_log_handler(loglevel=None, logfile=None, format=None, | ||||
swh_logger.setLevel(loglevel) | swh_logger.setLevel(loglevel) | ||||
# get_task_logger makes the swh tasks loggers children of celery.task | # get_task_logger makes the swh tasks loggers children of celery.task | ||||
celery_task_logger = logging.getLogger('celery.task') | celery_task_logger = logging.getLogger('celery.task') | ||||
celery_task_logger.setLevel(loglevel) | celery_task_logger.setLevel(loglevel) | ||||
@celeryd_after_setup.connect | @celeryd_after_setup.connect | ||||
def setup_queues_and_tasks(sender, instance, **kwargs): | def setup_queues_and_tasks(sender, instance, **kwargs): | ||||
for module_name in chain(app.conf['include'], app.conf['imports']): | |||||
module = importlib.import_module(module_name) | |||||
for name in dir(module): | |||||
obj = getattr(module, name) | |||||
if isinstance(obj, type) and issubclass(obj, Task) and obj != Task: | |||||
class_name = '%s.%s' % (module_name, name) | |||||
instance.app.register_task_class(class_name, obj) | |||||
for task_name in instance.app.tasks: | for task_name in instance.app.tasks: | ||||
if task_name.startswith('swh.'): | if task_name.startswith('swh.'): | ||||
instance.app.amqp.queues.select_add(task_name) | instance.app.amqp.queues.select_add(task_name) | ||||
vlorentzUnsubmitted Done Inline ActionsAdding a docstring to this function would be nice, it's getting non-trivial. vlorentz: Adding a docstring to this function would be nice, it's getting non-trivial. | |||||
olasdAuthorUnsubmitted Done Inline ActionsYes, agreed. olasd: Yes, agreed. | |||||
@Panel.register | @Panel.register | ||||
def monotonic(state): | def monotonic(state): | ||||
"""Get the current value for the monotonic clock""" | """Get the current value for the monotonic clock""" | ||||
return {'monotonic': _monotonic()} | return {'monotonic': _monotonic()} | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | def get_queue_stats(self, queue_name): | ||||
return r.json() | return r.json() | ||||
def get_queue_length(self, queue_name): | def get_queue_length(self, queue_name): | ||||
"""Shortcut to get a queue's length""" | """Shortcut to get a queue's length""" | ||||
stats = self.get_queue_stats(queue_name) | stats = self.get_queue_stats(queue_name) | ||||
if stats: | if stats: | ||||
return stats.get('messages') | return stats.get('messages') | ||||
def register_task_class(self, name, cls): | |||||
"""Register a class-based task under the given name""" | |||||
if name in self.tasks: | |||||
return | |||||
task_instance = cls() | |||||
task_instance.name = name | |||||
self.register_task(task_instance) | |||||
INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) | INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) | ||||
if INSTANCE_NAME: | if INSTANCE_NAME: | ||||
CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME | CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME | ||||
else: | else: | ||||
CONFIG_NAME = DEFAULT_CONFIG_NAME | CONFIG_NAME = DEFAULT_CONFIG_NAME | ||||
# Load the Celery config | # Load the Celery config | ||||
▲ Show 20 Lines • Show All 67 Lines • Show Last 20 Lines |
Could you just import itertools? chain is also used by celery (which recommends using from celery import chain) so it got me confused for a second when reading the code below