diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py index 3d16510..b141a01 100644 --- a/swh/scheduler/task.py +++ b/swh/scheduler/task.py @@ -1,62 +1,68 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import celery.app.task from celery.utils.log import get_task_logger class SWHTask(celery.app.task.Task): """a schedulable task (abstract class) Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ _log = None def on_failure(self, exc, task_id, args, kwargs, einfo): self.send_event('task-result-exception') def on_success(self, retval, task_id, args, kwargs): self.send_event('task-result', result=retval) @property def log(self): if self._log is None: self._log = get_task_logger(self.name) return self._log + def run(self, *args, **kwargs): + self.log.debug('%s: args=%s, kwargs=%s', self.name, args, kwargs) + ret = super().run(*args, **kwargs) + self.log.debug('%s: OK => %s', self.name, ret) + return ret + class Task(SWHTask): """a schedulable task (abstract class) DEPRECATED! Please use SWHTask as base for decorated functions instead. Sub-classes must implement the run_task() method. Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ abstract = True def run(self, *args, **kwargs): """This method is called by the celery worker when a task is received. Should not be overridden as we need our special events to be sent for the reccurrent scheduler. Override run_task instead.""" return self.run_task(*args, **kwargs) def run_task(self, *args, **kwargs): """Perform the task. Must return a json-serializable value as it is passed back to the task scheduler using a celery event. """ raise NotImplementedError('tasks must implement the run_task() method') diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py index 5702680..b18df46 100644 --- a/swh/scheduler/tests/tasks.py +++ b/swh/scheduler/tests/tasks.py @@ -1,41 +1,31 @@ -from celery import group +from celery import group, current_app as app -from swh.scheduler.celery_backend.config import app - -@app.task(name='swh.scheduler.tests.tasks.ping', - bind=True) +@app.task(name='swh.scheduler.tests.tasks.ping', bind=True) def ping(self, **kw): # check this is a SWHTask assert hasattr(self, 'log') assert not hasattr(self, 'run_task') assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] self.log.debug(self.name) if kw: return 'OK (kw=%s)' % kw return 'OK' -@app.task(name='swh.scheduler.tests.tasks.multiping', - bind=True) +@app.task(name='swh.scheduler.tests.tasks.multiping', bind=True) def multiping(self, n=10): - self.log.debug(self.name) - promise = group(ping.s(i=i) for i in range(n))() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) promise.save() return promise.id -@app.task(name='swh.scheduler.tests.tasks.error', - bind=True) -def not_implemented(self): - self.log.debug(self.name) +@app.task(name='swh.scheduler.tests.tasks.error') +def not_implemented(): raise NotImplementedError('Nope') -@app.task(name='swh.scheduler.tests.tasks.add', - bind=True) -def add(self, x, y): - self.log.debug(self.name) +@app.task(name='swh.scheduler.tests.tasks.add') +def add(x, y): return x + y