diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -120,11 +120,10 @@ return {'monotonic': _monotonic()} -class TaskRouter: +def route_for_task(name, args, kwargs, options, task=None, **kw): """Route tasks according to the task_queue attribute in the task class""" - def route_for_task(self, task, *args, **kwargs): - if task.startswith('swh.'): - return {'queue': task} + if name is not None and name.startswith('swh.'): + return {'queue': name} class CustomCelery(Celery): @@ -243,7 +242,7 @@ # comes. task_soft_time_limit=CONFIG['task_soft_time_limit'], # Task routing - task_routes=TaskRouter(), + task_routes=route_for_task, # Task queues this worker will consume from task_queues=CELERY_QUEUES, # Allow pool restarts from remote diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py --- a/swh/scheduler/task.py +++ b/swh/scheduler/task.py @@ -7,9 +7,35 @@ from celery.utils.log import get_task_logger -class Task(celery.app.task.Task): +class SWHTask(celery.app.task.Task): """a schedulable task (abstract class) + Current implementation is based on Celery. See + http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for + how to use tasks once instantiated + + """ + + _log = None + + def on_failure(self, exc, task_id, args, kwargs, einfo): + self.send_event('task-result-exception') + + def on_success(self, retval, task_id, args, kwargs): + self.send_event('task-result', result=retval) + + @property + def log(self): + if self._log is None: + self._log = get_task_logger(self.name) + return self._log + + +class Task(SWHTask): + """a schedulable task (abstract class) + + DEPRECATED! Please use SWHTask as base for decorated functions instead. + Sub-classes must implement the run_task() method. Current implementation is based on Celery. See @@ -25,14 +51,7 @@ Should not be overridden as we need our special events to be sent for the reccurrent scheduler. Override run_task instead.""" - try: - result = self.run_task(*args, **kwargs) - except Exception as e: - self.send_event('task-result-exception') - raise e from None - else: - self.send_event('task-result', result=result) - return result + return self.run_task(*args, **kwargs) def run_task(self, *args, **kwargs): """Perform the task. @@ -41,10 +60,3 @@ scheduler using a celery event. """ raise NotImplementedError('tasks must implement the run_task() method') - - @property - def log(self): - if not hasattr(self, '__log'): - self.__log = get_task_logger('%s.%s' % - (__name__, self.__class__.__name__)) - return self.__log