diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -4,11 +4,12 @@ # See top-level LICENSE file for more information import arrow +import logging from swh.scheduler import get_scheduler, compute_nb_tasks_from -from .config import app as main_app +logger = logging.getLogger(__name__) # Max batch size for tasks MAX_NUM_TASKS = 10000 @@ -64,12 +65,15 @@ num_tasks, num_tasks_priority = compute_nb_tasks_from( num_tasks) - pending_tasks.extend( - backend.grab_ready_tasks( + grabbed_tasks = backend.grab_ready_tasks( task_type_name, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority, - cursor=cursor)) + cursor=cursor) + if grabbed_tasks: + pending_tasks.extend(grabbed_tasks) + logger.info('Grabbed %s tasks %s', + len(grabbed_tasks), task_type_name) if not pending_tasks: return all_backend_tasks @@ -83,7 +87,6 @@ celery_result = app.send_task( backend_name, args=args, kwargs=kwargs, ) - data = { 'task': task['id'], 'backend_id': celery_result.id, @@ -91,15 +94,15 @@ } backend_tasks.append(data) + logger.debug('Sent %s celery tasks', len(backend_tasks)) backend.mass_schedule_task_runs(backend_tasks, cursor=cursor) - backend.commit() - all_backend_tasks.extend(backend_tasks) def main(): + from .config import app as main_app for module in main_app.conf.CELERY_IMPORTS: __import__(module) diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py --- a/swh/scheduler/cli.py +++ b/swh/scheduler/cli.py @@ -498,9 +498,11 @@ logger.debug('Scheduler %s' % scheduler) try: while True: - logger.info('Run ready tasks') + logger.debug('Run ready tasks') try: - run_ready_tasks(scheduler, app) + ntasks = len(run_ready_tasks(scheduler, app)) + if ntasks: + logger.info('Scheduled %s tasks', ntasks) except Exception: scheduler.rollback() logger.exception('Unexpected error in run_ready_tasks()')