Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | |||||
import logging | import logging | ||||
import os | import os | ||||
import pkg_resources | import pkg_resources | ||||
import traceback | |||||
from typing import Any, Dict | |||||
import urllib.parse | import urllib.parse | ||||
from celery import Celery | from celery import Celery | ||||
from celery.signals import setup_logging, celeryd_after_setup | from celery.signals import setup_logging, celeryd_after_setup | ||||
from celery.utils.log import ColorFormatter | from celery.utils.log import ColorFormatter | ||||
from celery.worker.control import Panel | from celery.worker.control import Panel | ||||
from kombu import Exchange, Queue | from kombu import Exchange, Queue | ||||
from kombu.five import monotonic as _monotonic | from kombu.five import monotonic as _monotonic | ||||
import requests | import requests | ||||
from typing import Any, Dict | |||||
from swh.scheduler import CONFIG as SWH_CONFIG | from swh.scheduler import CONFIG as SWH_CONFIG | ||||
from swh.core.config import load_named_config, merge_configs | from swh.core.config import load_named_config, merge_configs | ||||
try: | try: | ||||
from swh.core.logger import JournalHandler | from swh.core.logger import JournalHandler | ||||
except ImportError: | except ImportError: | ||||
JournalHandler = None # type: ignore | JournalHandler = None # type: ignore | ||||
DEFAULT_CONFIG_NAME = 'worker' | DEFAULT_CONFIG_NAME = 'worker' | ||||
CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' | CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' | ||||
CONFIG_NAME_TEMPLATE = 'worker/%s' | CONFIG_NAME_TEMPLATE = 'worker/%s' | ||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
'task_broker': ('str', 'amqp://guest@localhost//'), | 'task_broker': ('str', 'amqp://guest@localhost//'), | ||||
'task_modules': ('list[str]', []), | 'task_modules': ('list[str]', []), | ||||
'task_queues': ('list[str]', []), | 'task_queues': ('list[str]', []), | ||||
'task_soft_time_limit': ('int', 0), | 'task_soft_time_limit': ('int', 0), | ||||
} | } | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
# Celery eats tracebacks in signal callbacks, this decorator catches | |||||
# and prints them. | |||||
# Also tries to notify Sentry if possible. | |||||
def _print_errors(f): | |||||
@functools.wraps(f) | |||||
def newf(*args, **kwargs): | |||||
try: | |||||
return f(*args, **kwargs) | |||||
except Exception as exc: | |||||
traceback.print_exc() | |||||
try: | |||||
import sentry_sdk | |||||
sentry_sdk.capture_exception(exc) | |||||
except Exception: | |||||
traceback.print_exc() | |||||
return newf | |||||
@setup_logging.connect | @setup_logging.connect | ||||
@_print_errors | |||||
def setup_log_handler(loglevel=None, logfile=None, format=None, colorize=None, | def setup_log_handler(loglevel=None, logfile=None, format=None, colorize=None, | ||||
log_console=None, log_journal=None, **kwargs): | log_console=None, log_journal=None, **kwargs): | ||||
"""Setup logging according to Software Heritage preferences. | """Setup logging according to Software Heritage preferences. | ||||
We use the command-line loglevel for tasks only, as we never | We use the command-line loglevel for tasks only, as we never | ||||
really care about the debug messages from celery. | really care about the debug messages from celery. | ||||
""" | """ | ||||
if loglevel is None: | if loglevel is None: | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | def setup_log_handler(loglevel=None, logfile=None, format=None, colorize=None, | ||||
logging.getLogger('swh').setLevel(loglevel) | logging.getLogger('swh').setLevel(loglevel) | ||||
# get_task_logger makes the swh tasks loggers children of celery.task | # get_task_logger makes the swh tasks loggers children of celery.task | ||||
logging.getLogger('celery.task').setLevel(loglevel) | logging.getLogger('celery.task').setLevel(loglevel) | ||||
return loglevel | return loglevel | ||||
@celeryd_after_setup.connect | @celeryd_after_setup.connect | ||||
@_print_errors | |||||
def setup_queues_and_tasks(sender, instance, **kwargs): | def setup_queues_and_tasks(sender, instance, **kwargs): | ||||
"""Signal called on worker start. | """Signal called on worker start. | ||||
This automatically registers swh.scheduler.task.Task subclasses as | This automatically registers swh.scheduler.task.Task subclasses as | ||||
available celery tasks. | available celery tasks. | ||||
This also subscribes the worker to the "implicit" per-task queues defined | This also subscribes the worker to the "implicit" per-task queues defined | ||||
for these task classes. | for these task classes. | ||||
▲ Show 20 Lines • Show All 178 Lines • Show Last 20 Lines |