diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -17,6 +17,9 @@
 [mypy-kombu.*]
 ignore_missing_imports = True
 
+[mypy-pika.*]
+ignore_missing_imports = True
+
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,6 +7,7 @@
 Click
 elasticsearch > 5.4
 flask
+pika >= 1.1.0
 psycopg2
 pyyaml
 vcversioner
diff --git a/swh/scheduler/celery_backend/pika_listener.py b/swh/scheduler/celery_backend/pika_listener.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/celery_backend/pika_listener.py
@@ -0,0 +1,166 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Listener storing task lifecycle events, consumed from the message broker
+with pika, in the scheduler backend."""
+
+import datetime
+import json
+import logging
+import sys
+
+import pika
+
+from swh.core.statsd import statsd
+from swh.scheduler import get_scheduler
+
+logger = logging.getLogger(__name__)
+
+
+def utcnow():
+    """Return the current time as a timezone-aware UTC datetime."""
+    return datetime.datetime.now(tz=datetime.timezone.utc)
+
+
+def get_listener(broker_url, queue_name, scheduler_backend):
+    """Open a blocking connection to the broker and set up event consumption.
+
+    The durable queue ``queue_name`` is declared and bound to the ``celeryev``
+    exchange with routing key ``#``; incoming messages are dispatched to the
+    callback built by :func:`get_on_message`.
+
+    Args:
+        broker_url: URL of the broker, parsed by ``pika.URLParameters``
+        queue_name: name of the queue to declare, bind and consume from
+        scheduler_backend: object on which task runs are recorded
+
+    Returns:
+        The configured channel; the caller starts consumption by calling
+        ``start_consuming()`` on it.
+
+    """
+    connection = pika.BlockingConnection(
+        pika.URLParameters(broker_url)
+    )
+    channel = connection.channel()
+
+    channel.queue_declare(queue=queue_name, durable=True)
+
+    exchange = 'celeryev'
+    routing_key = '#'
+    channel.queue_bind(queue=queue_name, exchange=exchange,
+                       routing_key=routing_key)
+
+    # Cap the number of delivered-but-unacknowledged messages.
+    channel.basic_qos(prefetch_count=1000)
+
+    channel.basic_consume(
+        queue=queue_name,
+        on_message_callback=get_on_message(scheduler_backend),
+    )
+
+    return channel
+
+
+def get_on_message(scheduler_backend):
+    """Build the message callback recording events in ``scheduler_backend``.
+
+    The callback decodes the message body as JSON (either a single event or a
+    list of events), hands every event to :func:`process_event`, then
+    acknowledges the message. A body that cannot be decoded is logged and
+    acknowledged without further processing.
+
+    """
+    def on_message(channel, method_frame, properties, body):
+        try:
+            events = json.loads(body)
+        except Exception:
+            logger.warning('Could not parse body %r', body)
+            events = []
+
+        if not isinstance(events, list):
+            events = [events]
+
+        for event in events:
+            logger.debug('Received event %r', event)
+            process_event(event, scheduler_backend)
+
+        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
+
+    return on_message
+
+
+def process_event(event, scheduler_backend):
+    """Record a single task lifecycle event in the scheduler backend.
+
+    - ``task-started`` starts a task run, with the event's ``hostname`` as
+      worker metadata;
+    - ``task-result`` ends the task run with the ``status`` found in the
+      result (``success`` being mapped to ``eventful``/``uneventful``), or
+      with a status derived from the truthiness of the result;
+    - ``task-failed`` ends the task run with status ``failed``.
+
+    Events without a ``uuid`` are silently skipped. Malformed events (not a
+    mapping, or without a ``type``) are logged and skipped rather than raised,
+    so that one bad message cannot abort the consumer before it is
+    acknowledged.
+
+    Args:
+        event: decoded event
+        scheduler_backend: object providing ``start_task_run`` and
+            ``end_task_run``
+
+    """
+    if not isinstance(event, dict):
+        logger.warning('Ignoring malformed event %r', event)
+        return
+
+    uuid = event.get('uuid')
+    if not uuid:
+        return
+
+    event_type = event.get('type')
+    if not event_type:
+        logger.warning('Ignoring event without type %r', event)
+        return
+
+    statsd.increment('swh_scheduler_listener_handled_event_total',
+                     tags={'event_type': event_type})
+
+    if event_type == 'task-started':
+        scheduler_backend.start_task_run(
+            uuid, timestamp=utcnow(),
+            metadata={'worker': event.get('hostname')},
+        )
+    elif event_type == 'task-result':
+        # A missing result is treated like an empty one (uneventful).
+        result = event.get('result')
+
+        status = None
+
+        if isinstance(result, dict) and 'status' in result:
+            status = result['status']
+            if status == 'success':
+                status = 'eventful' if result.get('eventful') else 'uneventful'
+
+        if status is None:
+            status = 'eventful' if result else 'uneventful'
+
+        scheduler_backend.end_task_run(uuid, timestamp=utcnow(),
+                                       status=status, result=result)
+    elif event_type == 'task-failed':
+        scheduler_backend.end_task_run(uuid, timestamp=utcnow(),
+                                       status='failed')
+
+
+if __name__ == '__main__':
+    url = sys.argv[1]
+    logging.basicConfig(level=logging.DEBUG)
+    scheduler_backend = get_scheduler('local', args={
+        'db': 'service=swh-scheduler'
+    })
+    channel = get_listener(url, 'celeryev.test', scheduler_backend)
+    logger.info('Start consuming')
+    channel.start_consuming()
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -53,16 +53,21 @@
 
     This service is responsible for listening at task lifecycle events
     and handle their workflow status in the database."""
-    scheduler = ctx.obj['scheduler']
-    if not scheduler:
+    scheduler_backend = ctx.obj['scheduler']
+    if not scheduler_backend:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
 
-    from swh.scheduler.celery_backend.config import build_app
-    app = build_app(ctx.obj['config'].get('celery'))
-    app.set_current()
+    broker = ctx.obj['config']\
+        .get('celery', {})\
+        .get('task_broker', 'amqp://guest@localhost/%2f')
+
+    from swh.scheduler.celery_backend.pika_listener import get_listener
 
-    from swh.scheduler.celery_backend.listener import event_monitor
-    event_monitor(app, backend=scheduler)
+    listener = get_listener(broker, 'celeryev.listener', scheduler_backend)
+    try:
+        listener.start_consuming()
+    finally:
+        listener.stop_consuming()
 
 
 @cli.command('rpc-serve')