diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -44,9 +44,10 @@ We use the command-line loglevel for tasks only, as we never really care about the debug messages from celery. """ - if loglevel is None: loglevel = logging.DEBUG + if isinstance(loglevel, str): + loglevel = logging._nameToLevel[loglevel] formatter = logging.Formatter(format) @@ -65,19 +66,18 @@ systemd_journal.setFormatter(formatter) root_logger.addHandler(systemd_journal) - celery_logger = logging.getLogger('celery') - celery_logger.setLevel(logging.INFO) - + logging.getLogger('celery').setLevel(logging.INFO) + # Silence amqp heartbeat_tick messages + logger = logging.getLogger('amqp') + logger.addFilter(lambda record: not record.msg.startswith( + 'heartbeat_tick')) + logger.setLevel(logging.DEBUG) # Silence useless "Starting new HTTP connection" messages - urllib3_logger = logging.getLogger('urllib3') - urllib3_logger.setLevel(logging.WARNING) - - swh_logger = logging.getLogger('swh') - swh_logger.setLevel(loglevel) + logging.getLogger('urllib3').setLevel(logging.WARNING) + logging.getLogger('swh').setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task - celery_task_logger = logging.getLogger('celery.task') - celery_task_logger.setLevel(loglevel) + logging.getLogger('celery.task').setLevel(loglevel) @celeryd_after_setup.connect diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -13,7 +13,7 @@ from celery.events import EventReceiver from swh.scheduler import get_scheduler -from .config import app as main_app +from .config import setup_log_handler, app as main_app class ReliableEventReceiver(EventReceiver): @@ -58,19 +58,21 @@ def event_monitor(app, backend): + logger = logging.getLogger('swh.scheduler.listener') actions = { 'last_send': utcnow() - 2*ACTION_SEND_DELAY, 'queue': [], } def try_perform_actions(actions=actions): - if not actions['queue']: - return - if utcnow() - actions['last_send'] > ACTION_SEND_DELAY or \ - len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH: + logger.debug('Try perform pending actions') + if actions['queue'] and ( + len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH or + utcnow() - actions['last_send'] > ACTION_SEND_DELAY): perform_actions(actions) def perform_actions(actions, backend=backend): + logger.info('Perform %s pending actions' % len(actions['queue'])) action_map = { 'start_task_run': backend.start_task_run, 'end_task_run': backend.end_task_run, @@ -101,8 +103,8 @@ try_perform_actions() def task_started(event, message): - logging.debug('#### task_started: event: %s' % event) - logging.debug('#### task_started: message: %s' % message) + logger.debug('task_started: event: %s' % event) + logger.debug('task_started: message: %s' % message) queue_action({ 'action': 'start_task_run', @@ -117,11 +119,11 @@ }) def task_succeeded(event, message): - logging.debug('#### task_succeeded: event: %s' % event) - logging.debug('#### task_succeeded: message: %s' % message) + logger.debug('task_succeeded: event: %s' % event) + logger.debug('task_succeeded: message: %s' % message) result = event['result'] - logging.debug('#### task_succeeded: result: %s' % result) + logger.debug('task_succeeded: result: %s' % result) try: status = result.get('status') if status == 'success': @@ -141,8 +143,8 @@ }) def task_failed(event, message): - logging.debug('#### task_failed: event: %s' % event) - logging.debug('#### task_failed: message: %s' % message) + logger.debug('task_failed: event: %s' % event) + logger.debug('task_failed: message: %s' % message) queue_action({ 'action': 'end_task_run', @@ -176,11 +178,13 @@ '--database', '-d', help='Scheduling database DSN') @click.option('--url', '-u', help="(Optional) Scheduler's url access") -@click.option('--verbose', is_flag=True, default=False, - help='Default to be silent') -def main(cls, database, url, verbose): - if verbose: - logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) +@click.option('--log-level', '-l', default='INFO', + type=click.Choice(logging._nameToLevel.keys()), + help='Log level (default to INFO)') +def main(cls, database, url, log_level): + setup_log_handler(loglevel=log_level, colorize=False, + format='[%(levelname)s] %(name)s -- %(message)s') + # logging.basicConfig(level=level) scheduler = None override_config = {}