Page MenuHomeSoftware Heritage

task.py
No OneTemporary

# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import celery.app.task
from celery.utils.log import get_task_logger
from celery.app.task import TaskType
if TaskType is type:
# From Celery 3.1.25, celery/celery/app/task.py
# Copyright (c) 2015 Ask Solem & contributors. All rights reserved.
# Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved.
# Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual
# contributors. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the
# distribution.
# * Neither the name of Ask Solem, nor the names of its contributors
# may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Ask Solem OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
from celery import current_app
from celery.local import Proxy
from celery.utils import gen_task_name
class _CompatShared(object):
def __init__(self, name, cons):
self.name = name
self.cons = cons
def __hash__(self):
return hash(self.name)
def __repr__(self):
return '<OldTask: %r>' % (self.name, )
def __call__(self, app):
return self.cons(app)
class TaskType(type):
"""Meta class for tasks.
Automatically registers the task in the task registry (except if the
:attr:`Task.abstract`` attribute is set).
If no :attr:`Task.name` attribute is provided, then the name is
generated from the module and class name.
"""
_creation_count = {} # used by old non-abstract task classes
def __new__(cls, name, bases, attrs):
new = super(TaskType, cls).__new__
task_module = attrs.get('__module__') or '__main__'
# - Abstract class: abstract attribute should not be inherited.
abstract = attrs.pop('abstract', None)
if abstract or not attrs.get('autoregister', True):
return new(cls, name, bases, attrs)
# The 'app' attribute is now a property, with the real app located
# in the '_app' attribute. Previously this was a regular attribute,
# so we should support classes defining it.
app = attrs.pop('_app', None) or attrs.pop('app', None)
# Attempt to inherit app from one the bases
if not isinstance(app, Proxy) and app is None:
for base in bases:
if getattr(base, '_app', None):
app = base._app
break
else:
app = current_app._get_current_object()
attrs['_app'] = app
# - Automatically generate missing/empty name.
task_name = attrs.get('name')
if not task_name:
attrs['name'] = task_name = gen_task_name(app, name,
task_module)
if not attrs.get('_decorated'):
# non decorated tasks must also be shared in case
# an app is created multiple times due to modules
# imported under multiple names.
# Hairy stuff, here to be compatible with 2.x.
# People should not use non-abstract task classes anymore,
# use the task decorator.
from celery._state import connect_on_app_finalize
unique_name = '.'.join([task_module, name])
if unique_name not in cls._creation_count:
# the creation count is used as a safety
# so that the same task is not added recursively
# to the set of constructors.
cls._creation_count[unique_name] = 1
connect_on_app_finalize(_CompatShared(
unique_name,
lambda app: TaskType.__new__(cls, name, bases,
dict(attrs, _app=app)),
))
# - Create and register class.
# Because of the way import happens (recursively)
# we may or may not be the first time the task tries to register
# with the framework. There should only be one class for each task
# name, so we always return the registered version.
tasks = app._tasks
if task_name not in tasks:
tasks.register(new(cls, name, bases, attrs))
instance = tasks[task_name]
instance.bind(app)
return instance.__class__
class Task(celery.app.task.Task, metaclass=TaskType):
"""a schedulable task (abstract class)
Sub-classes must implement the run() method. Sub-classes that
want their tasks to get routed to a non-default task queue must
override the task_queue attribute.
Current implementation is based on Celery. See
http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
how to use tasks once instantiated
"""
abstract = True
task_queue = 'celery'
def run(self, *args, **kwargs):
"""This method is called by the celery worker when a task is received.
Should not be overridden as we need our special events to be sent for
the reccurrent scheduler. Override run_task instead."""
try:
result = self.run_task(*args, **kwargs)
except Exception as e:
self.send_event('task-result-exception')
raise e from None
else:
self.send_event('task-result', result=result)
return result
def run_task(self, *args, **kwargs):
"""Perform the task.
Must return a json-serializable value as it is passed back to the task
scheduler using a celery event.
"""
raise NotImplementedError('tasks must implement the run_task() method')
@property
def log(self):
if not hasattr(self, '__log'):
self.__log = get_task_logger('%s.%s' %
(__name__, self.__class__.__name__))
return self.__log

File Metadata

Mime Type
text/x-python
Expires
Tue, Apr 15, 2:21 AM (8 h, 9 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3273441

Event Timeline