Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/runner.py
# Copyright (C) 2015-2018  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import arrow
from celery import group

from swh.scheduler import get_scheduler, compute_nb_tasks_from

from .config import app as main_app

# Max batch size for tasks grabbed and scheduled in one iteration
MAX_NUM_TASKS = 10000
def run_ready_tasks(backend, app):
    """Schedule tasks ready to be scheduled, looping until none remain.

    Each iteration grabs at most MAX_NUM_TASKS ready tasks per task
    type from the scheduler backend (throttled by the per-type
    ``max_queue_length`` when the matching celery queue is known),
    submits them to celery as a group, and records the resulting task
    runs in the backend.

    Args:
        backend (Scheduler): backend to read tasks to schedule
        app (App): Celery application to send tasks to

    Returns:
        A list of dictionaries::

            {
                'task': the scheduler's task id,
                'backend_id': Celery's task id,
                'scheduled': arrow.utcnow()
            }

        The result can be used to block-wait for the tasks' results::

            backend_tasks = run_ready_tasks(self.scheduler, app)
            for task in backend_tasks:
                AsyncResult(id=task['backend_id']).get()

    """
    all_backend_tasks = []
    while True:
        cursor = backend.cursor()
        task_types = {}
        pending_tasks = []
        for task_type in backend.get_task_types(cursor=cursor):
            task_type_name = task_type['type']
            task_types[task_type_name] = task_type
            max_queue_length = task_type['max_queue_length']
            backend_name = task_type['backend_name']
            if max_queue_length and backend_name in app.tasks:
                # Throttle: never push the celery queue beyond its
                # configured maximum length.
                queue_name = app.tasks[backend_name].task_queue
                queue_length = app.get_queue_length(queue_name)
                num_tasks = min(max_queue_length - queue_length,
                                MAX_NUM_TASKS)
            else:
                num_tasks = MAX_NUM_TASKS
            if num_tasks > 0:
                num_tasks, num_tasks_priority = compute_nb_tasks_from(
                    num_tasks)
                pending_tasks.extend(
                    backend.grab_ready_tasks(
                        task_type_name,
                        num_tasks=num_tasks,
                        num_tasks_priority=num_tasks_priority,
                        cursor=cursor))

        if not pending_tasks:
            # Nothing left to schedule: hand back everything scheduled
            # during this call.
            return all_backend_tasks

        celery_tasks = []
        for task in pending_tasks:
            args = task['arguments']['args']
            kwargs = task['arguments']['kwargs']
            celery_task = app.tasks[
                task_types[task['type']]['backend_name']
            ].s(*args, **kwargs)
            celery_tasks.append(celery_task)

        group_result = group(celery_tasks).delay()
        # group_result.results is aligned with celery_tasks, which is
        # aligned with pending_tasks, so zip them by index.
        backend_tasks = [{
            'task': task['id'],
            'backend_id': group_result.results[i].id,
            'scheduled': arrow.utcnow(),
        } for i, task in enumerate(pending_tasks)]

        backend.mass_schedule_task_runs(backend_tasks, cursor=cursor)
        backend.commit()
        all_backend_tasks.extend(backend_tasks)
if __name__ == '__main__':
    # Import every configured task module so that app.tasks contains
    # all the backend task names the scheduler may reference.
    for module in main_app.conf.CELERY_IMPORTS:
        __import__(module)

    main_backend = get_scheduler('local')
    try:
        run_ready_tasks(main_backend, main_app)
    except Exception:
        # Leave the scheduler database in a consistent state before
        # propagating the failure.
        main_backend.rollback()
        raise
There should be at least an explanation about why this refactoring of the celery runner is required to add a test fixture, and a clue about what is going on in this refactoring.
Besides, you de facto change the API of the run_ready_tasks function, so this must be documented in the docstring, and the rationale for this API change should be explained in the commit message.