Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9342234
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
3 KB
Subscribers
None
View Options
diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py
index 3d16510..b141a01 100644
--- a/swh/scheduler/task.py
+++ b/swh/scheduler/task.py
@@ -1,62 +1,68 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import celery.app.task
from celery.utils.log import get_task_logger
class SWHTask(celery.app.task.Task):
"""a schedulable task (abstract class)
Current implementation is based on Celery. See
http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
how to use tasks once instantiated
"""
_log = None
def on_failure(self, exc, task_id, args, kwargs, einfo):
self.send_event('task-result-exception')
def on_success(self, retval, task_id, args, kwargs):
self.send_event('task-result', result=retval)
@property
def log(self):
if self._log is None:
self._log = get_task_logger(self.name)
return self._log
+ def run(self, *args, **kwargs):
+ self.log.debug('%s: args=%s, kwargs=%s', self.name, args, kwargs)
+ ret = super().run(*args, **kwargs)
+ self.log.debug('%s: OK => %s', self.name, ret)
+ return ret
+
class Task(SWHTask):
"""a schedulable task (abstract class)
DEPRECATED! Please use SWHTask as base for decorated functions instead.
Sub-classes must implement the run_task() method.
Current implementation is based on Celery. See
http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
how to use tasks once instantiated
"""
abstract = True
def run(self, *args, **kwargs):
"""This method is called by the celery worker when a task is received.
Should not be overridden as we need our special events to be sent for
the reccurrent scheduler. Override run_task instead."""
return self.run_task(*args, **kwargs)
def run_task(self, *args, **kwargs):
"""Perform the task.
Must return a json-serializable value as it is passed back to the task
scheduler using a celery event.
"""
raise NotImplementedError('tasks must implement the run_task() method')
diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py
index 5702680..b18df46 100644
--- a/swh/scheduler/tests/tasks.py
+++ b/swh/scheduler/tests/tasks.py
@@ -1,41 +1,31 @@
-from celery import group
+from celery import group, current_app as app
-from swh.scheduler.celery_backend.config import app
-
-@app.task(name='swh.scheduler.tests.tasks.ping',
- bind=True)
+@app.task(name='swh.scheduler.tests.tasks.ping', bind=True)
def ping(self, **kw):
# check this is a SWHTask
assert hasattr(self, 'log')
assert not hasattr(self, 'run_task')
assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__]
self.log.debug(self.name)
if kw:
return 'OK (kw=%s)' % kw
return 'OK'
-@app.task(name='swh.scheduler.tests.tasks.multiping',
- bind=True)
+@app.task(name='swh.scheduler.tests.tasks.multiping', bind=True)
def multiping(self, n=10):
- self.log.debug(self.name)
-
promise = group(ping.s(i=i) for i in range(n))()
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n))
promise.save()
return promise.id
-@app.task(name='swh.scheduler.tests.tasks.error',
- bind=True)
-def not_implemented(self):
- self.log.debug(self.name)
+@app.task(name='swh.scheduler.tests.tasks.error')
+def not_implemented():
raise NotImplementedError('Nope')
-@app.task(name='swh.scheduler.tests.tasks.add',
- bind=True)
-def add(self, x, y):
- self.log.debug(self.name)
+@app.task(name='swh.scheduler.tests.tasks.add')
+def add(x, y):
return x + y
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 12:34 PM (2 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3340458
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment