Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/listener.py
Show First 20 Lines • Show All 85 Lines • ▼ Show 20 Lines | def perform_actions(actions, backend=backend): | ||||
for action in actions['queue']: | for action in actions['queue']: | ||||
messages.append(action['message']) | messages.append(action['message']) | ||||
function = action_map[action['action']] | function = action_map[action['action']] | ||||
args = action.get('args', ()) | args = action.get('args', ()) | ||||
kwargs = action.get('kwargs', {}) | kwargs = action.get('kwargs', {}) | ||||
kwargs['cur'] = cursor | kwargs['cur'] = cursor | ||||
function(*args, **kwargs) | function(*args, **kwargs) | ||||
db.commit() | db.conn.commit() | ||||
for message in messages: | for message in messages: | ||||
if not message.acknowledged: | if not message.acknowledged: | ||||
message.ack() | message.ack() | ||||
else: | else: | ||||
logger.info('message already acknowledged: %s', message) | logger.info('message already acknowledged: %s', message) | ||||
actions['queue'] = [] | actions['queue'] = [] | ||||
actions['last_send'] = utcnow() | actions['last_send'] = utcnow() | ||||
▲ Show 20 Lines • Show All 106 Lines • Show Last 20 Lines |