diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -22,12 +22,18 @@ super(ReliableEventReceiver, self).__init__( channel, handlers, routing_key, node_id, app, queue_prefix, accept) + try: + queue_arguments = self._get_queue_arguments() + except AttributeError: # HACK: buster's celery version does + # not support this + queue_arguments = self.queue.queue_arguments + self.queue = Queue('.'.join([self.queue_prefix, self.node_id]), exchange=self.exchange, routing_key=self.routing_key, auto_delete=False, durable=True, - queue_arguments=self._get_queue_arguments()) + queue_arguments=queue_arguments) def get_consumers(self, consumer, channel): return [consumer(queues=[self.queue], @@ -35,6 +41,12 @@ accept=self.accept)] def _receive(self, body, message): + print('## event-receiver: body', body) + print('## event-receiver: message', message) + if isinstance(body, list): # HACK: buster's celery version + # sometimes returns body as list + # of 1 element + body = body[0] type, body = self.event_from_message(body) self.process(type, body, message) @@ -42,6 +54,9 @@ """Process the received event by dispatching it to the appropriate handler.""" handler = self.handlers.get(type) or self.handlers.get('*') + print('## event-receiver: type', type) + print('## event-receiver: event', event) + print('## event-receiver: handler', handler) handler and handler(event, message) @@ -93,6 +108,9 @@ try_perform_actions() def task_started(event, message): + print('#### task_started: event', event) + print('#### task_started: message', message) + queue_action({ 'action': 'start_task_run', 'args': [event['uuid']], @@ -106,8 +124,11 @@ }) def task_succeeded(event, message): + print('#### task_succeeded: event', event) + print('#### task_succeeded: message', message) result = event['result'] + print('#### task_succeeded: result', result) try: status = result.get('status') if status == 'success': @@ -127,6 +148,9 @@ }) def task_failed(event, message): + print('#### task_failed: event', event) + print('#### task_failed: message', message) + queue_action({ 'action': 'end_task_run', 'args': [event['uuid']],