diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py index 622efca..350d71e 100644 --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -1,212 +1,210 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os import urllib.parse from celery import Celery from celery.signals import setup_logging from celery.utils.log import ColorFormatter from celery.worker.control import Panel from kombu import Exchange, Queue from kombu.five import monotonic as _monotonic import requests from swh.core.config import load_named_config from swh.core.logger import JournalHandler DEFAULT_CONFIG_NAME = 'worker' CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' CONFIG_NAME_TEMPLATE = 'worker/%s' DEFAULT_CONFIG = { 'task_broker': ('str', 'amqp://guest@localhost//'), 'task_modules': ('list[str]', []), 'task_queues': ('list[str]', []), 'task_soft_time_limit': ('int', 0), } @setup_logging.connect def setup_log_handler(loglevel=None, logfile=None, format=None, colorize=None, **kwargs): """Setup logging according to Software Heritage preferences. We use the command-line loglevel for tasks only, as we never really care about the debug messages from celery. """ if loglevel is None: loglevel = logging.DEBUG formatter = logging.Formatter(format) root_logger = logging.getLogger('') root_logger.setLevel(logging.INFO) if loglevel == logging.DEBUG: color_formatter = ColorFormatter(format) if colorize else formatter console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(color_formatter) root_logger.addHandler(console) systemd_journal = JournalHandler() systemd_journal.setLevel(logging.DEBUG) systemd_journal.setFormatter(formatter) root_logger.addHandler(systemd_journal) celery_logger = logging.getLogger('celery') celery_logger.setLevel(logging.INFO) # Silence useless "Starting new HTTP connection" messages urllib3_logger = logging.getLogger('urllib3') urllib3_logger.setLevel(logging.WARNING) swh_logger = logging.getLogger('swh') swh_logger.setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task celery_task_logger = logging.getLogger('celery.task') celery_task_logger.setLevel(loglevel) @Panel.register def monotonic(state): """Get the current value for the monotonic clock""" return {'monotonic': _monotonic()} class TaskRouter: """Route tasks according to the task_queue attribute in the task class""" - def route_for_task(self, task, args=None, kwargs=None): - task_class = app.tasks[task] - if hasattr(task_class, 'task_queue'): - return {'queue': task_class.task_queue} - return None + def route_for_task(self, task, *args, **kwargs): + if task.startswith('swh.'): + return {'queue': task} class CustomCelery(Celery): def get_queue_stats(self, queue_name): """Get the statistics regarding a queue on the broker. Arguments: queue_name: name of the queue to check Returns a dictionary raw from the RabbitMQ management API; or `None` if the current configuration does not use RabbitMQ. Interesting keys: - consumers (number of consumers for the queue) - messages (number of messages in queue) - messages_unacknowledged (number of messages currently being processed) Documentation: https://www.rabbitmq.com/management.html#http-api """ conn_info = self.connection().info() if conn_info['transport'] == 'memory': # We're running in a test environment, without RabbitMQ. return None url = 'http://{hostname}:{port}/api/queues/{vhost}/{queue}'.format( hostname=conn_info['hostname'], port=conn_info['port'] + 10000, vhost=urllib.parse.quote(conn_info['virtual_host'], safe=''), queue=urllib.parse.quote(queue_name, safe=''), ) credentials = (conn_info['userid'], conn_info['password']) r = requests.get(url, auth=credentials) if r.status_code == 404: return {} if r.status_code != 200: raise ValueError('Got error %s when reading queue stats: %s' % ( r.status_code, r.json())) return r.json() def get_queue_length(self, queue_name): """Shortcut to get a queue's length""" stats = self.get_queue_stats(queue_name) if stats: return stats.get('messages') INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) if INSTANCE_NAME: CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME else: CONFIG_NAME = DEFAULT_CONFIG_NAME # Load the Celery config CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] for queue in CONFIG['task_queues']: CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue)) # Instantiate the Celery app app = CustomCelery() app.conf.update( # The broker broker_url=CONFIG['task_broker'], # Timezone configuration: all in UTC enable_utc=True, timezone='UTC', # Imported modules imports=CONFIG['task_modules'], # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. None means to never expire results. result_expires=None, # A string identifying the default serialization method to use. Can # be json (default), pickle, yaml, msgpack, or any custom # serialization methods that have been registered with task_serializer='msgpack', # Result serialization format result_serializer='msgpack', # Late ack means the task messages will be acknowledged after the task has # been executed, not just before, which is the default behavior. task_acks_late=True, # A string identifying the default serialization method to use. # Can be pickle (default), json, yaml, msgpack or any custom serialization # methods that have been registered with kombu.serialization.registry accept_content=['msgpack', 'json'], # If True the task will report its status as “started” # when the task is executed by a worker. task_track_started=True, # Default compression used for task messages. Can be gzip, bzip2 # (if available), or any custom compression schemes registered # in the Kombu compression registry. # result_compression='bzip2', # task_compression='bzip2', # Disable all rate limits, even if tasks has explicit rate limits set. # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) worker_disable_rate_limits=True, # Task hard time limit in seconds. The worker processing the task will be # killed and replaced with a new one when this is exceeded. # task_time_limit=3600, # Task soft time limit in seconds. # The SoftTimeLimitExceeded exception will be raised when this is exceeded. # The task can catch this to e.g. clean up before the hard time limit # comes. task_soft_time_limit=CONFIG['task_soft_time_limit'], # Task routing task_routes=TaskRouter(), # Task queues this worker will consume from task_queues=CELERY_QUEUES, # Allow pool restarts from remote worker_pool_restarts=True, # Do not prefetch tasks worker_prefetch_multiplier=1, # Send events worker_send_task_events=True, # Do not send useless task_sent events task_send_sent_event=False, ) diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py index 561b0b7..ff9ae6c 100644 --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -1,114 +1,111 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import arrow -from celery import group from swh.scheduler import get_scheduler, compute_nb_tasks_from from .config import app as main_app # Max batch size for tasks MAX_NUM_TASKS = 10000 def run_ready_tasks(backend, app): """Run tasks that are ready Args: backend (Scheduler): backend to read tasks to schedule app (App): Celery application to send tasks to Returns: A list of dictionaries:: { 'task': the scheduler's task id, 'backend_id': Celery's task id, 'scheduler': arrow.utcnow() } The result can be used to block-wait for the tasks' results:: backend_tasks = run_ready_tasks(self.scheduler, app) for task in backend_tasks: AsyncResult(id=task['backend_id']).get() """ all_backend_tasks = [] while True: cursor = backend.cursor() task_types = {} pending_tasks = [] for task_type in backend.get_task_types(cursor=cursor): task_type_name = task_type['type'] task_types[task_type_name] = task_type max_queue_length = task_type['max_queue_length'] backend_name = task_type['backend_name'] - if max_queue_length and backend_name in app.tasks: - queue_name = app.tasks[backend_name].task_queue + if max_queue_length: try: - queue_length = app.get_queue_length(queue_name) + queue_length = app.get_queue_length(backend_name) except ValueError: queue_length = None if queue_length is None: # Running without RabbitMQ (probably a test env). num_tasks = MAX_NUM_TASKS else: num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) else: num_tasks = MAX_NUM_TASKS if num_tasks > 0: num_tasks, num_tasks_priority = compute_nb_tasks_from( num_tasks) pending_tasks.extend( backend.grab_ready_tasks( task_type_name, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority, cursor=cursor)) if not pending_tasks: return all_backend_tasks - celery_tasks = [] + backend_tasks = [] for task in pending_tasks: args = task['arguments']['args'] kwargs = task['arguments']['kwargs'] - celery_task = app.tasks[ - task_types[task['type']]['backend_name'] - ].s(*args, **kwargs) + backend_name = task_types[task['type']]['backend_name'] + celery_result = app.send_task( + backend_name, args=args, kwargs=kwargs, + ) - celery_tasks.append(celery_task) + data = { + 'task': task['id'], + 'backend_id': celery_result.id, + 'scheduled': arrow.utcnow(), + } - group_result = group(celery_tasks).delay() - - backend_tasks = [{ - 'task': task['id'], - 'backend_id': group_result.results[i].id, - 'scheduled': arrow.utcnow(), - } for i, task in enumerate(pending_tasks)] + backend_tasks.append(data) backend.mass_schedule_task_runs(backend_tasks, cursor=cursor) backend.commit() all_backend_tasks.extend(backend_tasks) if __name__ == '__main__': for module in main_app.conf.CELERY_IMPORTS: __import__(module) main_backend = get_scheduler('local') try: run_ready_tasks(main_backend, main_app) except Exception: main_backend.rollback() raise diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py index c2ca450..d26eb56 100644 --- a/swh/scheduler/task.py +++ b/swh/scheduler/task.py @@ -1,179 +1,178 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import celery.app.task from celery.utils.log import get_task_logger from celery.app.task import TaskType if TaskType is type: # From Celery 3.1.25, celery/celery/app/task.py # Copyright (c) 2015 Ask Solem & contributors. All rights reserved. # Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved. # Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual # contributors. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the # distribution. # * Neither the name of Ask Solem, nor the names of its contributors # may be used to endorse or promote products derived from this # software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, # THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Ask Solem OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # THE POSSIBILITY OF SUCH DAMAGE. from celery import current_app from celery.local import Proxy from celery.utils import gen_task_name class _CompatShared(object): def __init__(self, name, cons): self.name = name self.cons = cons def __hash__(self): return hash(self.name) def __repr__(self): return '' % (self.name, ) def __call__(self, app): return self.cons(app) class TaskType(type): """Meta class for tasks. Automatically registers the task in the task registry (except if the :attr:`Task.abstract`` attribute is set). If no :attr:`Task.name` attribute is provided, then the name is generated from the module and class name. """ _creation_count = {} # used by old non-abstract task classes def __new__(cls, name, bases, attrs): new = super(TaskType, cls).__new__ task_module = attrs.get('__module__') or '__main__' # - Abstract class: abstract attribute should not be inherited. abstract = attrs.pop('abstract', None) if abstract or not attrs.get('autoregister', True): return new(cls, name, bases, attrs) # The 'app' attribute is now a property, with the real app located # in the '_app' attribute. Previously this was a regular attribute, # so we should support classes defining it. app = attrs.pop('_app', None) or attrs.pop('app', None) # Attempt to inherit app from one the bases if not isinstance(app, Proxy) and app is None: for base in bases: if getattr(base, '_app', None): app = base._app break else: app = current_app._get_current_object() attrs['_app'] = app # - Automatically generate missing/empty name. task_name = attrs.get('name') if not task_name: attrs['name'] = task_name = gen_task_name(app, name, task_module) if not attrs.get('_decorated'): # non decorated tasks must also be shared in case # an app is created multiple times due to modules # imported under multiple names. # Hairy stuff, here to be compatible with 2.x. # People should not use non-abstract task classes anymore, # use the task decorator. from celery._state import connect_on_app_finalize unique_name = '.'.join([task_module, name]) if unique_name not in cls._creation_count: # the creation count is used as a safety # so that the same task is not added recursively # to the set of constructors. cls._creation_count[unique_name] = 1 connect_on_app_finalize(_CompatShared( unique_name, lambda app: TaskType.__new__(cls, name, bases, dict(attrs, _app=app)), )) # - Create and register class. # Because of the way import happens (recursively) # we may or may not be the first time the task tries to register # with the framework. There should only be one class for each task # name, so we always return the registered version. tasks = app._tasks if task_name not in tasks: tasks.register(new(cls, name, bases, attrs)) instance = tasks[task_name] instance.bind(app) return instance.__class__ class Task(celery.app.task.Task, metaclass=TaskType): """a schedulable task (abstract class) Sub-classes must implement the run_task() method. Sub-classes that want their tasks to get routed to a non-default task queue must override the task_queue attribute. Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ abstract = True - task_queue = 'celery' def run(self, *args, **kwargs): """This method is called by the celery worker when a task is received. Should not be overridden as we need our special events to be sent for the reccurrent scheduler. Override run_task instead.""" try: result = self.run_task(*args, **kwargs) except Exception as e: self.send_event('task-result-exception') raise e from None else: self.send_event('task-result', result=result) return result def run_task(self, *args, **kwargs): """Perform the task. Must return a json-serializable value as it is passed back to the task scheduler using a celery event. """ raise NotImplementedError('tasks must implement the run_task() method') @property def log(self): if not hasattr(self, '__log'): self.__log = get_task_logger('%s.%s' % (__name__, self.__class__.__name__)) return self.__log diff --git a/swh/scheduler/tests/scheduler_testing.py b/swh/scheduler/tests/scheduler_testing.py index ba11ec3..ef4e441 100644 --- a/swh/scheduler/tests/scheduler_testing.py +++ b/swh/scheduler/tests/scheduler_testing.py @@ -1,71 +1,76 @@ import glob import pytest import os.path import datetime from celery.result import AsyncResult from celery.contrib.testing.worker import start_worker import celery.contrib.testing.tasks # noqa from swh.core.tests.db_testing import DbTestFixture, DB_DUMP_TYPES from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler import get_scheduler from swh.scheduler.celery_backend.runner import run_ready_tasks from swh.scheduler.celery_backend.config import app from swh.scheduler.tests.celery_testing import CeleryTestFixture from . import SQL_DIR DUMP_FILES = os.path.join(SQL_DIR, '*.sql') @pytest.mark.db class SchedulerTestFixture(CeleryTestFixture, DbTestFixture): """Base class for test case classes, providing an SWH scheduler as the `scheduler` attribute.""" SCHEDULER_DB_NAME = 'softwareheritage-scheduler-test-fixture' def add_scheduler_task_type(self, task_type, backend_name): task_type = { 'type': task_type, 'description': 'Update a git repository', 'backend_name': backend_name, 'default_interval': datetime.timedelta(days=64), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=64), 'backoff_factor': 2, 'max_queue_length': None, 'num_retries': 7, 'retry_delay': datetime.timedelta(hours=2), } self.scheduler.create_task_type(task_type) def run_ready_tasks(self): """Runs the scheduler and a Celery worker, then blocks until all tasks are completed.""" + + # Make sure the worker is listening to all task-specific queues + for task in self.scheduler.get_task_types(): + app.amqp.queues.select_add(task['backend_name']) + with start_worker(app): backend_tasks = run_ready_tasks(self.scheduler, app) for task in backend_tasks: AsyncResult(id=task['backend_id']).get() @classmethod def setUpClass(cls): all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) all_dump_files = [(x, DB_DUMP_TYPES[os.path.splitext(x)[1]]) for x in all_dump_files] cls.add_db(name=cls.SCHEDULER_DB_NAME, dumps=all_dump_files) super().setUpClass() def setUp(self): super().setUp() self.scheduler_config = { 'scheduling_db': 'dbname=' + self.SCHEDULER_DB_NAME} self.scheduler = get_scheduler('local', self.scheduler_config) def tearDown(self): self.scheduler.close_connection() super().tearDown()