diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py
--- a/swh/scheduler/celery_backend/config.py
+++ b/swh/scheduler/celery_backend/config.py
@@ -100,6 +100,9 @@
 
     logger.info('Setup Queues & Tasks for %s', sender)
 
+    # keep the worker name in the app config, for tasks to tag their metrics
+    instance.app.conf['worker_name'] = sender
+
     for module_name in itertools.chain(
             # celery worker -I flag
             instance.app.conf['include'],
diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py
--- a/swh/scheduler/task.py
+++ b/swh/scheduler/task.py
@@ -1,11 +1,14 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
+# Copyright (C) 2015-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from celery import current_app
 import celery.app.task
 from celery.utils.log import get_task_logger
 
+from swh.core.statsd import Statsd
+
 
 class SWHTask(celery.app.task.Task):
     """a schedulable task (abstract class)
@@ -16,12 +19,43 @@
 
     """
 
+    _statsd = None
     _log = None
 
+    @property
+    def statsd(self):
+        """Statsd client tagged with this task's name and worker name.
+
+        The client is cached only once the worker name is known; until
+        then a client tagged 'unknown worker' is built on each access.
+        """
+        if self._statsd is not None:
+            return self._statsd
+        worker_name = current_app.conf.get('worker_name')
+        if worker_name:
+            self._statsd = Statsd(constant_tags={
+                'task': self.name,
+                'worker': worker_name,
+            })
+            return self._statsd
+        else:
+            return Statsd(constant_tags={
+                'task': self.name,
+                'worker': 'unknown worker',
+            })
+
+    def __call__(self, *args, **kwargs):
+        """Run the task, counting the call and timing its duration."""
+        self.statsd.increment('swh_task_called_count')
+        with self.statsd.timed('swh_task_duration_seconds'):
+            return super().__call__(*args, **kwargs)
+
     def on_failure(self, exc, task_id, args, kwargs, einfo):
+        self.statsd.increment('swh_task_failure_count')
         self.send_event('task-result-exception')
 
     def on_success(self, retval, task_id, args, kwargs):
+        self.statsd.increment('swh_task_success_count')
         self.send_event('task-result', result=retval)
 
     @property