Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/runner.py
# Copyright (C) 2015-2018 The Software Heritage developers | # Copyright (C) 2015-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import arrow | import arrow | ||||
import logging | import logging | ||||
from kombu.utils.uuid import uuid | from kombu.utils.uuid import uuid | ||||
from swh.core.statsd import statsd | |||||
from swh.scheduler import get_scheduler, compute_nb_tasks_from | from swh.scheduler import get_scheduler, compute_nb_tasks_from | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
# Max batch size for tasks | # Max batch size for tasks | ||||
MAX_NUM_TASKS = 10000 | MAX_NUM_TASKS = 10000 | ||||
▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | while True: | ||||
grabbed_tasks = backend.grab_ready_tasks( | grabbed_tasks = backend.grab_ready_tasks( | ||||
task_type_name, | task_type_name, | ||||
num_tasks=num_tasks, | num_tasks=num_tasks, | ||||
num_tasks_priority=num_tasks_priority) | num_tasks_priority=num_tasks_priority) | ||||
if grabbed_tasks: | if grabbed_tasks: | ||||
pending_tasks.extend(grabbed_tasks) | pending_tasks.extend(grabbed_tasks) | ||||
logger.info('Grabbed %s tasks %s', | logger.info('Grabbed %s tasks %s', | ||||
len(grabbed_tasks), task_type_name) | len(grabbed_tasks), task_type_name) | ||||
statsd.increment( | |||||
'swh_scheduler_runner_scheduled_task_total', | |||||
len(grabbed_tasks), | |||||
tags={'task_type': task_type_name}) | |||||
if not pending_tasks: | if not pending_tasks: | ||||
return all_backend_tasks | return all_backend_tasks | ||||
backend_tasks = [] | backend_tasks = [] | ||||
celery_tasks = [] | celery_tasks = [] | ||||
for task in pending_tasks: | for task in pending_tasks: | ||||
args = task['arguments']['args'] | args = task['arguments']['args'] | ||||
kwargs = task['arguments']['kwargs'] | kwargs = task['arguments']['kwargs'] | ||||
Show All 37 Lines |