diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py index 9a4f453..cc419d8 100644 --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -1,205 +1,210 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging import time import sys import click from arrow import utcnow from kombu import Queue import celery from celery.events import EventReceiver +from swh.core.statsd import statsd + class ReliableEventReceiver(EventReceiver): def __init__(self, channel, handlers=None, routing_key='#', node_id=None, app=None, queue_prefix='celeryev', accept=None): super(ReliableEventReceiver, self).__init__( channel, handlers, routing_key, node_id, app, queue_prefix, accept) self.queue = Queue('.'.join([self.queue_prefix, self.node_id]), exchange=self.exchange, routing_key=self.routing_key, auto_delete=False, durable=True) def get_consumers(self, consumer, channel): return [consumer(queues=[self.queue], callbacks=[self._receive], no_ack=False, accept=self.accept)] def _receive(self, bodies, message): if not isinstance(bodies, list): # celery<4 returned body as element bodies = [bodies] for body in bodies: type, body = self.event_from_message(body) self.process(type, body, message) def process(self, type, event, message): """Process the received event by dispatching it to the appropriate handler.""" handler = self.handlers.get(type) or self.handlers.get('*') - handler and handler(event, message) + if handler: + handler(event, message) + statsd.increment('swh_scheduler_listener_handled_event_total', + tags={'event_type': type}) ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0) ACTION_QUEUE_MAX_LENGTH = 1000 def event_monitor(app, backend): logger = logging.getLogger('swh.scheduler.listener') actions = { 'last_send': utcnow() - 2*ACTION_SEND_DELAY, 'queue': [], } def try_perform_actions(actions=actions): logger.debug('Try perform pending actions') if actions['queue'] and ( len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH or utcnow() - actions['last_send'] > ACTION_SEND_DELAY): perform_actions(actions) def perform_actions(actions, backend=backend): logger.info('Perform %s pending actions' % len(actions['queue'])) action_map = { 'start_task_run': backend.start_task_run, 'end_task_run': backend.end_task_run, } messages = [] db = backend.get_db() try: cursor = db.cursor(None) for action in actions['queue']: messages.append(action['message']) function = action_map[action['action']] args = action.get('args', ()) kwargs = action.get('kwargs', {}) kwargs['cur'] = cursor function(*args, **kwargs) except Exception: db.conn.rollback() else: db.conn.commit() finally: backend.put_db(db) for message in messages: if not message.acknowledged: message.ack() actions['queue'] = [] actions['last_send'] = utcnow() def queue_action(action, actions=actions): actions['queue'].append(action) try_perform_actions() def catchall_event(event, message): logger.debug('event: %s %s', event['type'], event.get('name', 'N/A')) if not message.acknowledged: message.ack() try_perform_actions() def task_started(event, message): logger.debug('task_started: %s %s', event['type'], event.get('name', 'N/A')) queue_action({ 'action': 'start_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'metadata': { 'worker': event['hostname'], }, }, 'message': message, }) def task_succeeded(event, message): logger.debug('task_succeeded: event: %s' % event) logger.debug(' message: %s' % message) result = event['result'] logger.debug('task_succeeded: result: %s' % result) try: status = result.get('status') if status == 'success': status = 'eventful' if result.get('eventful') else 'uneventful' except Exception: status = 'eventful' if result else 'uneventful' queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': status, 'result': result, }, 'message': message, }) def task_failed(event, message): logger.debug('task_failed: event: %s' % event) logger.debug(' message: %s' % message) queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': 'failed', }, 'message': message, }) recv = ReliableEventReceiver( celery.current_app.connection(), app=celery.current_app, handlers={ 'task-started': task_started, 'task-result': task_succeeded, 'task-failed': task_failed, '*': catchall_event, }, node_id='listener', ) errors = 0 while True: try: recv.capture(limit=None, timeout=None, wakeup=True) errors = 0 except KeyboardInterrupt: logger.exception('Keyboard interrupt, exiting') break except Exception: logger.exception('Unexpected exception') if errors < 5: time.sleep(errors) errors += 1 else: logger.error('Too many consecutive errors, exiting') sys.exit(1) @click.command() @click.pass_context def main(ctx): click.echo("Deprecated! Use 'swh-scheduler listener' instead.", err=True) ctx.exit(1) if __name__ == '__main__': main() diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py index 9e4dc5f..c9250ee 100644 --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -1,121 +1,125 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import arrow import logging from kombu.utils.uuid import uuid +from swh.core.statsd import statsd from swh.scheduler import get_scheduler, compute_nb_tasks_from logger = logging.getLogger(__name__) # Max batch size for tasks MAX_NUM_TASKS = 10000 def run_ready_tasks(backend, app): """Run tasks that are ready Args: backend (Scheduler): backend to read tasks to schedule app (App): Celery application to send tasks to Returns: A list of dictionaries:: { 'task': the scheduler's task id, 'backend_id': Celery's task id, 'scheduler': arrow.utcnow() } The result can be used to block-wait for the tasks' results:: backend_tasks = run_ready_tasks(self.scheduler, app) for task in backend_tasks: AsyncResult(id=task['backend_id']).get() """ all_backend_tasks = [] while True: task_types = {} pending_tasks = [] for task_type in backend.get_task_types(): task_type_name = task_type['type'] task_types[task_type_name] = task_type max_queue_length = task_type['max_queue_length'] backend_name = task_type['backend_name'] if max_queue_length: try: queue_length = app.get_queue_length(backend_name) except ValueError: queue_length = None if queue_length is None: # Running without RabbitMQ (probably a test env). num_tasks = MAX_NUM_TASKS else: num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) else: num_tasks = MAX_NUM_TASKS if num_tasks > 0: num_tasks, num_tasks_priority = compute_nb_tasks_from( num_tasks) grabbed_tasks = backend.grab_ready_tasks( task_type_name, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) if grabbed_tasks: pending_tasks.extend(grabbed_tasks) logger.info('Grabbed %s tasks %s', len(grabbed_tasks), task_type_name) - + statsd.increment( + 'swh_scheduler_runner_scheduled_task_total', + len(grabbed_tasks), + tags={'task_type': task_type_name}) if not pending_tasks: return all_backend_tasks backend_tasks = [] celery_tasks = [] for task in pending_tasks: args = task['arguments']['args'] kwargs = task['arguments']['kwargs'] backend_name = task_types[task['type']]['backend_name'] backend_id = uuid() celery_tasks.append((backend_name, backend_id, args, kwargs)) data = { 'task': task['id'], 'backend_id': backend_id, 'scheduled': arrow.utcnow(), } backend_tasks.append(data) logger.debug('Sent %s celery tasks', len(backend_tasks)) backend.mass_schedule_task_runs(backend_tasks) for backend_name, backend_id, args, kwargs in celery_tasks: app.send_task( backend_name, task_id=backend_id, args=args, kwargs=kwargs, ) all_backend_tasks.extend(backend_tasks) def main(): from .config import app as main_app for module in main_app.conf.CELERY_IMPORTS: __import__(module) main_backend = get_scheduler('local') try: run_ready_tasks(main_backend, main_app) except Exception: main_backend.rollback() raise if __name__ == '__main__': main()