diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -20,7 +20,12 @@
 MAX_NUM_TASKS = 10000
 
 
-def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]:
+def run_ready_tasks(
+    backend: SchedulerInterface,
+    app,
+    task_types: List[Dict] = [],
+    with_priority: bool = False,
+) -> List[Dict]:
     """Schedule tasks ready to be scheduled.
 
     This lookups any tasks per task type and mass schedules those accordingly (send
@@ -33,6 +38,11 @@
     Args:
         backend: scheduler backend to interact with (read/update tasks)
         app (App): Celery application to send tasks to
+        task_types: The list of task types dict to iterate over. By default, empty.
+            When empty, the full list of task types referenced in the scheduler is
+            fetched again for each batch of tasks.
+        with_priority: If True, only tasks with priority set will be fetched and
+            scheduled. By default, False.
 
     Returns:
         A list of dictionaries::
@@ -52,50 +62,55 @@
     """
     all_backend_tasks: List[Dict] = []
     while True:
-        task_types = {}
+        # No explicit selection means every known task type; refetch each batch.
+        task_types_to_schedule = task_types or backend.get_task_types()
+        task_types_d = {}
         pending_tasks = []
-        for task_type in backend.get_task_types():
+        for task_type in task_types_to_schedule:
             task_type_name = task_type["type"]
-            task_types[task_type_name] = task_type
+            task_types_d[task_type_name] = task_type
             max_queue_length = task_type["max_queue_length"]
             if max_queue_length is None:
                 max_queue_length = 0
             backend_name = task_type["backend_name"]
