diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -17,7 +17,12 @@ logger = logging.getLogger(__name__) -def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]: +def run_ready_tasks( + backend: SchedulerInterface, + app, + task_types_list: List[Dict] = [], + with_priority: bool = False, +) -> List[Dict]: """Schedule tasks ready to be scheduled. This lookups any tasks per task type and mass schedules those accordingly (send @@ -51,7 +56,7 @@ while True: task_types = {} pending_tasks = [] - for task_type in backend.get_task_types(): + for task_type in task_types_list: task_type_name = task_type["type"] task_types[task_type_name] = task_type @@ -59,40 +64,42 @@ backend_name = task_type["backend_name"] num_tasks = get_available_slots(app, backend_name, max_queue_length) - # only pull tasks if the buffer is at least 1/5th empty (= 80% - # full), to help postgresql use properly indexed queries. - if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: - # Only grab num_tasks tasks with no priority - grabbed_tasks = backend.grab_ready_tasks( - task_type_name, num_tasks=num_tasks + if with_priority: + # grab max_queue_length (or 10) potential tasks with any priority for + # the same type (limit the result to avoid too long running queries) + grabbed_priority_tasks = backend.grab_ready_priority_tasks( + task_type_name, num_tasks=max_queue_length or 10 ) - if grabbed_tasks: - pending_tasks.extend(grabbed_tasks) + if grabbed_priority_tasks: + pending_tasks.extend(grabbed_priority_tasks) logger.info( - "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name + "Grabbed %s tasks %s (priority)", + len(grabbed_priority_tasks), + task_type_name, ) statsd.increment( "swh_scheduler_runner_scheduled_task_total", - len(grabbed_tasks), + len(grabbed_priority_tasks), tags={"task_type": task_type_name}, ) - # grab max_queue_length (or 10) potential tasks with any priority for the - # same type (limit the result to avoid too long running queries) - grabbed_priority_tasks = backend.grab_ready_priority_tasks( - task_type_name, num_tasks=max_queue_length or 10 - ) - if grabbed_priority_tasks: - pending_tasks.extend(grabbed_priority_tasks) - logger.info( - "Grabbed %s tasks %s (priority)", - len(grabbed_priority_tasks), - task_type_name, - ) - statsd.increment( - "swh_scheduler_runner_scheduled_task_total", - len(grabbed_priority_tasks), - tags={"task_type": task_type_name}, - ) + else: + # only pull tasks if the buffer is at least 1/5th empty (= 80% + # full), to help postgresql use properly indexed queries. + if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: + # Only grab num_tasks tasks with no priority + grabbed_tasks = backend.grab_ready_tasks( + task_type_name, num_tasks=num_tasks + ) + if grabbed_tasks: + pending_tasks.extend(grabbed_tasks) + logger.info( + "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name + ) + statsd.increment( + "swh_scheduler_runner_scheduled_task_total", + len(grabbed_tasks), + tags={"task_type": task_type_name}, + ) if not pending_tasks: return all_backend_tasks diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py --- a/swh/scheduler/cli/admin.py +++ b/swh/scheduler/cli/admin.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2019 The Software Heritage developers +# Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -23,8 +23,26 @@ "executed. Set to 0 (default) for a one shot." ), ) +@click.option( + "--task-types", + "task_type_names", + default=None, + help=( + "Task types to scheduler. If not provided, this iterates over every " + "task types referenced in the scheduler backend." + ), +) +@click.option( + "--with-priority", + is_flag=True, + default=False, + help=( + "Determine if those tasks should be the ones with priority or not." + "By default, this deals with tasks without any priority." + ), +) @click.pass_context -def runner(ctx, period): +def runner(ctx, period, task_type_names, with_priority): """Starts a swh-scheduler runner service. This process is responsible for checking for ready-to-run tasks and @@ -32,17 +50,29 @@ from swh.scheduler.celery_backend.config import build_app from swh.scheduler.celery_backend.runner import run_ready_tasks - app = build_app(ctx.obj["config"].get("celery")) + config = ctx.obj["config"] + app = build_app(config.get("celery")) app.set_current() logger = logging.getLogger(__name__ + ".runner") scheduler = ctx.obj["scheduler"] logger.debug("Scheduler %s" % scheduler) + task_types = [] + if not task_type_names: + for task_type_name in task_type_names: + task_type = scheduler.get_task_type(task_type_name) + if not task_type: + raise ValueError(f"Unknown {task_type_name}") + task_types.append(task_type) + try: while True: + if not task_types: + task_types = scheduler.get_task_types() + logger.debug("Run ready tasks") try: - ntasks = len(run_ready_tasks(scheduler, app)) + ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority)) if ntasks: logger.info("Scheduled %s tasks", ntasks) except Exception: diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py --- a/swh/scheduler/tests/test_celery_tasks.py +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -100,7 +100,8 @@ swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")]) - backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) + task_types = swh_scheduler.get_task_types() + backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app, task_types) assert backend_tasks for task in backend_tasks: # Make sure the task completed @@ -135,7 +136,8 @@ assert task["priority"] is None task_ids.add(task["id"]) - backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) + task_types = swh_scheduler.get_task_types() + backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app, task_types) assert len(backend_tasks) == len(tasks) scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name) @@ -179,7 +181,10 @@ assert task["priority"] is not None task_ids.add(task["id"]) - backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) + task_types = swh_scheduler.get_task_types() + backend_tasks = run_ready_tasks( + swh_scheduler, swh_scheduler_celery_app, task_types, with_priority=True + ) assert len(backend_tasks) == len(tasks) scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name) @@ -204,7 +209,8 @@ swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")]) - backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) + task_types = swh_scheduler.get_task_types() + backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app, task_types) assert len(backend_tasks) == 1 task = backend_tasks[0]