scheduler runner: allow restricting a run to given task types / priority tasks

run_ready_tasks() gains two optional, backward-compatible parameters:

  * task_types: list of task type dicts to iterate over; when empty, the task
    types are read from the scheduler backend (backend.get_task_types()).
  * with_priority: when True, only grab_ready_priority_tasks() is called for
    each task type; when False (default), only grab_ready_tasks() is called,
    subject to the existing queue-length check.

The `runner` CLI command exposes both as `--task-types` (repeatable, one task
type name per occurrence) and `--with-priority`.  Each requested name is
resolved through scheduler.get_task_type(); an unknown name raises ValueError
before the scheduling loop starts.  When no `--task-types` is given, an empty
list is passed down so run_ready_tasks() falls back to every task type.

Note: the `[]` default of `task_types` is only ever rebound inside
run_ready_tasks(), never mutated in place, so it is not shared across calls.

diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -19,7 +19,12 @@
 MAX_NUM_TASKS = 10000
 
 
-def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]:
+def run_ready_tasks(
+    backend: SchedulerInterface,
+    app,
+    task_types: List[Dict] = [],
+    with_priority: bool = False,
+) -> List[Dict]:
     """Schedule tasks ready to be scheduled.
 
     This lookups any tasks per task type and mass schedules those accordingly (send
@@ -32,6 +37,11 @@
     Args:
         backend: scheduler backend to interact with (read/update tasks)
         app (App): Celery application to send tasks to
+        task_types: The list of task types dict to iterate over. By default, empty.
+          When empty, the full list of task types referenced in the scheduler will be
+          used.
+        with_priority: If True, only tasks with priority set will be fetched and
+          scheduled. By default, False.
 
     Returns:
         A list of dictionaries::
@@ -51,62 +61,70 @@
     """
     all_backend_tasks: List[Dict] = []
     while True:
-        task_types = {}
+        if not task_types:
+            task_types = backend.get_task_types()
+        task_types_d = {}
         pending_tasks = []
-        for task_type in backend.get_task_types():
+        for task_type in task_types:
             task_type_name = task_type["type"]
-            task_types[task_type_name] = task_type
+            task_types_d[task_type_name] = task_type
             max_queue_length = task_type["max_queue_length"]
             if max_queue_length is None:
                 max_queue_length = 0
             backend_name = task_type["backend_name"]
