Page MenuHomeSoftware Heritage

D312.id1028.diff
No OneTemporary

D312.id1028.diff

diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -22,12 +22,18 @@
super(ReliableEventReceiver, self).__init__(
channel, handlers, routing_key, node_id, app, queue_prefix, accept)
+ try:
+ queue_arguments = self._get_queue_arguments()
+ except AttributeError: # HACK: buster's celery version does
+ # not support this
+ queue_arguments = None
+
self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
exchange=self.exchange,
routing_key=self.routing_key,
auto_delete=False,
durable=True,
- queue_arguments=self._get_queue_arguments())
+ queue_arguments=queue_arguments)
def get_consumers(self, consumer, channel):
return [consumer(queues=[self.queue],
@@ -35,6 +41,12 @@
accept=self.accept)]
def _receive(self, body, message):
+ print('## event-receiver: body', body)
+ print('## event-receiver: message', message)
+ if isinstance(body, list): # HACK: buster's celery version
+ # sometimes returns body as list
+ # of 1 element
+ body = body[0]
type, body = self.event_from_message(body)
self.process(type, body, message)
@@ -42,6 +54,9 @@
"""Process the received event by dispatching it to the appropriate
handler."""
handler = self.handlers.get(type) or self.handlers.get('*')
+ print('## event-receiver: type', type)
+ print('## event-receiver: event', event)
+ print('## event-receiver: handler', handler)
handler and handler(event, message)
@@ -93,6 +108,9 @@
try_perform_actions()
def task_started(event, message):
+ print('#### task_started: event', event)
+ print('#### task_started: message', message)
+
queue_action({
'action': 'start_task_run',
'args': [event['uuid']],
@@ -106,8 +124,11 @@
})
def task_succeeded(event, message):
+ print('#### task_succeeded: event', event)
+ print('#### task_succeeded: message', message)
result = event['result']
+ print('#### task_succeeded: result', result)
try:
status = result.get('status')
if status == 'success':
@@ -127,6 +148,9 @@
})
def task_failed(event, message):
+ print('#### task_failed: event', event)
+ print('#### task_failed: message', message)
+
queue_action({
'action': 'end_task_run',
'args': [event['uuid']],

File Metadata

Mime Type
text/plain
Expires
Sun, Aug 17, 8:48 PM (5 h, 19 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222110

Event Timeline