Page Menu | Home | Software Heritage

No One | Temporary

diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py
index 3d16510..b141a01 100644
--- a/swh/scheduler/task.py
+++ b/swh/scheduler/task.py
@@ -1,62 +1,68 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import celery.app.task
from celery.utils.log import get_task_logger
class SWHTask(celery.app.task.Task):
"""a schedulable task (abstract class)
Current implementation is based on Celery. See
http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
how to use tasks once instantiated
"""
_log = None
def on_failure(self, exc, task_id, args, kwargs, einfo):
self.send_event('task-result-exception')
def on_success(self, retval, task_id, args, kwargs):
self.send_event('task-result', result=retval)
@property
def log(self):
if self._log is None:
self._log = get_task_logger(self.name)
return self._log
+ def run(self, *args, **kwargs):
+ self.log.debug('%s: args=%s, kwargs=%s', self.name, args, kwargs)
+ ret = super().run(*args, **kwargs)
+ self.log.debug('%s: OK => %s', self.name, ret)
+ return ret
+
class Task(SWHTask):
"""a schedulable task (abstract class)
DEPRECATED! Please use SWHTask as base for decorated functions instead.
Sub-classes must implement the run_task() method.
Current implementation is based on Celery. See
http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
how to use tasks once instantiated
"""
abstract = True
def run(self, *args, **kwargs):
"""This method is called by the celery worker when a task is received.
Should not be overridden as we need our special events to be sent for
the recurrent scheduler. Override run_task instead."""
return self.run_task(*args, **kwargs)
def run_task(self, *args, **kwargs):
"""Perform the task.
Must return a json-serializable value as it is passed back to the task
scheduler using a celery event.
"""
raise NotImplementedError('tasks must implement the run_task() method')
diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py
index 5702680..b18df46 100644
--- a/swh/scheduler/tests/tasks.py
+++ b/swh/scheduler/tests/tasks.py
@@ -1,41 +1,31 @@
-from celery import group
+from celery import group, current_app as app
-from swh.scheduler.celery_backend.config import app
-
-@app.task(name='swh.scheduler.tests.tasks.ping',
- bind=True)
+@app.task(name='swh.scheduler.tests.tasks.ping', bind=True)
def ping(self, **kw):
# check this is a SWHTask
assert hasattr(self, 'log')
assert not hasattr(self, 'run_task')
assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__]
self.log.debug(self.name)
if kw:
return 'OK (kw=%s)' % kw
return 'OK'
-@app.task(name='swh.scheduler.tests.tasks.multiping',
- bind=True)
+@app.task(name='swh.scheduler.tests.tasks.multiping', bind=True)
def multiping(self, n=10):
- self.log.debug(self.name)
-
promise = group(ping.s(i=i) for i in range(n))()
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n))
promise.save()
return promise.id
-@app.task(name='swh.scheduler.tests.tasks.error',
- bind=True)
-def not_implemented(self):
- self.log.debug(self.name)
+@app.task(name='swh.scheduler.tests.tasks.error')
+def not_implemented():
raise NotImplementedError('Nope')
-@app.task(name='swh.scheduler.tests.tasks.add',
- bind=True)
-def add(self, x, y):
- self.log.debug(self.name)
+@app.task(name='swh.scheduler.tests.tasks.add')
+def add(x, y):
return x + y

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 12:34 PM (2 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3340458

Event Timeline