-            num_tasks = get_available_slots(app, backend_name, max_queue_length)
-            # only pull tasks if the buffer is at least 1/5th empty (= 80%
-            # full), to help postgresql use properly indexed queries.
-            if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
-                # Only grab num_tasks tasks with no priority
-                grabbed_tasks = backend.grab_ready_tasks(
-                    task_type_name, num_tasks=num_tasks
+
+            if with_priority:
+                # grab max_queue_length (or 10) potential tasks with any priority for
+                # the same type (limit the result to avoid too long running queries)
+                grabbed_priority_tasks = backend.grab_ready_priority_tasks(
+                    task_type_name, num_tasks=max_queue_length or 10
                 )
-                if grabbed_tasks:
-                    pending_tasks.extend(grabbed_tasks)
+                if grabbed_priority_tasks:
+                    pending_tasks.extend(grabbed_priority_tasks)
                     logger.info(
-                        "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name
+                        "Grabbed %s tasks %s (priority)",
+                        len(grabbed_priority_tasks),
+                        task_type_name,
                     )
                     statsd.increment(
                         "swh_scheduler_runner_scheduled_task_total",
-                        len(grabbed_tasks),
+                        len(grabbed_priority_tasks),
                         tags={"task_type": task_type_name},
                     )
-            # grab max_queue_length (or 10) potential tasks with any priority for the
-            # same type (limit the result to avoid too long running queries)
-            grabbed_priority_tasks = backend.grab_ready_priority_tasks(
-                task_type_name, num_tasks=max_queue_length or 10
-            )
-            if grabbed_priority_tasks:
-                pending_tasks.extend(grabbed_priority_tasks)
-                logger.info(
-                    "Grabbed %s tasks %s (priority)",
-                    len(grabbed_priority_tasks),
-                    task_type_name,
-                )
-                statsd.increment(
-                    "swh_scheduler_runner_scheduled_task_total",
-                    len(grabbed_priority_tasks),
-                    tags={"task_type": task_type_name},
-                )
+            else:
+                num_tasks = get_available_slots(app, backend_name, max_queue_length)
+                # only pull tasks if the buffer is at least 1/5th empty (= 80%
+                # full), to help postgresql use properly indexed queries.
+                if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
+                    # Only grab num_tasks tasks with no priority
+                    grabbed_tasks = backend.grab_ready_tasks(
+                        task_type_name, num_tasks=num_tasks
+                    )
+                    if grabbed_tasks:
+                        pending_tasks.extend(grabbed_tasks)
+                        logger.info(
+                            "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name
+                        )
+                        statsd.increment(
+                            "swh_scheduler_runner_scheduled_task_total",
+                            len(grabbed_tasks),
+                            tags={"task_type": task_type_name},
+                        )
 
         if not pending_tasks:
             return all_backend_tasks
@@ -106,7 +121,7 @@
             args = task["arguments"]["args"]
             kwargs = task["arguments"]["kwargs"]
 
-            backend_name = task_types[task["type"]]["backend_name"]
+            backend_name = task_types_d[task["type"]]["backend_name"]
             backend_id = uuid()
             celery_tasks.append(
                 (
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2019  The Software Heritage developers
+# Copyright (C) 2016-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -23,8 +23,24 @@
         "executed. Set to 0 (default) for a one shot."
     ),
 )
+@click.option(
+    "--task-type",
+    "task_type_names",
+    multiple=True,
+    default=[],
+    help="Task type names (e.g load-git, load-hg, list-github-full, ...) to schedule.",
+)
+@click.option(
+    "--with-priority/--without-priority",
+    is_flag=True,
+    default=False,
+    help=(
+        "Determine if those tasks should be the ones with priority or not. "
+        "By default, this deals with tasks without any priority."
+    ),
+)
 @click.pass_context
-def runner(ctx, period):
+def runner(ctx, period, task_type_names, with_priority):
     """Starts a swh-scheduler runner service.
 
     This process is responsible for checking for ready-to-run tasks and
@@ -32,17 +48,25 @@
     from swh.scheduler.celery_backend.config import build_app
     from swh.scheduler.celery_backend.runner import run_ready_tasks
 
-    app = build_app(ctx.obj["config"].get("celery"))
+    config = ctx.obj["config"]
+    app = build_app(config.get("celery"))
     app.set_current()
 
     logger = logging.getLogger(__name__ + ".runner")
     scheduler = ctx.obj["scheduler"]
     logger.debug("Scheduler %s" % scheduler)
+    task_types = []
+    for task_type_name in task_type_names:
+        task_type = scheduler.get_task_type(task_type_name)
+        if not task_type:
+            raise ValueError(f"Unknown task type: {task_type_name}")
+        task_types.append(task_type)
+
     try:
         while True:
             logger.debug("Run ready tasks")
             try:
-                ntasks = len(run_ready_tasks(scheduler, app))
+                ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority))
                 if ntasks:
                     logger.info("Scheduled %s tasks", ntasks)
             except Exception:
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -179,7 +179,9 @@
         assert task["priority"] is not None
         task_ids.add(task["id"])
 
-    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
+    backend_tasks = run_ready_tasks(
+        swh_scheduler, swh_scheduler_celery_app, task_types=[], with_priority=True
+    )
     assert len(backend_tasks) == len(tasks)
 
     scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name)
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2020  The Software Heritage developers
+# Copyright (C) 2019-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -6,6 +6,7 @@
 import datetime
 from itertools import islice
 import logging
+import random
 import re
 import tempfile
 from unittest.mock import patch
@@ -793,3 +794,65 @@
     # Check tasks
     tasks = swh_scheduler.search_tasks()
     _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins)
+
+
+def test_cli_task_runner_unknown_task_types(swh_scheduler, storage):
+    """When passing at least one unknown task type, the runner should fail."""
+
+    task_types = swh_scheduler.get_task_types()
+    task_type_names = [t["type"] for t in task_types]
+    known_task_type = random.choice(task_type_names)
+    unknown_task_type = "unknown-task-type"
+    assert unknown_task_type not in task_type_names
+
+    with pytest.raises(ValueError, match="Unknown"):
+        invoke(
+            swh_scheduler,
+            False,
+            [
+                "start-runner",
+                "--task-type",
+                known_task_type,
+                "--task-type",
+                unknown_task_type,
+            ],
+        )
+
+
+@pytest.mark.parametrize("flag_priority", ["--with-priority", "--without-priority"])
+def test_cli_task_runner_with_known_tasks(
+    swh_scheduler, storage, caplog, flag_priority
+):
+    """Trigger runner with known tasks runs smoothly."""
+
+    task_types = swh_scheduler.get_task_types()
+    task_type_names = [t["type"] for t in task_types]
+    task_type_name = random.choice(task_type_names)
+    task_type_name2 = random.choice(task_type_names)
+
+    # The runner will just iterate over the following known tasks and do noop. We are
+    # just checking the runner does not explode here.
+    result = invoke(
+        swh_scheduler,
+        False,
+        [
+            "start-runner",
+            flag_priority,
+            "--task-type",
+            task_type_name,
+            "--task-type",
+            task_type_name2,
+        ],
+    )
+
+    assert result.exit_code == 0, result.output
+
+
+def test_cli_task_runner_no_task(swh_scheduler, storage):
+    """Trigger runner with no parameter should run as before."""
+
+    # The runner will just iterate over the existing tasks from the scheduler and do
+    # noop. We are just checking the runner does not explode here.
+    result = invoke(swh_scheduler, False, ["start-runner"])
+
+    assert result.exit_code == 0, result.output