diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
index e1fe84d..52ce42a 100644
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -1,154 +1,169 @@
 # Copyright (C) 2015-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 from typing import Dict, List, Tuple

 from kombu.utils.uuid import uuid

 from swh.core.statsd import statsd
 from swh.scheduler import get_scheduler
 from swh.scheduler.celery_backend.config import get_available_slots
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.utils import utcnow

 logger = logging.getLogger(__name__)

 # Max batch size for tasks
 MAX_NUM_TASKS = 10000


-def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]:
+def run_ready_tasks(
+    backend: SchedulerInterface,
+    app,
+    task_types: List[Dict] = [],
+    with_priority: bool = False,
+) -> List[Dict]:
     """Schedule tasks ready to be scheduled.

     This looks up tasks per task type and mass-schedules them accordingly
     (sends messages to rabbitmq and marks the equivalent tasks as scheduled in
     the scheduler backend).

     If tasks (per task type) with priority exist, they will get redirected to a
     dedicated high-priority queue (the standard queue name prefixed with
     `save_code_now:`).

     Args:
         backend: scheduler backend to interact with (read/update tasks)
         app (App): Celery application to send tasks to
+        task_types: The list of task type dicts to iterate over. Empty by
+            default; when empty, the full list of task types referenced in the
+            scheduler is used.
+        with_priority: If True, only tasks with a priority set will be fetched
+            and scheduled. False by default.

     Returns:
         A list of dictionaries::

             {
                 'task': the scheduler's task id,
                 'backend_id': Celery's task id,
                 'scheduled': utcnow()
             }

         The result can be used to block-wait for the tasks' results::

             backend_tasks = run_ready_tasks(self.scheduler, app)
             for task in backend_tasks:
                 AsyncResult(id=task['backend_id']).get()

     """
     all_backend_tasks: List[Dict] = []
     while True:
-        task_types = {}
+        if not task_types:
+            task_types = backend.get_task_types()
+        task_types_d = {}
         pending_tasks = []
-        for task_type in backend.get_task_types():
+        for task_type in task_types:
             task_type_name = task_type["type"]
-            task_types[task_type_name] = task_type
+            task_types_d[task_type_name] = task_type
             max_queue_length = task_type["max_queue_length"]
             if max_queue_length is None:
                 max_queue_length = 0
             backend_name = task_type["backend_name"]
-            num_tasks = get_available_slots(app, backend_name, max_queue_length)
-            # only pull tasks if the buffer is at least 1/5th empty (= 80%
-            # full), to help postgresql use properly indexed queries.
-            if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
-                # Only grab num_tasks tasks with no priority
-                grabbed_tasks = backend.grab_ready_tasks(
-                    task_type_name, num_tasks=num_tasks
+
+            if with_priority:
+                # grab max_queue_length (or 10) potential tasks with any priority for
+                # the same type (limit the result to avoid too long running queries)
+                grabbed_priority_tasks = backend.grab_ready_priority_tasks(
+                    task_type_name, num_tasks=max_queue_length or 10
                 )
-                if grabbed_tasks:
-                    pending_tasks.extend(grabbed_tasks)
+                if grabbed_priority_tasks:
+                    pending_tasks.extend(grabbed_priority_tasks)
                     logger.info(
-                        "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name
+                        "Grabbed %s tasks %s (priority)",
+                        len(grabbed_priority_tasks),
+                        task_type_name,
                     )
                     statsd.increment(
                         "swh_scheduler_runner_scheduled_task_total",
-                        len(grabbed_tasks),
+                        len(grabbed_priority_tasks),
                         tags={"task_type": task_type_name},
                     )
-            # grab max_queue_length (or 10) potential tasks with any priority for the
-            # same type (limit the result to avoid too long running queries)
-            grabbed_priority_tasks = backend.grab_ready_priority_tasks(
-                task_type_name, num_tasks=max_queue_length or 10
-            )
-            if grabbed_priority_tasks:
-                pending_tasks.extend(grabbed_priority_tasks)
-                logger.info(
-                    "Grabbed %s tasks %s (priority)",
-                    len(grabbed_priority_tasks),
-                    task_type_name,
-                )
-                statsd.increment(
-                    "swh_scheduler_runner_scheduled_task_total",
-                    len(grabbed_priority_tasks),
-                    tags={"task_type": task_type_name},
-                )
+            else:
+                num_tasks = get_available_slots(app, backend_name, max_queue_length)
+                # only pull tasks if the buffer is at least 1/5th empty (= 80%
+                # full), to help postgresql use properly indexed queries.
+                if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
+                    # Only grab num_tasks tasks with no priority
+                    grabbed_tasks = backend.grab_ready_tasks(
+                        task_type_name, num_tasks=num_tasks
+                    )
+                    if grabbed_tasks:
+                        pending_tasks.extend(grabbed_tasks)
+                        logger.info(
+                            "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name
+                        )
+                        statsd.increment(
+                            "swh_scheduler_runner_scheduled_task_total",
+                            len(grabbed_tasks),
+                            tags={"task_type": task_type_name},
+                        )

         if not pending_tasks:
             return all_backend_tasks

         backend_tasks = []
         celery_tasks: List[Tuple[bool, str, str, List, Dict]] = []
         for task in pending_tasks:
             args = task["arguments"]["args"]
             kwargs = task["arguments"]["kwargs"]

-            backend_name = task_types[task["type"]]["backend_name"]
+            backend_name = task_types_d[task["type"]]["backend_name"]
             backend_id = uuid()
             celery_tasks.append(
                 (
                     task.get("priority") is not None,
                     backend_name,
                     backend_id,
                     args,
                     kwargs,
                 )
             )
             data = {
                 "task": task["id"],
                 "backend_id": backend_id,
                 "scheduled": utcnow(),
             }
             backend_tasks.append(data)
         logger.debug("Sent %s celery tasks", len(backend_tasks))

         backend.mass_schedule_task_runs(backend_tasks)

         for with_priority, backend_name, backend_id, args, kwargs in celery_tasks:
             kw = dict(task_id=backend_id, args=args, kwargs=kwargs,)
             if with_priority:
                 kw["queue"] = f"save_code_now:{backend_name}"
             app.send_task(backend_name, **kw)

         all_backend_tasks.extend(backend_tasks)


 def main():
     from .config import app as main_app

     for module in main_app.conf.CELERY_IMPORTS:
         __import__(module)

     main_backend = get_scheduler("local")
     try:
         run_ready_tasks(main_backend, main_app)
     except Exception:
         main_backend.rollback()
         raise


 if __name__ == "__main__":
     main()
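A usage note, not part of the patch: a minimal sketch of driving the reworked entry point from Python, assuming a configured local scheduler backend and a reachable broker. The `load-git` task type name is illustrative, not something this diff introduces.

```python
# Hedged sketch: restrict run_ready_tasks to one task type and to priority
# tasks only, then block-wait on the resulting Celery tasks as the docstring
# describes. "load-git" is an example task type name (assumption).
from celery.result import AsyncResult

from swh.scheduler import get_scheduler
from swh.scheduler.celery_backend.config import app
from swh.scheduler.celery_backend.runner import run_ready_tasks

scheduler = get_scheduler("local")
task_types = [scheduler.get_task_type("load-git")]  # a subset instead of all types

# Only grab and schedule tasks that carry a priority; the runner routes them
# to the "save_code_now:"-prefixed queue.
backend_tasks = run_ready_tasks(scheduler, app, task_types, with_priority=True)
for task in backend_tasks:
    AsyncResult(id=task["backend_id"]).get()  # block-wait, as in the docstring
```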
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
index 2ce0873..c8bb619 100644
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -1,110 +1,134 @@
-# Copyright (C) 2016-2019 The Software Heritage developers
+# Copyright (C) 2016-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control

 import logging
 import time

 import click

 from . import cli


 @cli.command("start-runner")
 @click.option(
     "--period",
     "-p",
     default=0,
     help=(
         "Period (in s) at which pending tasks are checked and "
         "executed. Set to 0 (default) for a one shot."
     ),
 )
+@click.option(
+    "--task-type",
+    "task_type_names",
+    multiple=True,
+    default=[],
+    help="Task type names (e.g. load-git, load-hg, list-github-full, ...) to schedule.",
+)
+@click.option(
+    "--with-priority/--without-priority",
+    is_flag=True,
+    default=False,
+    help=(
+        "Determine whether to schedule only tasks with priority or not. "
+        "By default, this deals with tasks without any priority."
+    ),
+)
 @click.pass_context
-def runner(ctx, period):
+def runner(ctx, period, task_type_names, with_priority):
     """Starts a swh-scheduler runner service.

     This process is responsible for checking for ready-to-run tasks and
     scheduling them."""
     from swh.scheduler.celery_backend.config import build_app
     from swh.scheduler.celery_backend.runner import run_ready_tasks

-    app = build_app(ctx.obj["config"].get("celery"))
+    config = ctx.obj["config"]
+    app = build_app(config.get("celery"))
     app.set_current()

     logger = logging.getLogger(__name__ + ".runner")
     scheduler = ctx.obj["scheduler"]
     logger.debug("Scheduler %s" % scheduler)

+    task_types = []
+    for task_type_name in task_type_names:
+        task_type = scheduler.get_task_type(task_type_name)
+        if not task_type:
+            raise ValueError(f"Unknown task type {task_type_name}")
+        task_types.append(task_type)
+
     try:
         while True:
             logger.debug("Run ready tasks")
             try:
-                ntasks = len(run_ready_tasks(scheduler, app))
+                ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority))
                 if ntasks:
                     logger.info("Scheduled %s tasks", ntasks)
             except Exception:
                 logger.exception("Unexpected error in run_ready_tasks()")
             if not period:
                 break
             time.sleep(period)
     except KeyboardInterrupt:
         ctx.exit(0)


 @cli.command("start-listener")
 @click.pass_context
 def listener(ctx):
     """Starts a swh-scheduler listener service.

     This service is responsible for listening for task lifecycle events and
     handling their workflow status in the database."""
     scheduler_backend = ctx.obj["scheduler"]
     if not scheduler_backend:
         raise ValueError("Scheduler class (local/remote) must be instantiated")

     broker = (
         ctx.obj["config"]
         .get("celery", {})
         .get("task_broker", "amqp://guest@localhost/%2f")
     )

     from swh.scheduler.celery_backend.pika_listener import get_listener

     listener = get_listener(broker, "celeryev.listener", scheduler_backend)
     try:
         listener.start_consuming()
     finally:
         listener.stop_consuming()


 @cli.command("rpc-serve")
 @click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api")
 @click.option("--port", default=5008, type=click.INT, help="Binding port of the server")
 @click.option(
     "--debug/--nodebug",
     default=None,
     help=(
         "Indicates if the server should run in debug mode. "
         "Defaults to True if log-level is DEBUG, False otherwise."
     ),
 )
 @click.pass_context
 def rpc_server(ctx, host, port, debug):
     """Starts a swh-scheduler API HTTP server."""
     if ctx.obj["config"]["scheduler"]["cls"] == "remote":
         click.echo(
             "The API server can only be started with a 'local' " "configuration",
             err=True,
         )
         ctx.exit(1)

     from swh.scheduler.api import server

     server.app.config.update(ctx.obj["config"])
     if debug is None:
         debug = ctx.obj["log_level"] <= logging.DEBUG
     server.app.run(host, port=port, debug=bool(debug))
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
index 1458e62..102204c 100644
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -1,281 +1,283 @@
 # Copyright (C) 2019-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Module in charge of testing the scheduler runner module"""

 from itertools import count
 from time import sleep

 from celery.result import AsyncResult, GroupResult
 from kombu import Exchange, Queue
 import pytest

 from swh.scheduler.celery_backend.runner import run_ready_tasks
 from swh.scheduler.tests.tasks import (
     TASK_ADD,
     TASK_ECHO,
     TASK_ERROR,
     TASK_MULTIPING,
     TASK_PING,
 )
 from swh.scheduler.utils import create_task_dict

 # Queues to subscribe to. Due to the rerouting of high-priority tasks, this
 # module needs to declare all queues/task names
 TEST_QUEUES = [
     "celery",
     TASK_ECHO,
     TASK_ERROR,
     TASK_PING,
     TASK_ADD,
     TASK_MULTIPING,
     # and the high priority queue
     f"save_code_now:{TASK_ADD}",
 ]


 @pytest.fixture(scope="session")
 def swh_scheduler_celery_app(swh_scheduler_celery_app):
     swh_scheduler_celery_app.add_defaults(
         {
             "task_queues": [
                 Queue(queue, Exchange(queue), routing_key=queue)
                 for queue in TEST_QUEUES
             ],
         }
     )
     return swh_scheduler_celery_app


 def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
     res = swh_scheduler_celery_app.send_task(TASK_PING)
     assert res
     res.wait()
     assert res.successful()
     assert res.result == "OK"


 def test_ping_with_kw(swh_scheduler_celery_app, swh_scheduler_celery_worker):
     res = swh_scheduler_celery_app.send_task(TASK_PING, kwargs={"a": 1})
     assert res
     res.wait()
     assert res.successful()
     assert res.result == "OK (kw={'a': 1})"


 def test_multiping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
     "Test that a task that spawns subtasks (group) works"
     res = swh_scheduler_celery_app.send_task(TASK_MULTIPING, kwargs={"n": 5})
     assert res

     res.wait()
     assert res.successful()

     # retrieve the GroupResult for this task and wait for all the subtasks
     # to complete
     promise_id = res.result
     assert promise_id
     promise = GroupResult.restore(promise_id, app=swh_scheduler_celery_app)
     for i in range(5):
         if promise.ready():
             break
         sleep(1)

     results = [x.get() for x in promise.results]
     assert len(results) == 5
     for i in range(5):
         assert ("OK (kw={'i': %s})" % i) in results


 def test_scheduler_fixture(
     swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
 ):
     "Test that the scheduler fixture works properly"
     task_type = swh_scheduler.get_task_type("swh-test-ping")

     assert task_type
     assert task_type["backend_name"] == TASK_PING

     swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")])

     backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
     assert backend_tasks
     for task in backend_tasks:
         # Make sure the task completed
         AsyncResult(id=task["backend_id"]).get()


 def test_run_ready_task_standard(
     swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
 ):
     """Ensure the scheduler runner schedules tasks ready for scheduling"""
     task_type_name, backend_name = "swh-test-add", TASK_ADD
     task_type = swh_scheduler.get_task_type(task_type_name)
     assert task_type
     assert task_type["backend_name"] == backend_name

     task_inputs = [
         ("oneshot", (12, 30)),
         ("oneshot", (20, 10)),
         ("recurring", (30, 10)),
     ]

     tasks = swh_scheduler.create_tasks(
         create_task_dict(task_type_name, policy, *args)
         for (policy, args) in task_inputs
     )

     assert len(tasks) == len(task_inputs)

     task_ids = set()
     for task in tasks:
         assert task["status"] == "next_run_not_scheduled"
         assert task["priority"] is None
         task_ids.add(task["id"])

     backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
     assert len(backend_tasks) == len(tasks)

     scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name)
     assert len(scheduled_tasks) == len(tasks)
     for task in scheduled_tasks:
         assert task["status"] == "next_run_scheduled"
         assert task["id"] in task_ids

     # Ensure each task is indeed scheduled to the queue backend
     for i, (_, args) in enumerate(task_inputs):
         task = backend_tasks[i]
         value = AsyncResult(id=task["backend_id"]).get()
         assert value == sum(args)


 def test_run_ready_task_with_priority(
     swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
 ):
     """Ensure the scheduler runner schedules priority tasks ready for scheduling"""
     task_type_name, backend_name = "swh-test-add", TASK_ADD
     task_type = swh_scheduler.get_task_type(task_type_name)
     assert task_type
     assert task_type["backend_name"] == backend_name

     task_inputs = [
         ("oneshot", (10, 22), "low"),
         ("oneshot", (20, 10), "normal"),
         ("recurring", (30, 10), "high"),
     ]

     tasks = swh_scheduler.create_tasks(
         create_task_dict(task_type_name, policy, *args, priority=priority)
         for (policy, args, priority) in task_inputs
     )

     assert len(tasks) == len(task_inputs)

     task_ids = set()
     for task in tasks:
         assert task["status"] == "next_run_not_scheduled"
         assert task["priority"] is not None
         task_ids.add(task["id"])

-    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
+    backend_tasks = run_ready_tasks(
+        swh_scheduler, swh_scheduler_celery_app, task_types=[], with_priority=True
+    )

     assert len(backend_tasks) == len(tasks)

     scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name)
     assert len(scheduled_tasks) == len(tasks)
     for task in scheduled_tasks:
         assert task["status"] == "next_run_scheduled"
         assert task["id"] in task_ids

     # Ensure each priority task is indeed scheduled to the queue backend
     for i, (_, args, _) in enumerate(task_inputs):
         task = backend_tasks[i]
         value = AsyncResult(id=task["backend_id"]).get()
         assert value == sum(args)


 def test_task_exception(
     swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler
 ):
     task_type = swh_scheduler.get_task_type("swh-test-error")
     assert task_type
     assert task_type["backend_name"] == TASK_ERROR

     swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")])

     backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
     assert len(backend_tasks) == 1

     task = backend_tasks[0]
     result = AsyncResult(id=task["backend_id"])
     with pytest.raises(NotImplementedError):
         result.get()


 def test_statsd(swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker):
     m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
     mocker.patch("swh.scheduler.task.ts", side_effect=count())
     mocker.patch("swh.core.statsd.monotonic", side_effect=count())
     res = swh_scheduler_celery_app.send_task(TASK_ECHO)
     assert res
     res.wait()
     assert res.successful()
     assert res.result == {}

     m.assert_any_call(
         "swh_task_called_count:1|c|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_start_ts:0|g|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_end_ts:1|g|"
         "#status:uneventful,task:swh.scheduler.tests.tasks.echo,"
         "worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_duration_seconds:1000|ms|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_success_count:1|c|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )


 def test_statsd_with_status(
     swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker
 ):
     m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
     mocker.patch("swh.scheduler.task.ts", side_effect=count())
     mocker.patch("swh.core.statsd.monotonic", side_effect=count())
     res = swh_scheduler_celery_app.send_task(TASK_ECHO, kwargs={"status": "eventful"})
     assert res
     res.wait()
     assert res.successful()
     assert res.result == {"status": "eventful"}

     m.assert_any_call(
         "swh_task_called_count:1|c|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_start_ts:0|g|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_end_ts:1|g|"
         "#status:eventful,task:swh.scheduler.tests.tasks.echo,"
         "worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_duration_seconds:1000|ms|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_success_count:1|c|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
index ae14d88..497780c 100644
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -1,795 +1,858 @@
-# Copyright (C) 2019-2020 The Software Heritage developers
+# Copyright (C) 2019-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 from itertools import islice
 import logging
+import random
 import re
 import tempfile
 from unittest.mock import patch

 from click.testing import CliRunner
 import pytest

 from swh.core.api.classes import stream_results
 from swh.model.model import Origin
 from swh.scheduler.cli import cli
 from swh.scheduler.utils import create_task_dict, utcnow
 from swh.storage import get_storage

 CLI_CONFIG = """
 scheduler:
     cls: foo
     args: {}
 """


 def invoke(scheduler, catch_exceptions, args):
     runner = CliRunner()
     with patch(
         "swh.scheduler.get_scheduler"
     ) as get_scheduler_mock, tempfile.NamedTemporaryFile(
         "a", suffix=".yml"
     ) as config_fd:
         config_fd.write(CLI_CONFIG)
         config_fd.seek(0)
         get_scheduler_mock.return_value = scheduler
         args = ["-C" + config_fd.name,] + args
         result = runner.invoke(cli, args, obj={"log_level": logging.WARNING})
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
     return result


 def test_schedule_tasks(swh_scheduler):
     csv_data = (
         b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};'
         + utcnow().isoformat().encode()
         + b"\n"
         + b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};'
         + utcnow().isoformat().encode()
         + b"\n"
     )
     with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
         csv_fd.write(csv_data)
         csv_fd.seek(0)
         result = invoke(
             swh_scheduler, False, ["task", "schedule", "-d", ";", csv_fd.name]
         )
     expected = r"""
 Created 2 tasks

 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: recurring
   Args:
     \['arg1', 'arg2'\]
   Keyword args:
     key: 'value'

 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: recurring
   Args:
     \['arg3', 'arg4'\]
   Keyword args:
     key: 'value'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_schedule_tasks_columns(swh_scheduler):
     with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
         csv_fd.write(b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n')
         csv_fd.seek(0)
         result = invoke(
             swh_scheduler,
             False,
             [
                 "task",
                 "schedule",
                 "-c",
                 "type",
                 "-c",
                 "policy",
                 "-c",
                 "args",
                 "-c",
                 "kwargs",
                 "-d",
                 ";",
                 csv_fd.name,
             ],
         )
     expected = r"""
 Created 1 tasks

 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
     'arg1'
     'arg2'
   Keyword args:
     key: 'value'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_schedule_task(swh_scheduler):
     result = invoke(
         swh_scheduler,
         False,
         ["task", "add", "swh-test-ping", "arg1", "arg2", "key=value",],
     )
     expected = r"""
 Created 1 tasks

 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: recurring
   Args:
     'arg1'
     'arg2'
   Keyword args:
     key: 'value'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_list_pending_tasks_none(swh_scheduler):
     result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])

     expected = r"""
 Found 0 swh-test-ping tasks

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_list_pending_tasks(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task2["next_run"] += datetime.timedelta(days=1)
     swh_scheduler.create_tasks([task1, task2])

     result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])

     expected = r"""
 Found 1 swh-test-ping tasks

 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value1'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     swh_scheduler.grab_ready_tasks("swh-test-ping")

     result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])

     expected = r"""
 Found 0 swh-test-ping tasks

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_list_pending_tasks_filter(swh_scheduler):
     task = create_task_dict("swh-test-multiping", "oneshot", key="value")
     swh_scheduler.create_tasks([task])

     result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])

     expected = r"""
 Found 0 swh-test-ping tasks

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_list_pending_tasks_filter_2(swh_scheduler):
     swh_scheduler.create_tasks(
         [
             create_task_dict("swh-test-multiping", "oneshot", key="value"),
             create_task_dict("swh-test-ping", "oneshot", key="value2"),
         ]
     )

     result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])

     expected = r"""
 Found 1 swh-test-ping tasks

 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value2'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 # Fails because "task list-pending --limit 3" only returns 2 tasks, because
 # of how compute_nb_tasks_from works.
 @pytest.mark.xfail
 def test_list_pending_tasks_limit(swh_scheduler):
     swh_scheduler.create_tasks(
         [
             create_task_dict("swh-test-ping", "oneshot", key="value%d" % i)
             for i in range(10)
         ]
     )

     result = invoke(
         swh_scheduler, False, ["task", "list-pending", "swh-test-ping", "--limit", "3",]
     )

     expected = r"""
 Found 2 swh-test-ping tasks

 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value0'

 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value1'

 Task 3
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value2'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_list_pending_tasks_before(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task1["next_run"] += datetime.timedelta(days=3)
     task2["next_run"] += datetime.timedelta(days=1)
     swh_scheduler.create_tasks([task1, task2])

     result = invoke(
         swh_scheduler,
         False,
         [
             "task",
             "list-pending",
             "swh-test-ping",
             "--before",
             (datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
         ],
     )

     expected = r"""
 Found 1 swh-test-ping tasks

 Task 2
   Next run: tomorrow \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Args:
   Keyword args:
     key: 'value2'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_list_tasks(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task1["next_run"] += datetime.timedelta(days=3, hours=2)
     swh_scheduler.create_tasks([task1, task2])

     swh_scheduler.grab_ready_tasks("swh-test-ping")

     result = invoke(swh_scheduler, False, ["task", "list",])

     expected = r"""
 Found 2 tasks

 Task 1
   Next run: .+ \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value1'

 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value2'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_list_tasks_id(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
     swh_scheduler.create_tasks([task1, task2, task3])

     result = invoke(swh_scheduler, False, ["task", "list", "--task-id", "2",])

     expected = r"""
 Found 1 tasks

 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value2'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 def test_list_tasks_id_2(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
     swh_scheduler.create_tasks([task1, task2, task3])

     result = invoke(
         swh_scheduler, False, ["task", "list", "--task-id", "2", "--task-id", "3"]
     )

     expected = r"""
 Found 2 tasks

 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value2'

 Task 3
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value3'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_list_tasks_type(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-multiping", "oneshot", key="value2")
     task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
     swh_scheduler.create_tasks([task1, task2, task3])

     result = invoke(
         swh_scheduler, False, ["task", "list", "--task-type", "swh-test-ping"]
     )

     expected = r"""
 Found 2 tasks

 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value1'

 Task 3
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value3'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_list_tasks_limit(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
     swh_scheduler.create_tasks([task1, task2, task3])

     result = invoke(swh_scheduler, False, ["task", "list", "--limit", "2",])

     expected = r"""
 Found 2 tasks

 Task 1
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value1'

 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value2'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_list_tasks_before(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task1["next_run"] += datetime.timedelta(days=3, hours=2)
     swh_scheduler.create_tasks([task1, task2])

     swh_scheduler.grab_ready_tasks("swh-test-ping")

     result = invoke(
         swh_scheduler,
         False,
         [
             "task",
             "list",
             "--before",
             (datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
         ],
     )

     expected = r"""
 Found 1 tasks

 Task 2
   Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value2'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def test_list_tasks_after(swh_scheduler):
     task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
     task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
     task1["next_run"] += datetime.timedelta(days=3, hours=2)
     swh_scheduler.create_tasks([task1, task2])

     swh_scheduler.grab_ready_tasks("swh-test-ping")

     result = invoke(
         swh_scheduler,
         False,
         [
             "task",
             "list",
             "--after",
             (datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
         ],
     )

     expected = r"""
 Found 1 tasks

 Task 1
   Next run: .+ \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
   Status: next_run_not_scheduled
   Priority:\x20
   Args:
   Keyword args:
     key: 'value1'

 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output


 def _fill_storage_with_origins(storage, nb_origins):
     origins = [Origin(url=f"http://example.com/{i}") for i in range(nb_origins)]
     storage.origin_add(origins)
     return origins


 @pytest.fixture
 def storage():
     """An instance of in-memory storage that gets injected
     into the CLI functions."""
     storage = get_storage(cls="memory")
     with patch("swh.storage.get_storage") as get_storage_mock:
         get_storage_mock.return_value = storage
         yield storage


 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 def test_task_schedule_origins_dry_run(swh_scheduler, storage):
     """Tests the scheduling when origin_batch_size*task_batch_size is a
     divisor of nb_origins."""
     _fill_storage_with_origins(storage, 90)

     result = invoke(
         swh_scheduler,
         False,
         ["task", "schedule_origins", "--dry-run", "swh-test-ping",],
     )

     # Check the output
     expected = r"""
 Scheduled 3 tasks \(30 origins\).
 Scheduled 6 tasks \(60 origins\).
 Scheduled 9 tasks \(90 origins\).
 Done.
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)

     # Check scheduled tasks
     tasks = swh_scheduler.search_tasks()
     assert len(tasks) == 0


 def _assert_origin_tasks_contraints(tasks, max_tasks, max_task_size, expected_origins):
     # check there are not too many tasks
     assert len(tasks) <= max_tasks

     # check tasks are not too large
     assert all(len(task["arguments"]["args"][0]) <= max_task_size for task in tasks)

     # check the tasks are exhaustive
     assert sum([len(task["arguments"]["args"][0]) for task in tasks]) == len(
         expected_origins
     )
     assert set.union(*(set(task["arguments"]["args"][0]) for task in tasks)) == {
         origin.url for origin in expected_origins
     }


 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 def test_task_schedule_origins(swh_scheduler, storage):
     """Tests the scheduling when neither origin_batch_size nor
     task_batch_size is a divisor of nb_origins."""
     origins = _fill_storage_with_origins(storage, 70)

     result = invoke(
         swh_scheduler,
         False,
         ["task", "schedule_origins", "swh-test-ping", "--batch-size", "20",],
     )

     # Check the output
     expected = r"""
 Scheduled 3 tasks \(60 origins\).
 Scheduled 4 tasks \(70 origins\).
 Done.
 """.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)

     # Check tasks
     tasks = swh_scheduler.search_tasks()
     _assert_origin_tasks_contraints(tasks, 4, 20, origins)
     assert all(task["arguments"]["kwargs"] == {} for task in tasks)
""".lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, 2, 20, origins) assert all( task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"} for task in tasks ) def test_task_schedule_origins_with_limit(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" _fill_storage_with_origins(storage, 50) limit = 20 expected_origins = list(islice(stream_results(storage.origin_list), limit)) nb_origins = len(expected_origins) assert nb_origins == limit max_task_size = 5 nb_tasks, remainder = divmod(nb_origins, max_task_size) assert remainder == 0 # made the numbers go round result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", max_task_size, "--limit", limit, ], ) # Check the output expected = rf""" Scheduled {nb_tasks} tasks \({nb_origins} origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) def test_task_schedule_origins_with_page_token(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" nb_total_origins = 50 origins = _fill_storage_with_origins(storage, nb_total_origins) # prepare page_token and origins result expectancy page_result = storage.origin_list(limit=10) assert len(page_result.results) == 10 page_token = page_result.next_page_token assert page_token is not None # remove the first 10 origins listed as we won't see those in tasks expected_origins = [o for o in origins if o not in page_result.results] nb_origins = len(expected_origins) assert nb_origins == nb_total_origins - len(page_result.results) max_task_size = 10 nb_tasks, remainder = divmod(nb_origins, max_task_size) assert remainder == 0 result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", max_task_size, "--page-token", page_token, ], ) # Check the output expected = rf""" Scheduled {nb_tasks} tasks \({nb_origins} origins\). Done. 
""".lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) + + +def test_cli_task_runner_unknown_task_types(swh_scheduler, storage): + """When passing at least one unknown task type, the runner should fail.""" + + task_types = swh_scheduler.get_task_types() + task_type_names = [t["type"] for t in task_types] + known_task_type = random.choice(task_type_names) + unknown_task_type = "unknown-task-type" + assert unknown_task_type not in task_type_names + + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + False, + [ + "start-runner", + "--task-type", + known_task_type, + "--task-type", + unknown_task_type, + ], + ) + + +@pytest.mark.parametrize("flag_priority", ["--with-priority", "--without-priority"]) +def test_cli_task_runner_with_known_tasks( + swh_scheduler, storage, caplog, flag_priority +): + """Trigger runner with known tasks runs smoothly.""" + + task_types = swh_scheduler.get_task_types() + task_type_names = [t["type"] for t in task_types] + task_type_name = random.choice(task_type_names) + task_type_name2 = random.choice(task_type_names) + + # The runner will just iterate over the following known tasks and do noop. We are + # just checking the runner does not explode here. + result = invoke( + swh_scheduler, + False, + [ + "start-runner", + flag_priority, + "--task-type", + task_type_name, + "--task-type", + task_type_name2, + ], + ) + + assert result.exit_code == 0, result.output + + +def test_cli_task_runner_no_task(swh_scheduler, storage): + """Trigger runner with no parameter should run as before.""" + + # The runner will just iterate over the existing tasks from the scheduler and do + # noop. We are just checking the runner does not explode here. + result = invoke(swh_scheduler, False, ["start-runner",],) + + assert result.exit_code == 0, result.output