listener: add opt-in debug logging and compatibility fallbacks

What this patch changes in swh/scheduler/celery_backend/listener.py:

  * imports the stdlib logging module;
  * ReliableEventReceiver.__init__: falls back to
    self.queue.queue_arguments when calling self._get_queue_arguments()
    raises AttributeError;
  * ReliableEventReceiver._receive: when body is a list, uses its first
    element before calling event_from_message();
  * adds logging.debug() traces to _receive, process, task_started,
    task_succeeded and task_failed;
  * main: adds a --verbose flag; when set, logging.basicConfig() is
    called with level DEBUG. Without the flag, logging is left
    unconfigured, as before.

Review changes relative to the submitted patch:

  * logging.debug() calls pass their value as a logging argument
    ('%s', value) instead of pre-formatting with the % operator, so the
    message is only built when DEBUG is enabled and a tuple-valued
    argument cannot break the format string;
  * basicConfig() is given logging.DEBUG directly: the call sits under
    "if verbose:", so the former "else logging.INFO" branch could never
    be taken.

NOTE(review): the line breaks and indentation below were reconstructed
from a whitespace-collapsed copy of the patch. Hunk line counts match
the @@ headers, but confirm the context lines against the target file
(or apply with whitespace-insensitive matching) before merging.

diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -5,6 +5,7 @@
 
 import click
 import datetime
+import logging
 import socket
 
 from arrow import utcnow
@@ -22,12 +23,18 @@
         super(ReliableEventReceiver, self).__init__(
             channel, handlers, routing_key, node_id, app, queue_prefix, accept)
 
+        try:
+            queue_arguments = self._get_queue_arguments()
+        except AttributeError:  # HACK: buster's celery version does
+                                # not support this
+            queue_arguments = self.queue.queue_arguments
+
         self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
                            exchange=self.exchange,
                            routing_key=self.routing_key,
                            auto_delete=False,
                            durable=True,
-                           queue_arguments=self._get_queue_arguments())
+                           queue_arguments=queue_arguments)
 
     def get_consumers(self, consumer, channel):
         return [consumer(queues=[self.queue],
@@ -35,6 +42,12 @@
                          accept=self.accept)]
 
     def _receive(self, body, message):
+        logging.debug('## event-receiver: body: %s', body)
+        logging.debug('## event-receiver: message: %s', message)
+        if isinstance(body, list):  # HACK: buster's celery version
+                                    # sometimes returns body as list
+                                    # of 1 element
+            body = body[0]
         type, body = self.event_from_message(body)
         self.process(type, body, message)
 
@@ -42,6 +55,9 @@
         """Process the received event by dispatching it to the appropriate
         handler."""
         handler = self.handlers.get(type) or self.handlers.get('*')
+        logging.debug('## event-receiver: type: %s', type)
+        logging.debug('## event-receiver: event: %s', event)
+        logging.debug('## event-receiver: handler: %s', handler)
         handler and handler(event, message)
 
 
@@ -93,6 +109,9 @@
         try_perform_actions()
 
     def task_started(event, message):
+        logging.debug('#### task_started: event: %s', event)
+        logging.debug('#### task_started: message: %s', message)
+
         queue_action({
             'action': 'start_task_run',
             'args': [event['uuid']],
@@ -106,8 +125,11 @@
         })
 
     def task_succeeded(event, message):
+        logging.debug('#### task_succeeded: event: %s', event)
+        logging.debug('#### task_succeeded: message: %s', message)
         result = event['result']
 
+        logging.debug('#### task_succeeded: result: %s', result)
         try:
             status = result.get('status')
             if status == 'success':
@@ -127,6 +149,9 @@
         })
 
     def task_failed(event, message):
+        logging.debug('#### task_failed: event: %s', event)
+        logging.debug('#### task_failed: message: %s', message)
+
         queue_action({
             'action': 'end_task_run',
             'args': [event['uuid']],
@@ -159,7 +184,12 @@
     '--database', '-d', help='Scheduling database DSN')
 @click.option('--url', '-u',
               help="(Optional) Scheduler's url access")
-def main(cls, database, url):
+@click.option('--verbose', is_flag=True, default=False,
+              help='Default to be silent')
+def main(cls, database, url, verbose):
+    if verbose:
+        logging.basicConfig(level=logging.DEBUG)
+
     scheduler = None
     override_config = {}
     if cls == 'local':