diff --git a/swh/scheduler/cli/task_type.py b/swh/scheduler/cli/task_type.py index c6368eb..1557007 100644 --- a/swh/scheduler/cli/task_type.py +++ b/swh/scheduler/cli/task_type.py @@ -1,232 +1,231 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from importlib import import_module import logging from typing import Mapping # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from pkg_resources import iter_entry_points from . import cli logger = logging.getLogger(__name__) DEFAULT_TASK_TYPE = { "full": { # for tasks like 'list_xxx_full()' "default_interval": "90 days", "min_interval": "90 days", "max_interval": "90 days", "backoff_factor": 1, }, "*": { # value if not suffix matches "default_interval": "1 day", "min_interval": "1 day", "max_interval": "1 day", "backoff_factor": 1, }, } PLUGIN_WORKER_DESCRIPTIONS = { entry_point.name: entry_point for entry_point in iter_entry_points("swh.workers") } @cli.group("task-type") @click.pass_context def task_type(ctx): """Manipulate task types.""" - pass + scheduler = ctx.obj["scheduler"] + if not scheduler: + raise ValueError("Scheduler class (local/remote) must be instantiated") @task_type.command("list") @click.option("--verbose", "-v", is_flag=True, default=False, help="Verbose mode") @click.option( "--task_type", "-t", multiple=True, default=None, help="List task types of given type", ) @click.option( "--task_name", "-n", multiple=True, default=None, help="List task types of given backend task name", ) @click.pass_context def list_task_types(ctx, verbose, task_type, task_name): click.echo("Known task types:") if verbose: tmpl = ( click.style("{type}: ", bold=True) + """{backend_name} {description} interval: {default_interval} [{min_interval}, {max_interval}] backoff_factor: {backoff_factor} max_queue_length: {max_queue_length} num_retries: {num_retries} retry_delay: {retry_delay} """ ) else: tmpl = "{type}:\n {description}" for tasktype in sorted( ctx.obj["scheduler"].get_task_types(), key=lambda x: x["type"] ): if task_type and tasktype["type"] not in task_type: continue if task_name and tasktype["backend_name"] not in task_name: continue click.echo(tmpl.format(**tasktype)) @task_type.command("register") @click.option( "--plugins", "-p", "plugins", multiple=True, default=("all",), type=click.Choice(["all"] + list(PLUGIN_WORKER_DESCRIPTIONS)), help="Registers task-types for provided plugins. " "Defaults to all", ) @click.pass_context def register_task_types(ctx, plugins): """Register missing task-type entries in the scheduler. According to declared tasks in each loaded worker (e.g. lister, loader, ...) plugins. """ import celery.app.task scheduler = ctx.obj["scheduler"] if plugins == ("all",): plugins = list(PLUGIN_WORKER_DESCRIPTIONS) for plugin in plugins: entrypoint = PLUGIN_WORKER_DESCRIPTIONS[plugin] logger.info("Loading entrypoint for plugin %s", plugin) registry_entry = entrypoint.load()() for task_module in registry_entry["task_modules"]: mod = import_module(task_module) for task_name in (x for x in dir(mod) if not x.startswith("_")): logger.debug("Loading task name %s", task_name) taskobj = getattr(mod, task_name) if isinstance(taskobj, celery.app.task.Task): tt_name = task_name.replace("_", "-") task_cfg = registry_entry.get("task_types", {}).get(tt_name, {}) ensure_task_type(task_module, tt_name, taskobj, task_cfg, scheduler) def ensure_task_type( task_module: str, task_type: str, swhtask, task_config: Mapping, scheduler ): """Ensure a given task-type (for the task_module) exists in the scheduler. Args: task_module: task module we are currently checking for task type consistency task_type: the type of the task to check/insert (correspond to the 'type' field in the db) swhtask (SWHTask): the SWHTask instance the task-type correspond to task_config: a dict with specific/overloaded values for the task-type to be created scheduler: the scheduler object used to access the scheduler db """ for suffix, defaults in DEFAULT_TASK_TYPE.items(): if task_type.endswith("-" + suffix): task_type_dict = defaults.copy() break else: task_type_dict = DEFAULT_TASK_TYPE["*"].copy() task_type_dict["type"] = task_type task_type_dict["backend_name"] = swhtask.name if swhtask.__doc__: task_type_dict["description"] = swhtask.__doc__.splitlines()[0] task_type_dict.update(task_config) current_task_type = scheduler.get_task_type(task_type) if current_task_type: # Ensure the existing task_type is consistent in the scheduler if current_task_type["backend_name"] != task_type_dict["backend_name"]: logger.warning( "Existing task type %s for module %s has a " "different backend name than current " "code version provides (%s vs. %s)", task_type, task_module, current_task_type["backend_name"], task_type_dict["backend_name"], ) else: logger.info("Create task type %s in scheduler", task_type) logger.debug(" %s", task_type_dict) scheduler.create_task_type(task_type_dict) @task_type.command("add") @click.argument("type", required=True) @click.argument("task-name", required=True) @click.argument("description", required=True) @click.option( "--default-interval", "-i", default="90 days", help='Default interval ("90 days" by default)', ) @click.option( "--min-interval", default=None, help="Minimum interval (default interval if not set)", ) @click.option( "--max-interval", "-i", default=None, help="Maximal interval (default interval if not set)", ) @click.option("--backoff-factor", "-f", type=float, default=1, help="Backoff factor") @click.pass_context def add_task_type( ctx, type, task_name, description, default_interval, min_interval, max_interval, backoff_factor, ): """Create a new task type """ - scheduler = ctx.obj["scheduler"] - if not scheduler: - raise ValueError("Scheduler class (local/remote) must be instantiated") task_type = dict( type=type, backend_name=task_name, description=description, default_interval=default_interval, min_interval=min_interval, max_interval=max_interval, backoff_factor=backoff_factor, max_queue_length=None, num_retries=None, retry_delay=None, ) - scheduler.create_task_type(task_type) + ctx.obj["scheduler"].create_task_type(task_type) click.echo("OK") diff --git a/swh/scheduler/tests/test_cli_task_type.py b/swh/scheduler/tests/test_cli_task_type.py index 4b2f6cf..64917d2 100644 --- a/swh/scheduler/tests/test_cli_task_type.py +++ b/swh/scheduler/tests/test_cli_task_type.py @@ -1,127 +1,139 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import traceback from click.testing import CliRunner import pkg_resources import pytest import yaml from swh.scheduler import get_scheduler from swh.scheduler.cli import cli FAKE_MODULE_ENTRY_POINTS = { "lister.gnu=swh.lister.gnu:register", "lister.pypi=swh.lister.pypi:register", } +@pytest.fixture +def cli_runner(): + return CliRunner() + + @pytest.fixture def mock_pkg_resources(monkeypatch): """Monkey patch swh.scheduler's mock_pkg_resources.iter_entry_point call """ def fake_iter_entry_points(*args, **kwargs): """Substitute fake function to return a fixed set of entrypoints """ from pkg_resources import Distribution, EntryPoint d = Distribution() return [EntryPoint.parse(entry, dist=d) for entry in FAKE_MODULE_ENTRY_POINTS] original_method = pkg_resources.iter_entry_points monkeypatch.setattr(pkg_resources, "iter_entry_points", fake_iter_entry_points) yield # reset monkeypatch: is that needed? monkeypatch.setattr(pkg_resources, "iter_entry_points", original_method) @pytest.fixture def local_sched_config(swh_scheduler_config): """Expose the local scheduler configuration """ return {"scheduler": {"cls": "local", **swh_scheduler_config}} @pytest.fixture def local_sched_configfile(local_sched_config, tmp_path): """Write in temporary location the local scheduler configuration """ configfile = tmp_path / "config.yml" configfile.write_text(yaml.dump(local_sched_config)) return configfile.as_posix() def test_register_ttypes_all( - mock_pkg_resources, local_sched_config, local_sched_configfile + cli_runner, mock_pkg_resources, local_sched_config, local_sched_configfile ): """Registering all task types""" for command in [ ["--config-file", local_sched_configfile, "task-type", "register"], ["--config-file", local_sched_configfile, "task-type", "register", "-p", "all"], [ "--config-file", local_sched_configfile, "task-type", "register", "-p", "lister.gnu", "-p", "lister.pypi", ], ]: - result = CliRunner().invoke(cli, command) + result = cli_runner.invoke(cli, command) assert result.exit_code == 0, traceback.print_exception(*result.exc_info) scheduler = get_scheduler(**local_sched_config["scheduler"]) all_tasks = [ "list-gnu-full", "list-pypi", ] for task in all_tasks: task_type_desc = scheduler.get_task_type(task) assert task_type_desc assert task_type_desc["type"] == task assert task_type_desc["backoff_factor"] == 1 def test_register_ttypes_filter( - mock_pkg_resources, local_sched_config, local_sched_configfile + mock_pkg_resources, cli_runner, local_sched_config, local_sched_configfile ): """Filtering on one worker should only register its associated task type """ - result = CliRunner().invoke( + result = cli_runner.invoke( cli, [ "--config-file", local_sched_configfile, "task-type", "register", "--plugins", "lister.gnu", ], ) assert result.exit_code == 0, traceback.print_exception(*result.exc_info) scheduler = get_scheduler(**local_sched_config["scheduler"]) all_tasks = [ "list-gnu-full", ] for task in all_tasks: task_type_desc = scheduler.get_task_type(task) assert task_type_desc assert task_type_desc["type"] == task assert task_type_desc["backoff_factor"] == 1 + + +@pytest.mark.parametrize("cli_command", ["list", "register", "add"]) +def test_cli_task_type_raise(cli_runner, cli_command): + """Without a proper configuration, the cli raises""" + with pytest.raises(ValueError, match="Scheduler class"): + cli_runner.invoke(cli, ["task-type", cli_command], catch_exceptions=False)