diff --git a/swh/core/cli/__init__.py b/swh/core/cli/__init__.py index 580d07f..c2dee46 100644 --- a/swh/core/cli/__init__.py +++ b/swh/core/cli/__init__.py @@ -1,139 +1,189 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import logging.config +from typing import Optional import warnings import click import pkg_resources LOG_LEVEL_NAMES = ["NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) logger = logging.getLogger(__name__) class AliasedGroup(click.Group): """A simple Group that supports command aliases, as well as notes related to options""" def __init__(self, name=None, commands=None, **attrs): self.option_notes = attrs.pop("option_notes", None) self.aliases = {} super().__init__(name, commands, **attrs) def get_command(self, ctx, cmd_name): return super().get_command(ctx, self.aliases.get(cmd_name, cmd_name)) def add_alias(self, name, alias): if not isinstance(name, str): name = name.name self.aliases[alias] = name def format_options(self, ctx, formatter): click.Command.format_options(self, ctx, formatter) if self.option_notes: with formatter.section("Notes"): formatter.write_text(self.option_notes) self.format_commands(ctx, formatter) def clean_exit_on_signal(signal, frame): """Raise a SystemExit exception to let command-line clients wind themselves down on exit""" raise SystemExit(0) +def validate_loglevel_params(ctx, param, value): + """Validate the --log-level parameters, with multiple values""" + if value is None: + return None + return [validate_loglevel(ctx, param, v) for v in value] + + +def validate_loglevel(ctx, param, value): + """Validate a single loglevel specification, of the form LOGLEVEL or + module:LOGLEVEL.""" + if ":" in value: + try: + module, log_level = value.split(":") + except ValueError: + raise click.BadParameter( + "Invalid log level specification `%s`, " + "needs to be in format `module:LOGLEVEL`" % value + ) + else: + module = None + log_level = value + + if log_level not in LOG_LEVEL_NAMES: + raise click.BadParameter( + "Log level %s unknown (in `%s`) needs to be one of %s" + % (log_level, value, ", ".join(LOG_LEVEL_NAMES)) + ) + + return (module, log_level) + + @click.group( context_settings=CONTEXT_SETTINGS, cls=AliasedGroup, option_notes="""\ -If both options are present, --log-level will override the root logger -configuration set in --log-config. +If both options are present, --log-level values will override the configuration +in --log-config. The --log-config YAML must conform to the logging.config.dictConfig schema documented at https://docs.python.org/3/library/logging.config.html. """, ) @click.option( "--log-level", "-l", + "log_levels", default=None, - type=click.Choice(LOG_LEVEL_NAMES), - help="Log level (defaults to INFO).", + callback=validate_loglevel_params, + multiple=True, + help=( + "Log level (defaults to INFO). " + "Can override the log level for a specific module, by using the " + "`specific.module:LOGLEVEL` syntax (e.g. `--log-level swh.core:DEBUG` " + "will enable DEBUG logging for swh.core)." + ), ) @click.option( "--log-config", default=None, type=click.File("r"), help="Python yaml logging configuration file.", ) @click.option( "--sentry-dsn", default=None, help="DSN of the Sentry instance to report to" ) @click.option( "--sentry-debug/--no-sentry-debug", default=False, hidden=True, help="Enable debugging of sentry", ) @click.pass_context -def swh(ctx, log_level, log_config, sentry_dsn, sentry_debug): +def swh(ctx, log_levels, log_config, sentry_dsn, sentry_debug): """Command line interface for Software Heritage. """ import signal import yaml from ..sentry import init_sentry signal.signal(signal.SIGTERM, clean_exit_on_signal) signal.signal(signal.SIGINT, clean_exit_on_signal) init_sentry(sentry_dsn, debug=sentry_debug) - if log_level is None and log_config is None: - log_level = "INFO" + set_default_loglevel: Optional[str] = None if log_config: logging.config.dictConfig(yaml.safe_load(log_config.read())) + set_default_loglevel = logging.root.getEffectiveLevel() + + if not log_levels: + log_levels = [] - if log_level: + for module, log_level in log_levels: + logger = logging.getLogger(module) log_level = logging.getLevelName(log_level) - logging.root.setLevel(log_level) + logger.setLevel(log_level) + + if module is None: + set_default_loglevel = log_level + + if not set_default_loglevel: + logging.root.setLevel("INFO") + set_default_loglevel = "INFO" ctx.ensure_object(dict) - ctx.obj["log_level"] = log_level + ctx.obj["log_level"] = set_default_loglevel def main(): # Even though swh() sets up logging, we need an earlier basic logging setup # for the next few logging statements logging.basicConfig() # load plugins that define cli sub commands for entry_point in pkg_resources.iter_entry_points("swh.cli.subcommands"): try: cmd = entry_point.load() if isinstance(cmd, click.BaseCommand): # for BW compat, auto add click commands warnings.warn( f"{entry_point.name}: automagic addition of click commands " f"to the main swh group is deprecated", DeprecationWarning, ) swh.add_command(cmd, name=entry_point.name) # otherwise it's expected to be a module which has been loaded # it's the responsibility of the click commands/groups in this # module to transitively have the main swh group as parent. except Exception as e: logger.warning("Could not load subcommand %s: %s", entry_point.name, str(e)) return swh(auto_envvar_prefix="SWH") if __name__ == "__main__": main() diff --git a/swh/core/tests/test_cli.py b/swh/core/tests/test_cli.py index ca40fbf..0eb86e4 100644 --- a/swh/core/tests/test_cli.py +++ b/swh/core/tests/test_cli.py @@ -1,315 +1,345 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import textwrap from typing import List from unittest.mock import patch import click from click.testing import CliRunner import pkg_resources import pytest help_msg_snippets = ( ( "Usage", ( "swh [OPTIONS] COMMAND [ARGS]...", "Command line interface for Software Heritage.", ), ), ("Options", ("-l, --log-level", "--log-config", "--sentry-dsn", "-h, --help",)), ) def get_section(cli_output: str, section: str) -> List[str]: """Get the given `section` of the `cli_output`""" result = [] in_section = False for line in cli_output.splitlines(): if not line: continue if in_section: if not line.startswith(" "): break else: if line.startswith(section): in_section = True if in_section: result.append(line) return result def assert_section_contains(cli_output: str, section: str, snippet: str) -> bool: """Check that a given `section` of the `cli_output` contains the given `snippet`""" section_lines = get_section(cli_output, section) assert section_lines, "Section %s not found in output %r" % (section, cli_output) for line in section_lines: if snippet in line: return True else: assert False, "%r not found in section %r of output %r" % ( snippet, section, cli_output, ) def test_swh_help(swhmain): runner = CliRunner() result = runner.invoke(swhmain, ["-h"]) assert result.exit_code == 0 for section, snippets in help_msg_snippets: for snippet in snippets: assert_section_contains(result.output, section, snippet) result = runner.invoke(swhmain, ["--help"]) assert result.exit_code == 0 for section, snippets in help_msg_snippets: for snippet in snippets: assert_section_contains(result.output, section, snippet) def test_command(swhmain): @swhmain.command(name="test") @click.pass_context def swhtest(ctx): click.echo("Hello SWH!") runner = CliRunner() with patch("sentry_sdk.init") as sentry_sdk_init: result = runner.invoke(swhmain, ["test"]) sentry_sdk_init.assert_not_called() assert result.exit_code == 0 assert result.output.strip() == "Hello SWH!" def test_loglevel_default(caplog, swhmain): @swhmain.command(name="test") @click.pass_context def swhtest(ctx): assert logging.root.level == 20 click.echo("Hello SWH!") runner = CliRunner() result = runner.invoke(swhmain, ["test"]) assert result.exit_code == 0 assert result.output.strip() == """Hello SWH!""" def test_loglevel_error(caplog, swhmain): @swhmain.command(name="test") @click.pass_context def swhtest(ctx): assert logging.root.level == 40 click.echo("Hello SWH!") runner = CliRunner() result = runner.invoke(swhmain, ["-l", "ERROR", "test"]) assert result.exit_code == 0 assert result.output.strip() == """Hello SWH!""" def test_loglevel_debug(caplog, swhmain): @swhmain.command(name="test") @click.pass_context def swhtest(ctx): assert logging.root.level == 10 click.echo("Hello SWH!") runner = CliRunner() result = runner.invoke(swhmain, ["-l", "DEBUG", "test"]) assert result.exit_code == 0 assert result.output.strip() == """Hello SWH!""" def test_sentry(swhmain): @swhmain.command(name="test") @click.pass_context def swhtest(ctx): click.echo("Hello SWH!") runner = CliRunner() with patch("sentry_sdk.init") as sentry_sdk_init: result = runner.invoke(swhmain, ["--sentry-dsn", "test_dsn", "test"]) assert result.exit_code == 0 assert result.output.strip() == """Hello SWH!""" sentry_sdk_init.assert_called_once_with( dsn="test_dsn", debug=False, integrations=[], release=None, environment=None, ) def test_sentry_debug(swhmain): @swhmain.command(name="test") @click.pass_context def swhtest(ctx): click.echo("Hello SWH!") runner = CliRunner() with patch("sentry_sdk.init") as sentry_sdk_init: result = runner.invoke( swhmain, ["--sentry-dsn", "test_dsn", "--sentry-debug", "test"] ) assert result.exit_code == 0 assert result.output.strip() == """Hello SWH!""" sentry_sdk_init.assert_called_once_with( dsn="test_dsn", debug=True, integrations=[], release=None, environment=None, ) def test_sentry_env(swhmain): @swhmain.command(name="test") @click.pass_context def swhtest(ctx): click.echo("Hello SWH!") runner = CliRunner() with patch("sentry_sdk.init") as sentry_sdk_init: env = { "SWH_SENTRY_DSN": "test_dsn", "SWH_SENTRY_DEBUG": "1", } result = runner.invoke(swhmain, ["test"], env=env, auto_envvar_prefix="SWH") assert result.exit_code == 0 assert result.output.strip() == """Hello SWH!""" sentry_sdk_init.assert_called_once_with( dsn="test_dsn", debug=True, integrations=[], release=None, environment=None, ) def test_sentry_env_main_package(swhmain): @swhmain.command(name="test") @click.pass_context def swhtest(ctx): click.echo("Hello SWH!") runner = CliRunner() with patch("sentry_sdk.init") as sentry_sdk_init: env = { "SWH_SENTRY_DSN": "test_dsn", "SWH_MAIN_PACKAGE": "swh.core", "SWH_SENTRY_ENVIRONMENT": "tests", } result = runner.invoke(swhmain, ["test"], env=env, auto_envvar_prefix="SWH") assert result.exit_code == 0 version = pkg_resources.get_distribution("swh.core").version assert result.output.strip() == """Hello SWH!""" sentry_sdk_init.assert_called_once_with( dsn="test_dsn", debug=False, integrations=[], release="swh.core@" + version, environment="tests", ) @pytest.fixture def log_config_path(tmp_path): log_config = textwrap.dedent( """\ --- version: 1 formatters: formatter: format: 'custom format:%(name)s:%(levelname)s:%(message)s' handlers: console: class: logging.StreamHandler stream: ext://sys.stdout formatter: formatter level: DEBUG root: level: DEBUG handlers: - console loggers: dontshowdebug: level: INFO """ ) (tmp_path / "log_config.yml").write_text(log_config) yield str(tmp_path / "log_config.yml") -def test_log_config(caplog, log_config_path, swhmain): +def test_log_config(log_config_path, swhmain): @swhmain.command(name="test") @click.pass_context def swhtest(ctx): logging.debug("Root log debug") logging.info("Root log info") logging.getLogger("dontshowdebug").debug("Not shown") logging.getLogger("dontshowdebug").info("Shown") runner = CliRunner() result = runner.invoke(swhmain, ["--log-config", log_config_path, "test",],) assert result.exit_code == 0 assert result.output.strip() == "\n".join( [ "custom format:root:DEBUG:Root log debug", "custom format:root:INFO:Root log info", "custom format:dontshowdebug:INFO:Shown", ] ) -def test_log_config_log_level_interaction(caplog, log_config_path, swhmain): +def test_log_config_log_level_interaction(log_config_path, swhmain): @swhmain.command(name="test") @click.pass_context def swhtest(ctx): logging.debug("Root log debug") logging.info("Root log info") logging.getLogger("dontshowdebug").debug("Not shown") logging.getLogger("dontshowdebug").info("Shown") runner = CliRunner() result = runner.invoke( swhmain, ["--log-config", log_config_path, "--log-level", "INFO", "test",], ) assert result.exit_code == 0 assert result.output.strip() == "\n".join( [ "custom format:root:INFO:Root log info", "custom format:dontshowdebug:INFO:Shown", ] ) +def test_multiple_log_level_behavior(swhmain): + @swhmain.command(name="test") + @click.pass_context + def swhtest(ctx): + assert logging.getLevelName(logging.root.level) == "DEBUG" + assert logging.getLevelName(logging.getLogger("dontshowdebug").level) == "INFO" + return 0 + + runner = CliRunner() + result = runner.invoke( + swhmain, ["--log-level", "DEBUG", "--log-level", "dontshowdebug:INFO", "test",] + ) + + assert result.exit_code == 0, result.output + + +def test_invalid_log_level(swhmain): + runner = CliRunner() + result = runner.invoke(swhmain, ["--log-level", "broken:broken:DEBUG"]) + + assert result.exit_code != 0 + assert "Invalid log level specification" in result.output + + runner = CliRunner() + result = runner.invoke(swhmain, ["--log-level", "UNKNOWN"]) + + assert result.exit_code != 0 + assert "Log level UNKNOWN unknown" in result.output + + def test_aliased_command(swhmain): @swhmain.command(name="canonical-test") @click.pass_context def swhtest(ctx): "A test command." click.echo("Hello SWH!") swhmain.add_alias(swhtest, "othername") runner = CliRunner() # check we have only 'canonical-test' listed in the usage help msg result = runner.invoke(swhmain, ["-h"]) assert result.exit_code == 0 assert "canonical-test A test command." in result.output assert "othername" not in result.output # check we can execute the cmd with 'canonical-test' result = runner.invoke(swhmain, ["canonical-test"]) assert result.exit_code == 0 assert result.output.strip() == """Hello SWH!""" # check we can also execute the cmd with the alias 'othername' result = runner.invoke(swhmain, ["othername"]) assert result.exit_code == 0 assert result.output.strip() == """Hello SWH!"""