Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | import functools | ||||
import logging | import logging | ||||
import os | import os | ||||
import pkg_resources | import pkg_resources | ||||
import traceback | import traceback | ||||
from typing import Any, Dict | from typing import Any, Dict | ||||
import urllib.parse | import urllib.parse | ||||
from celery import Celery | from celery import Celery | ||||
from celery.signals import setup_logging, celeryd_after_setup | from celery.signals import setup_logging, celeryd_after_setup, worker_init | ||||
from celery.utils.log import ColorFormatter | from celery.utils.log import ColorFormatter | ||||
from celery.worker.control import Panel | from celery.worker.control import Panel | ||||
from kombu import Exchange, Queue | from kombu import Exchange, Queue | ||||
from kombu.five import monotonic as _monotonic | from kombu.five import monotonic as _monotonic | ||||
import requests | import requests | ||||
▲ Show 20 Lines • Show All 112 Lines • ▼ Show 20 Lines | def setup_queues_and_tasks(sender, instance, **kwargs): | ||||
This also subscribes the worker to the "implicit" per-task queues defined | This also subscribes the worker to the "implicit" per-task queues defined | ||||
for these task classes. | for these task classes. | ||||
""" | """ | ||||
logger.info('Setup Queues & Tasks for %s', sender) | logger.info('Setup Queues & Tasks for %s', sender) | ||||
instance.app.conf['worker_name'] = sender | instance.app.conf['worker_name'] = sender | ||||
@worker_init.connect | |||||
@_print_errors | |||||
def on_worker_init(*args, **kwargs): | |||||
sentry_dsn = os.environ.get('SWH_SENTRY_DSN') | |||||
if sentry_dsn: | |||||
import sentry_sdk | |||||
from sentry_sdk.integrations.celery import CeleryIntegration | |||||
sentry_sdk.init( | |||||
dsn=sentry_dsn, | |||||
integrations=[CeleryIntegration()], | |||||
debug=bool(os.environ.get('SWH_SENTRY_DEBUG')), | |||||
) | |||||
@Panel.register | @Panel.register | ||||
def monotonic(state): | def monotonic(state): | ||||
"""Get the current value for the monotonic clock""" | """Get the current value for the monotonic clock""" | ||||
return {'monotonic': _monotonic()} | return {'monotonic': _monotonic()} | ||||
def route_for_task(name, args, kwargs, options, task=None, **kw): | def route_for_task(name, args, kwargs, options, task=None, **kw): | ||||
"""Route tasks according to the task_queue attribute in the task class""" | """Route tasks according to the task_queue attribute in the task class""" | ||||
▲ Show 20 Lines • Show All 164 Lines • Show Last 20 Lines |