Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/task.py
# Copyright (C) 2015-2017 The Software Heritage developers | # Copyright (C) 2015-2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import celery.app.task | import celery.app.task | ||||
from celery.utils.log import get_task_logger | from celery.utils.log import get_task_logger | ||||
class Task(celery.app.task.Task): | class SWHTask(celery.app.task.Task): | ||||
"""a schedulable task (abstract class) | """a schedulable task (abstract class) | ||||
Current implementation is based on Celery. See | |||||
http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for | |||||
how to use tasks once instantiated | |||||
""" | |||||
_log = None | |||||
def on_failure(self, exc, task_id, args, kwargs, einfo): | |||||
self.send_event('task-result-exception') | |||||
def on_success(self, retval, task_id, args, kwargs): | |||||
self.send_event('task-result', result=retval) | |||||
@property | |||||
def log(self): | |||||
if self._log is None: | |||||
self._log = get_task_logger(self.name) | |||||
return self._log | |||||
class Task(SWHTask): | |||||
"""a schedulable task (abstract class) | |||||
DEPRECATED! Please use SWHTask as base for decorated functions instead. | |||||
Sub-classes must implement the run_task() method. | Sub-classes must implement the run_task() method. | ||||
Current implementation is based on Celery. See | Current implementation is based on Celery. See | ||||
http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for | http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for | ||||
how to use tasks once instantiated | how to use tasks once instantiated | ||||
""" | """ | ||||
abstract = True | abstract = True | ||||
def run(self, *args, **kwargs): | def run(self, *args, **kwargs): | ||||
"""This method is called by the celery worker when a task is received. | """This method is called by the celery worker when a task is received. | ||||
Should not be overridden as we need our special events to be sent for | Should not be overridden as we need our special events to be sent for | ||||
the reccurrent scheduler. Override run_task instead.""" | the reccurrent scheduler. Override run_task instead.""" | ||||
try: | return self.run_task(*args, **kwargs) | ||||
result = self.run_task(*args, **kwargs) | |||||
except Exception as e: | |||||
self.send_event('task-result-exception') | |||||
raise e from None | |||||
else: | |||||
self.send_event('task-result', result=result) | |||||
return result | |||||
def run_task(self, *args, **kwargs): | def run_task(self, *args, **kwargs): | ||||
"""Perform the task. | """Perform the task. | ||||
Must return a json-serializable value as it is passed back to the task | Must return a json-serializable value as it is passed back to the task | ||||
scheduler using a celery event. | scheduler using a celery event. | ||||
""" | """ | ||||
raise NotImplementedError('tasks must implement the run_task() method') | raise NotImplementedError('tasks must implement the run_task() method') | ||||
@property | |||||
def log(self): | |||||
if not hasattr(self, '__log'): | |||||
self.__log = get_task_logger('%s.%s' % | |||||
(__name__, self.__class__.__name__)) | |||||
return self.__log |