Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/listener.py
Show First 20 Lines • Show All 75 Lines • ▼ Show 20 Lines | def event_monitor(app, backend): | ||||
def perform_actions(actions, backend=backend): | def perform_actions(actions, backend=backend): | ||||
logger.info('Perform %s pending actions' % len(actions['queue'])) | logger.info('Perform %s pending actions' % len(actions['queue'])) | ||||
action_map = { | action_map = { | ||||
'start_task_run': backend.start_task_run, | 'start_task_run': backend.start_task_run, | ||||
'end_task_run': backend.end_task_run, | 'end_task_run': backend.end_task_run, | ||||
} | } | ||||
messages = [] | messages = [] | ||||
cursor = backend.cursor() | db = backend.get_db() | ||||
cursor = db.cursor(None) | |||||
for action in actions['queue']: | for action in actions['queue']: | ||||
messages.append(action['message']) | messages.append(action['message']) | ||||
function = action_map[action['action']] | function = action_map[action['action']] | ||||
args = action.get('args', ()) | args = action.get('args', ()) | ||||
kwargs = action.get('kwargs', {}) | kwargs = action.get('kwargs', {}) | ||||
kwargs['cursor'] = cursor | kwargs['cur'] = cursor | ||||
function(*args, **kwargs) | function(*args, **kwargs) | ||||
backend.commit() | db.commit() | ||||
for message in messages: | for message in messages: | ||||
if not message.acknowledged: | if not message.acknowledged: | ||||
message.ack() | message.ack() | ||||
else: | else: | ||||
logger.info('message already acknowledged: %s', message) | logger.info('message already acknowledged: %s', message) | ||||
actions['queue'] = [] | actions['queue'] = [] | ||||
actions['last_send'] = utcnow() | actions['last_send'] = utcnow() | ||||
▲ Show 20 Lines • Show All 106 Lines • Show Last 20 Lines |