-            if max_queue_length:
-                try:
-                    queue_length = app.get_queue_length(backend_name)
-                except ValueError:
-                    queue_length = None
-
-                if queue_length is None:
-                    # Running without RabbitMQ (probably a test env).
-                    num_tasks = MAX_NUM_TASKS
-                else:
-                    num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS)
-            else:
-                num_tasks = MAX_NUM_TASKS
-            # only pull tasks if the buffer is at least 1/5th empty (= 80%
-            # full), to help postgresql use properly indexed queries.
-            if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
-                # Only grab num_tasks tasks with no priority
-                grabbed_tasks = backend.grab_ready_tasks(
-                    task_type_name, num_tasks=num_tasks
+
+            if with_priority:
+                # grab max_queue_length (or 10) potential tasks with any priority for
+                # the same type (limit the result to avoid too long running queries)
+                grabbed_priority_tasks = backend.grab_ready_priority_tasks(
+                    task_type_name, num_tasks=max_queue_length or 10
                 )
-                if grabbed_tasks:
-                    pending_tasks.extend(grabbed_tasks)
+                if grabbed_priority_tasks:
+                    pending_tasks.extend(grabbed_priority_tasks)
                     logger.info(
-                        "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name
+                        "Grabbed %s tasks %s (priority)",
+                        len(grabbed_priority_tasks),
+                        task_type_name,
                     )
                     statsd.increment(
                         "swh_scheduler_runner_scheduled_task_total",
-                        len(grabbed_tasks),
+                        len(grabbed_priority_tasks),
                         tags={"task_type": task_type_name},
                     )
-            # grab max_queue_length (or 10) potential tasks with any priority for the
-            # same type (limit the result to avoid too long running queries)
-            grabbed_priority_tasks = backend.grab_ready_priority_tasks(
-                task_type_name, num_tasks=max_queue_length or 10
-            )
-            if grabbed_priority_tasks:
-                pending_tasks.extend(grabbed_priority_tasks)
-                logger.info(
-                    "Grabbed %s tasks %s (priority)",
-                    len(grabbed_priority_tasks),
-                    task_type_name,
-                )
-                statsd.increment(
-                    "swh_scheduler_runner_scheduled_task_total",
-                    len(grabbed_priority_tasks),
-                    tags={"task_type": task_type_name},
-                )
+            else:
+                if max_queue_length:
+                    try:
+                        queue_length = app.get_queue_length(backend_name)
+                    except ValueError:
+                        queue_length = None
+
+                    if queue_length is None:
+                        # Running without RabbitMQ (probably a test env).
+                        num_tasks = MAX_NUM_TASKS
+                    else:
+                        num_tasks = min(
+                            max_queue_length - queue_length, MAX_NUM_TASKS
+                        )
+                else:
+                    num_tasks = MAX_NUM_TASKS
+
+                # only pull tasks if the buffer is at least 1/5th empty (= 80%
+                # full), to help postgresql use properly indexed queries.
+                if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
+                    # Only grab num_tasks tasks with no priority
+                    grabbed_tasks = backend.grab_ready_tasks(
+                        task_type_name, num_tasks=num_tasks
+                    )
+                    if grabbed_tasks:
+                        pending_tasks.extend(grabbed_tasks)
+                        logger.info(
+                            "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name
+                        )
+                        statsd.increment(
+                            "swh_scheduler_runner_scheduled_task_total",
+                            len(grabbed_tasks),
+                            tags={"task_type": task_type_name},
+                        )
 
         if not pending_tasks:
             return all_backend_tasks
@@ -117,7 +135,7 @@
             args = task["arguments"]["args"]
             kwargs = task["arguments"]["kwargs"]
 
-            backend_name = task_types[task["type"]]["backend_name"]
+            backend_name = task_types_d[task["type"]]["backend_name"]
             backend_id = uuid()
             celery_tasks.append(
                 (
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2019 The Software Heritage developers
+# Copyright (C) 2016-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -23,8 +23,26 @@
         "executed. Set to 0 (default) for a one shot."
     ),
 )
+@click.option(
+    "--task-types",
+    "task_type_names",
+    multiple=True,
+    help=(
+        "Task types to schedule. If not provided, this iterates over every "
+        "task types referenced in the scheduler backend."
+    ),
+)
+@click.option(
+    "--with-priority",
+    is_flag=True,
+    default=False,
+    help=(
+        "Determine if those tasks should be the ones with priority or not. "
+        "By default, this deals with tasks without any priority."
+    ),
+)
 @click.pass_context
-def runner(ctx, period):
+def runner(ctx, period, task_type_names, with_priority):
     """Starts a swh-scheduler runner service.
 
     This process is responsible for checking for ready-to-run tasks and
@@ -32,17 +50,26 @@
     from swh.scheduler.celery_backend.config import build_app
     from swh.scheduler.celery_backend.runner import run_ready_tasks
 
-    app = build_app(ctx.obj["config"].get("celery"))
+    config = ctx.obj["config"]
+    app = build_app(config.get("celery"))
     app.set_current()
 
     logger = logging.getLogger(__name__ + ".runner")
     scheduler = ctx.obj["scheduler"]
     logger.debug("Scheduler %s" % scheduler)
+    task_types = []
+    if task_type_names:
+        for task_type_name in task_type_names:
+            task_type = scheduler.get_task_type(task_type_name)
+            if not task_type:
+                raise ValueError(f"Unknown {task_type_name}")
+            task_types.append(task_type)
+
     try:
         while True:
             logger.debug("Run ready tasks")
             try:
-                ntasks = len(run_ready_tasks(scheduler, app))
+                ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority))
                 if ntasks:
                     logger.info("Scheduled %s tasks", ntasks)
             except Exception:
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -179,7 +179,9 @@
         assert task["priority"] is not None
         task_ids.add(task["id"])
 
-    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
+    backend_tasks = run_ready_tasks(
+        swh_scheduler, swh_scheduler_celery_app, task_types=[], with_priority=True
+    )
     assert len(backend_tasks) == len(tasks)
 
     scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name)