diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -5,6 +5,7 @@ import click import datetime +import logging import socket from arrow import utcnow @@ -26,22 +27,29 @@ exchange=self.exchange, routing_key=self.routing_key, auto_delete=False, - durable=True, - queue_arguments=self._get_queue_arguments()) + durable=True) def get_consumers(self, consumer, channel): return [consumer(queues=[self.queue], callbacks=[self._receive], no_ack=False, accept=self.accept)] - def _receive(self, body, message): - type, body = self.event_from_message(body) - self.process(type, body, message) + def _receive(self, bodies, message): + logging.debug('## event-receiver: bodies: %s' % bodies) + logging.debug('## event-receiver: message: %s' % message) + if not isinstance(bodies, list): # celery<4 returned body as element + bodies = [bodies] + for body in bodies: + type, body = self.event_from_message(body) + self.process(type, body, message) def process(self, type, event, message): """Process the received event by dispatching it to the appropriate handler.""" handler = self.handlers.get(type) or self.handlers.get('*') + logging.debug('## event-receiver: type: %s' % type) + logging.debug('## event-receiver: event: %s' % event) + logging.debug('## event-receiver: handler: %s' % handler) handler and handler(event, message) @@ -93,6 +101,9 @@ try_perform_actions() def task_started(event, message): + logging.debug('#### task_started: event: %s' % event) + logging.debug('#### task_started: message: %s' % message) + queue_action({ 'action': 'start_task_run', 'args': [event['uuid']], @@ -106,8 +117,11 @@ }) def task_succeeded(event, message): + logging.debug('#### task_succeeded: event: %s' % event) + logging.debug('#### task_succeeded: message: %s' % message) result = event['result'] + logging.debug('#### task_succeeded: result: %s' % result) try: status = result.get('status') if status == 'success': @@ -127,6 +141,9 @@ }) def task_failed(event, message): + logging.debug('#### task_failed: event: %s' % event) + logging.debug('#### task_failed: message: %s' % message) + queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], @@ -159,7 +176,12 @@ '--database', '-d', help='Scheduling database DSN') @click.option('--url', '-u', help="(Optional) Scheduler's url access") -def main(cls, database, url): +@click.option('--verbose', is_flag=True, default=False, + help='Default to be silent') +def main(cls, database, url, verbose): + if verbose: + logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) + scheduler = None override_config = {} if cls == 'local':