Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/listener.py
# Copyright (C) 2015-2018 The Software Heritage developers | # Copyright (C) 2015-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import click | |||||
import datetime | import datetime | ||||
import logging | import logging | ||||
import time | |||||
import socket | import socket | ||||
import sys | |||||
import click | |||||
from arrow import utcnow | from arrow import utcnow | ||||
from kombu import Queue | from kombu import Queue | ||||
from celery.events import EventReceiver | from celery.events import EventReceiver | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from .config import setup_log_handler, app as main_app | from .config import setup_log_handler, app as main_app | ||||
▲ Show 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | def perform_actions(actions, backend=backend): | ||||
function = action_map[action['action']] | function = action_map[action['action']] | ||||
args = action.get('args', ()) | args = action.get('args', ()) | ||||
kwargs = action.get('kwargs', {}) | kwargs = action.get('kwargs', {}) | ||||
kwargs['cursor'] = cursor | kwargs['cursor'] = cursor | ||||
function(*args, **kwargs) | function(*args, **kwargs) | ||||
backend.commit() | backend.commit() | ||||
for message in messages: | for message in messages: | ||||
if not message.acknowledged: | |||||
message.ack() | message.ack() | ||||
else: | |||||
logger.info('message already acknowledged: %s', message) | |||||
actions['queue'] = [] | actions['queue'] = [] | ||||
actions['last_send'] = utcnow() | actions['last_send'] = utcnow() | ||||
def queue_action(action, actions=actions): | def queue_action(action, actions=actions): | ||||
actions['queue'].append(action) | actions['queue'].append(action) | ||||
try_perform_actions() | try_perform_actions() | ||||
def catchall_event(event, message): | def catchall_event(event, message): | ||||
logger.info('event: %s, message:%s', event, message) | |||||
if not message.acknowledged: | |||||
message.ack() | message.ack() | ||||
else: | |||||
logger.info('message already acknowledged: %s', message) | |||||
try_perform_actions() | try_perform_actions() | ||||
def task_started(event, message): | def task_started(event, message): | ||||
logger.debug('task_started: event: %s' % event) | logger.debug('task_started: event: %s' % event) | ||||
logger.debug('task_started: message: %s' % message) | logger.debug('task_started: message: %s' % message) | ||||
queue_action({ | queue_action({ | ||||
'action': 'start_task_run', | 'action': 'start_task_run', | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | recv = ReliableEventReceiver( | ||||
'task-started': task_started, | 'task-started': task_started, | ||||
'task-result': task_succeeded, | 'task-result': task_succeeded, | ||||
'task-failed': task_failed, | 'task-failed': task_failed, | ||||
'*': catchall_event, | '*': catchall_event, | ||||
}, | }, | ||||
node_id='listener-%s' % socket.gethostname(), | node_id='listener-%s' % socket.gethostname(), | ||||
) | ) | ||||
errors = 0 | |||||
while True: | |||||
try: | |||||
recv.capture(limit=None, timeout=None, wakeup=True) | recv.capture(limit=None, timeout=None, wakeup=True) | ||||
errors = 0 | |||||
except KeyboardInterrupt: | |||||
logger.exception('Keyboard interrupt, exiting') | |||||
break | |||||
except Exception: | |||||
logger.exception('Unexpected exception') | |||||
if errors < 5: | |||||
time.sleep(errors) | |||||
errors += 1 | |||||
else: | |||||
logger.error('Too many consecutive errors, exiting') | |||||
vlorentz: this function is getting too long | |||||
sys.exit(1) | |||||
@click.command() | @click.command() | ||||
@click.option('--cls', '-c', default='local', | @click.option('--cls', '-c', default='local', | ||||
help="Scheduler's class, default to 'local'") | help="Scheduler's class, default to 'local'") | ||||
@click.option( | @click.option( | ||||
'--database', '-d', help='Scheduling database DSN') | '--database', '-d', help='Scheduling database DSN') | ||||
@click.option('--url', '-u', | @click.option('--url', '-u', | ||||
Show All 28 Lines |
this function is getting too long