diff --git a/requirements.txt b/requirements.txt index 7346acd..d8e1fee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,11 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html arrow -celery<4 +celery Click psycopg2 vcversioner diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py index 053d3e3..a6d1b38 100644 --- a/swh/scheduler/task.py +++ b/swh/scheduler/task.py @@ -1,53 +1,179 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import celery +import celery.app.task from celery.utils.log import get_task_logger +from celery.app.task import TaskType +if TaskType is type: + # From Celery 3.1.25, celery/celery/app/task.py + # Copyright (c) 2015 Ask Solem & contributors. All rights reserved. + # Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved. + # Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual + # contributors. All rights reserved. + # + # Redistribution and use in source and binary forms, with or without + # modification, are permitted provided that the following conditions are + # met: + # * Redistributions of source code must retain the above copyright + # notice, this list of conditions and the following disclaimer. + # * Redistributions in binary form must reproduce the above copyright + # notice, this list of conditions and the following disclaimer in the + # documentation and/or other materials provided with the + # distribution. + # * Neither the name of Ask Solem, nor the names of its contributors + # may be used to endorse or promote products derived from this + # software without specific prior written permission. + # + # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + # THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Ask Solem OR CONTRIBUTORS BE + # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + # THE POSSIBILITY OF SUCH DAMAGE. -class Task(celery.Task): + from celery import current_app + from celery.local import Proxy + from celery.utils import gen_task_name + + class _CompatShared(object): + + def __init__(self, name, cons): + self.name = name + self.cons = cons + + def __hash__(self): + return hash(self.name) + + def __repr__(self): + return '' % (self.name, ) + + def __call__(self, app): + return self.cons(app) + + class TaskType(type): + """Meta class for tasks. + + Automatically registers the task in the task registry (except if the + :attr:`Task.abstract`` attribute is set). + + If no :attr:`Task.name` attribute is provided, then the name is + generated from the module and class name. + + """ + _creation_count = {} # used by old non-abstract task classes + + def __new__(cls, name, bases, attrs): + new = super(TaskType, cls).__new__ + task_module = attrs.get('__module__') or '__main__' + + # - Abstract class: abstract attribute should not be inherited. + abstract = attrs.pop('abstract', None) + if abstract or not attrs.get('autoregister', True): + return new(cls, name, bases, attrs) + + # The 'app' attribute is now a property, with the real app located + # in the '_app' attribute. Previously this was a regular attribute, + # so we should support classes defining it. + app = attrs.pop('_app', None) or attrs.pop('app', None) + + # Attempt to inherit app from one the bases + if not isinstance(app, Proxy) and app is None: + for base in bases: + if getattr(base, '_app', None): + app = base._app + break + else: + app = current_app._get_current_object() + attrs['_app'] = app + + # - Automatically generate missing/empty name. + task_name = attrs.get('name') + if not task_name: + attrs['name'] = task_name = gen_task_name(app, name, + task_module) + + if not attrs.get('_decorated'): + # non decorated tasks must also be shared in case + # an app is created multiple times due to modules + # imported under multiple names. + # Hairy stuff, here to be compatible with 2.x. + # People should not use non-abstract task classes anymore, + # use the task decorator. + from celery._state import connect_on_app_finalize + unique_name = '.'.join([task_module, name]) + if unique_name not in cls._creation_count: + # the creation count is used as a safety + # so that the same task is not added recursively + # to the set of constructors. + cls._creation_count[unique_name] = 1 + connect_on_app_finalize(_CompatShared( + unique_name, + lambda app: TaskType.__new__(cls, name, bases, + dict(attrs, _app=app)), + )) + + # - Create and register class. + # Because of the way import happens (recursively) + # we may or may not be the first time the task tries to register + # with the framework. There should only be one class for each task + # name, so we always return the registered version. + tasks = app._tasks + if task_name not in tasks: + tasks.register(new(cls, name, bases, attrs)) + instance = tasks[task_name] + instance.bind(app) + return instance.__class__ + + +class Task(celery.app.task.Task, metaclass=TaskType): """a schedulable task (abstract class) Sub-classes must implement the run() method. Sub-classes that want their tasks to get routed to a non-default task queue must override the task_queue attribute. Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ abstract = True task_queue = 'celery' def run(self, *args, **kwargs): """This method is called by the celery worker when a task is received. Should not be overridden as we need our special events to be sent for the reccurrent scheduler. Override run_task instead.""" try: result = self.run_task(*args, **kwargs) except Exception as e: self.send_event('task-result-exception') raise e from None else: self.send_event('task-result', result=result) return result def run_task(self, *args, **kwargs): """Perform the task. Must return a json-serializable value as it is passed back to the task scheduler using a celery event. """ raise NotImplementedError('tasks must implement the run_task() method') @property def log(self): if not hasattr(self, '__log'): self.__log = get_task_logger('%s.%s' % (__name__, self.__class__.__name__)) return self.__log diff --git a/swh/scheduler/tests/test_task.py b/swh/scheduler/tests/test_task.py index bdcd205..3d957ba 100644 --- a/swh/scheduler/tests/test_task.py +++ b/swh/scheduler/tests/test_task.py @@ -1,31 +1,31 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from nose.tools import istest from swh.scheduler import task class Task(unittest.TestCase): @istest def not_implemented_task(self): class NotImplementedTask(task.Task): pass with self.assertRaises(NotImplementedError): NotImplementedTask().run() @istest def add_task(self): class AddTask(task.Task): - def run(self, x, y): + def run_task(self, x, y): return x + y r = AddTask().apply([2, 3]) self.assertTrue(r.successful()) self.assertEqual(r.result, 5)