Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
Show All 30 Lines | |||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
'task_broker': ('str', 'amqp://guest@localhost//'), | 'task_broker': ('str', 'amqp://guest@localhost//'), | ||||
'result_backend': ('str', 'rpc://'), | 'result_backend': ('str', 'rpc://'), | ||||
'task_modules': ('list[str]', []), | 'task_modules': ('list[str]', []), | ||||
'task_queues': ('list[str]', []), | 'task_queues': ('list[str]', []), | ||||
'task_soft_time_limit': ('int', 0), | 'task_soft_time_limit': ('int', 0), | ||||
} | } | ||||
logger = logging.getLogger(__name__) | |||||
@setup_logging.connect | @setup_logging.connect | ||||
def setup_log_handler(loglevel=None, logfile=None, format=None, | def setup_log_handler(loglevel=None, logfile=None, format=None, | ||||
colorize=None, **kwargs): | colorize=None, log_console=True, **kwargs): | ||||
"""Setup logging according to Software Heritage preferences. | """Setup logging according to Software Heritage preferences. | ||||
We use the command-line loglevel for tasks only, as we never | We use the command-line loglevel for tasks only, as we never | ||||
really care about the debug messages from celery. | really care about the debug messages from celery. | ||||
""" | """ | ||||
if loglevel is None: | if loglevel is None: | ||||
loglevel = logging.DEBUG | loglevel = logging.DEBUG | ||||
if isinstance(loglevel, str): | if isinstance(loglevel, str): | ||||
loglevel = logging._nameToLevel[loglevel] | loglevel = logging._nameToLevel[loglevel] | ||||
formatter = logging.Formatter(format) | formatter = logging.Formatter(format) | ||||
root_logger = logging.getLogger('') | root_logger = logging.getLogger('') | ||||
root_logger.setLevel(logging.INFO) | root_logger.setLevel(logging.INFO) | ||||
if loglevel == logging.DEBUG: | if loglevel <= logging.DEBUG: | ||||
log_console = True | |||||
if log_console: | |||||
color_formatter = ColorFormatter(format) if colorize else formatter | color_formatter = ColorFormatter(format) if colorize else formatter | ||||
console = logging.StreamHandler() | console = logging.StreamHandler() | ||||
console.setLevel(logging.DEBUG) | console.setLevel(logging.DEBUG) | ||||
console.setFormatter(color_formatter) | console.setFormatter(color_formatter) | ||||
root_logger.addHandler(console) | root_logger.addHandler(console) | ||||
systemd_journal = JournalHandler() | systemd_journal = JournalHandler() | ||||
systemd_journal.setLevel(logging.DEBUG) | systemd_journal.setLevel(logging.DEBUG) | ||||
Show All 22 Lines | def setup_queues_and_tasks(sender, instance, **kwargs): | ||||
This automatically registers swh.scheduler.task.Task subclasses as | This automatically registers swh.scheduler.task.Task subclasses as | ||||
available celery tasks. | available celery tasks. | ||||
This also subscribes the worker to the "implicit" per-task queues defined | This also subscribes the worker to the "implicit" per-task queues defined | ||||
for these task classes. | for these task classes. | ||||
""" | """ | ||||
logger.info('Setup Queues & Tasks for %s', sender) | |||||
for module_name in itertools.chain( | for module_name in itertools.chain( | ||||
# celery worker -I flag | # celery worker -I flag | ||||
instance.app.conf['include'], | instance.app.conf['include'], | ||||
# set from the celery / swh worker instance configuration file | # set from the celery / swh worker instance configuration file | ||||
instance.app.conf['imports'], | instance.app.conf['imports'], | ||||
): | ): | ||||
module = importlib.import_module(module_name) | module = importlib.import_module(module_name) | ||||
for name in dir(module): | for name in dir(module): | ||||
▲ Show 20 Lines • Show All 149 Lines • ▼ Show 20 Lines | CELERY_DEFAULT_CONFIG = dict( | ||||
task_send_sent_event=False, | task_send_sent_event=False, | ||||
) | ) | ||||
def build_app(config=None): | def build_app(config=None): | ||||
config = merge_configs( | config = merge_configs( | ||||
{k: v for (k, (_, v)) in DEFAULT_CONFIG.items()}, | {k: v for (k, (_, v)) in DEFAULT_CONFIG.items()}, | ||||
config or {}) | config or {}) | ||||
logging.getLogger(__name__).info( | |||||
'Creating a Celery app with %s', config) | logger.debug('Creating a Celery app with %s', config) | ||||
# Instantiate the Celery app | # Instantiate the Celery app | ||||
app = Celery(broker=config['task_broker'], | app = Celery(broker=config['task_broker'], | ||||
backend=config['result_backend'], | backend=config['result_backend'], | ||||
task_cls='swh.scheduler.task:SWHTask') | task_cls='swh.scheduler.task:SWHTask') | ||||
app.add_defaults(CELERY_DEFAULT_CONFIG) | app.add_defaults(CELERY_DEFAULT_CONFIG) | ||||
app.add_defaults(config) | app.add_defaults(config) | ||||
return app | return app | ||||
app = build_app(CONFIG) | app = build_app(CONFIG) | ||||
# XXX for BW compat | # XXX for BW compat | ||||
Celery.get_queue_length = get_queue_length | Celery.get_queue_length = get_queue_length |