Make the scheduler event listener tolerate already-acknowledged messages
and failures of the event capture loop.

* Only call message.ack() when message.acknowledged is false; otherwise
  log that the message was already acknowledged.
* Log every event/message pair handled by catchall_event.
* Wrap recv.capture() in a retry loop: after a failure, sleep `errors`
  seconds and retry; on the sixth failure log an error and exit with
  status 1. KeyboardInterrupt leaves the loop.
* Move the third-party `click` import below the stdlib imports.

NOTE(review): `errors` is reset only when capture() returns normally, so
the retry budget covers every failure since the last clean return, and the
first retry sleeps 0 seconds. Confirm this is intended.
NOTE(review): the whitespace of this patch was reconstructed; the position
of blank context lines and the indentation of context lines follow the
hunk line counts and must be checked against listener.py before applying.

diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -3,10 +3,13 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import click
 import datetime
 import logging
+import time
 import socket
+import sys
+
+import click
 
 from arrow import utcnow
 from kombu import Queue
@@ -90,7 +93,10 @@
 
         backend.commit()
         for message in messages:
-            message.ack()
+            if not message.acknowledged:
+                message.ack()
+            else:
+                logger.info('message already acknowledged: %s', message)
         actions['queue'] = []
         actions['last_send'] = utcnow()
 
@@ -99,7 +105,11 @@
         try_perform_actions()
 
     def catchall_event(event, message):
-        message.ack()
+        logger.info('event: %s, message:%s', event, message)
+        if not message.acknowledged:
+            message.ack()
+        else:
+            logger.info('message already acknowledged: %s', message)
         try_perform_actions()
 
     def task_started(event, message):
@@ -168,7 +178,22 @@
         node_id='listener-%s' % socket.gethostname(),
     )
 
-    recv.capture(limit=None, timeout=None, wakeup=True)
+    errors = 0
+    while True:
+        try:
+            recv.capture(limit=None, timeout=None, wakeup=True)
+            errors = 0
+        except KeyboardInterrupt:
+            logger.exception('Keyboard interrupt, exiting')
+            break
+        except Exception:
+            logger.exception('Unexpected exception')
+            if errors < 5:
+                time.sleep(errors)
+                errors += 1
+            else:
+                logger.error('Too many consecutive errors, exiting')
+                sys.exit(1)
 
 
 @click.command()