diff --git a/swh/core/config.py b/swh/core/config.py index 4c7fcc7..bcf124f 100644 --- a/swh/core/config.py +++ b/swh/core/config.py @@ -1,364 +1,353 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import configparser import logging import os import yaml from itertools import chain from copy import deepcopy -from typing import Any, Dict, Optional, Tuple +from typing import Any, Callable, Dict, Optional, Tuple logger = logging.getLogger(__name__) SWH_CONFIG_DIRECTORIES = [ "~/.config/swh", "~/.swh", "/etc/softwareheritage", ] -SWH_GLOBAL_CONFIG = "global.ini" +SWH_GLOBAL_CONFIG = "global.yml" SWH_DEFAULT_GLOBAL_CONFIG = { "max_content_size": ("int", 100 * 1024 * 1024), - "log_db": ("str", "dbname=softwareheritage-log"), } SWH_CONFIG_EXTENSIONS = [ ".yml", - ".ini", ] # conversion per type -_map_convert_fn = { +_map_convert_fn: Dict[str, Callable] = { "int": int, "bool": lambda x: x.lower() == "true", "list[str]": lambda x: [value.strip() for value in x.split(",")], "list[int]": lambda x: [int(value.strip()) for value in x.split(",")], } -_map_check_fn = { +_map_check_fn: Dict[str, Callable] = { "int": lambda x: isinstance(x, int), "bool": lambda x: isinstance(x, bool), "list[str]": lambda x: (isinstance(x, list) and all(isinstance(y, str) for y in x)), "list[int]": lambda x: (isinstance(x, list) and all(isinstance(y, int) for y in x)), } def exists_accessible(file): """Check whether a file exists, and is accessible. Returns: True if the file exists and is accessible False if the file does not exist Raises: PermissionError if the file cannot be read. 
""" try: os.stat(file) except PermissionError: raise except FileNotFoundError: return False else: if os.access(file, os.R_OK): return True else: raise PermissionError("Permission denied: %r" % file) -def config_basepath(config_path): +def config_basepath(config_path: str) -> str: """Return the base path of a configuration file""" - if config_path.endswith((".ini", ".yml")): + if config_path.endswith(".yml"): return config_path[:-4] return config_path -def read_raw_config(base_config_path): +def read_raw_config(base_config_path: str) -> Dict[str, Any]: """Read the raw config corresponding to base_config_path. - Can read yml or ini files. + Can read yml files. """ - yml_file = base_config_path + ".yml" + yml_file = f"{base_config_path}.yml" if exists_accessible(yml_file): logger.info("Loading config file %s", yml_file) with open(yml_file) as f: return yaml.safe_load(f) - ini_file = base_config_path + ".ini" - if exists_accessible(ini_file): - config = configparser.ConfigParser() - config.read(ini_file) - if "main" in config._sections: - logger.info("Loading config file %s", ini_file) - return config._sections["main"] - else: - logger.warning("Ignoring config file %s (no [main] section)", ini_file) - return {} def config_exists(config_path): """Check whether the given config exists""" basepath = config_basepath(config_path) return any( exists_accessible(basepath + extension) for extension in SWH_CONFIG_EXTENSIONS ) -def read(conf_file=None, default_conf=None): +def read( + conf_file: Optional[str] = None, + default_conf: Optional[Dict[str, Tuple[str, Any]]] = None, +) -> Dict[str, Any]: """Read the user's configuration file. Fill in the gap using `default_conf`. `default_conf` is similar to this:: DEFAULT_CONF = { 'a': ('str', '/tmp/swh-loader-git/log'), 'b': ('str', 'dbname=swhloadergit') 'c': ('bool', true) 'e': ('bool', None) 'd': ('int', 10) } If conf_file is None, return the default config. 
""" - conf = {} + conf: Dict[str, Any] = {} if conf_file: base_config_path = config_basepath(os.path.expanduser(conf_file)) - conf = read_raw_config(base_config_path) + conf = read_raw_config(base_config_path) or {} if not default_conf: - default_conf = {} + return conf # remaining missing default configuration key are set # also type conversion is enforced for underneath layer - for key in default_conf: - nature_type, default_value = default_conf[key] + for key, (nature_type, default_value) in default_conf.items(): val = conf.get(key, None) if val is None: # fallback to default value conf[key] = default_value elif not _map_check_fn.get(nature_type, lambda x: True)(val): # value present but not in the proper format, force type conversion conf[key] = _map_convert_fn.get(nature_type, lambda x: x)(val) return conf def priority_read(conf_filenames, default_conf=None): """Try reading the configuration files from conf_filenames, in order, and return the configuration from the first one that exists. default_conf has the same specification as it does in read. """ # Try all the files in order for filename in conf_filenames: full_filename = os.path.expanduser(filename) if config_exists(full_filename): return read(full_filename, default_conf) # Else, return the default configuration return read(None, default_conf) def merge_default_configs(base_config, *other_configs): """Merge several default config dictionaries, from left to right""" full_config = base_config.copy() for config in other_configs: full_config.update(config) return full_config def merge_configs(base, other): """Merge two config dictionaries This does merge config dicts recursively, with the rules, for every value of the dicts (with 'val' not being a dict): - None + type -> type - type + None -> None - dict + dict -> dict (merged) - val + dict -> TypeError - dict + val -> TypeError - val + val -> val (other) for instance: >>> d1 = { ... 'key1': { ... 'skey1': 'value1', ... 'skey2': {'sskey1': 'value2'}, ... 
}, ... 'key2': 'value3', ... } with >>> d2 = { ... 'key1': { ... 'skey1': 'value4', ... 'skey2': {'sskey2': 'value5'}, ... }, ... 'key3': 'value6', ... } will give: >>> d3 = { ... 'key1': { ... 'skey1': 'value4', # <-- note this ... 'skey2': { ... 'sskey1': 'value2', ... 'sskey2': 'value5', ... }, ... }, ... 'key2': 'value3', ... 'key3': 'value6', ... } >>> assert merge_configs(d1, d2) == d3 Note that no type checking is done for anything but dicts. """ if not isinstance(base, dict) or not isinstance(other, dict): raise TypeError("Cannot merge a %s with a %s" % (type(base), type(other))) output = {} allkeys = set(chain(base.keys(), other.keys())) for k in allkeys: vb = base.get(k) vo = other.get(k) if isinstance(vo, dict): output[k] = merge_configs(vb is not None and vb or {}, vo) elif isinstance(vb, dict) and k in other and other[k] is not None: output[k] = merge_configs(vb, vo is not None and vo or {}) elif k in other: output[k] = deepcopy(vo) else: output[k] = deepcopy(vb) return output def swh_config_paths(base_filename): """Return the Software Heritage specific configuration paths for the given filename.""" return [os.path.join(dirname, base_filename) for dirname in SWH_CONFIG_DIRECTORIES] def prepare_folders(conf, *keys): """Prepare the folder mentioned in config under keys. """ def makedir(folder): if not os.path.exists(folder): os.makedirs(folder) for key in keys: makedir(conf[key]) def load_global_config(): """Load the global Software Heritage config""" return priority_read( swh_config_paths(SWH_GLOBAL_CONFIG), SWH_DEFAULT_GLOBAL_CONFIG, ) def load_named_config(name, default_conf=None, global_conf=True): """Load the config named `name` from the Software Heritage configuration paths. If global_conf is True (default), read the global configuration too. 
""" conf = {} if global_conf: conf.update(load_global_config()) conf.update(priority_read(swh_config_paths(name), default_conf)) return conf class SWHConfig: """Mixin to add configuration parsing abilities to classes The class should override the class attributes: - DEFAULT_CONFIG (default configuration to be parsed) - CONFIG_BASE_FILENAME (the filename of the configuration to be used) This class defines one classmethod, parse_config_file, which parses a configuration file using the default config as set in the class attribute. """ DEFAULT_CONFIG = {} # type: Dict[str, Tuple[str, Any]] CONFIG_BASE_FILENAME = "" # type: Optional[str] @classmethod def parse_config_file( cls, base_filename=None, config_filename=None, additional_configs=None, global_config=True, ): """Parse the configuration file associated to the current class. By default, parse_config_file will load the configuration cls.CONFIG_BASE_FILENAME from one of the Software Heritage configuration directories, in order, unless it is overridden by base_filename or config_filename (which shortcuts the file lookup completely). Args: - base_filename (str): overrides the default cls.CONFIG_BASE_FILENAME - config_filename (str): sets the file to parse instead of the defaults set from cls.CONFIG_BASE_FILENAME - additional_configs: (list of default configuration dicts) allows to override or extend the configuration set in cls.DEFAULT_CONFIG. 
- global_config (bool): Load the global configuration (default: True) """ if config_filename: config_filenames = [config_filename] elif "SWH_CONFIG_FILENAME" in os.environ: config_filenames = [os.environ["SWH_CONFIG_FILENAME"]] else: if not base_filename: base_filename = cls.CONFIG_BASE_FILENAME config_filenames = swh_config_paths(base_filename) if not additional_configs: additional_configs = [] full_default_config = merge_default_configs( cls.DEFAULT_CONFIG, *additional_configs ) config = {} if global_config: config = load_global_config() config.update(priority_read(config_filenames, full_default_config)) return config diff --git a/swh/core/tests/test_config.py b/swh/core/tests/test_config.py index 973b98d..6ee50d5 100644 --- a/swh/core/tests/test_config.py +++ b/swh/core/tests/test_config.py @@ -1,314 +1,328 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os -import shutil import pytest +import shutil import pkg_resources.extern.packaging.version +import yaml + from swh.core import config pytest_v = pkg_resources.get_distribution("pytest").parsed_version if pytest_v < pkg_resources.extern.packaging.version.parse("3.9"): @pytest.fixture def tmp_path(request): import tempfile import pathlib with tempfile.TemporaryDirectory() as tmpdir: yield pathlib.Path(tmpdir) default_conf = { "a": ("int", 2), "b": ("string", "default-string"), "c": ("bool", True), "d": ("int", 10), "e": ("int", None), "f": ("bool", None), "g": ("string", None), "h": ("bool", True), "i": ("bool", True), "ls": ("list[str]", ["a", "b", "c"]), "li": ("list[int]", [42, 43]), } other_default_conf = { "a": ("int", 3), } full_default_conf = default_conf.copy() full_default_conf["a"] = other_default_conf["a"] parsed_default_conf = {key: 
value for key, (type, value) in default_conf.items()} parsed_conffile = { "a": 1, "b": "this is a string", "c": True, "d": 10, "e": None, "f": None, "g": None, "h": False, "i": True, "ls": ["list", "of", "strings"], "li": [1, 2, 3, 4], } @pytest.fixture def swh_config(tmp_path): # create a temporary folder - conffile = tmp_path / "config.ini" - conf_contents = """[main] -a = 1 -b = this is a string -c = true -h = false -ls = list, of, strings -li = 1, 2, 3, 4 + conffile = tmp_path / "config.yml" + conf_contents = """ +a: 1 +b: this is a string +c: true +h: false +ls: list, of, strings +li: 1, 2, 3, 4 """ conffile.open("w").write(conf_contents) return conffile @pytest.fixture def swh_config_unreadable(swh_config): # Create an unreadable, proper configuration file os.chmod(str(swh_config), 0o000) yield swh_config # Make the broken perms file readable again to be able to remove them os.chmod(str(swh_config), 0o644) @pytest.fixture def swh_config_unreadable_dir(swh_config): # Create a proper configuration file in an unreadable directory perms_broken_dir = swh_config.parent / "unreadabledir" perms_broken_dir.mkdir() shutil.move(str(swh_config), str(perms_broken_dir)) os.chmod(str(perms_broken_dir), 0o000) yield perms_broken_dir / swh_config.name # Make the broken perms items readable again to be able to remove them os.chmod(str(perms_broken_dir), 0o755) @pytest.fixture def swh_config_empty(tmp_path): # create a temporary folder - conffile = tmp_path / "config.ini" + conffile = tmp_path / "config.yml" conffile.touch() return conffile def test_read(swh_config): # when res = config.read(str(swh_config), default_conf) # then assert res == parsed_conffile +def test_read_no_default_conf(swh_config): + """If no default config is provided to read, this should directly parse the config file + yaml + + """ + config_path = str(swh_config) + actual_config = config.read(config_path) + with open(config_path) as f: + expected_config = yaml.safe_load(f) + assert actual_config == 
expected_config + + def test_read_empty_file(): # when res = config.read(None, default_conf) # then assert res == parsed_default_conf def test_support_non_existing_conffile(tmp_path): # when - res = config.read(str(tmp_path / "void.ini"), default_conf) + res = config.read(str(tmp_path / "void.yml"), default_conf) # then assert res == parsed_default_conf def test_support_empty_conffile(swh_config_empty): # when res = config.read(str(swh_config_empty), default_conf) # then assert res == parsed_default_conf def test_raise_on_broken_directory_perms(swh_config_unreadable_dir): with pytest.raises(PermissionError): config.read(str(swh_config_unreadable_dir), default_conf) def test_raise_on_broken_file_perms(swh_config_unreadable): with pytest.raises(PermissionError): config.read(str(swh_config_unreadable), default_conf) def test_merge_default_configs(): # when res = config.merge_default_configs(default_conf, other_default_conf) # then assert res == full_default_conf def test_priority_read_nonexist_conf(swh_config): - noexist = str(swh_config.parent / "void.ini") + noexist = str(swh_config.parent / "void.yml") # when res = config.priority_read([noexist, str(swh_config)], default_conf) # then assert res == parsed_conffile def test_priority_read_conf_nonexist_empty(swh_config): - noexist = swh_config.parent / "void.ini" - empty = swh_config.parent / "empty.ini" + noexist = swh_config.parent / "void.yml" + empty = swh_config.parent / "empty.yml" empty.touch() # when res = config.priority_read( [str(p) for p in (swh_config, noexist, empty)], default_conf ) # then assert res == parsed_conffile def test_priority_read_empty_conf_nonexist(swh_config): - noexist = swh_config.parent / "void.ini" - empty = swh_config.parent / "empty.ini" + noexist = swh_config.parent / "void.yml" + empty = swh_config.parent / "empty.yml" empty.touch() # when res = config.priority_read( [str(p) for p in (empty, swh_config, noexist)], default_conf ) # then assert res == parsed_default_conf def 
test_swh_config_paths(): - res = config.swh_config_paths("foo/bar.ini") + res = config.swh_config_paths("foo/bar.yml") assert res == [ - "~/.config/swh/foo/bar.ini", - "~/.swh/foo/bar.ini", - "/etc/softwareheritage/foo/bar.ini", + "~/.config/swh/foo/bar.yml", + "~/.swh/foo/bar.yml", + "/etc/softwareheritage/foo/bar.yml", ] def test_prepare_folder(tmp_path): # given conf = { "path1": str(tmp_path / "path1"), "path2": str(tmp_path / "path2" / "depth1"), } # the folders does not exists assert not os.path.exists(conf["path1"]), "path1 should not exist." assert not os.path.exists(conf["path2"]), "path2 should not exist." # when config.prepare_folders(conf, "path1") # path1 exists but not path2 assert os.path.exists(conf["path1"]), "path1 should now exist!" assert not os.path.exists(conf["path2"]), "path2 should not exist." # path1 already exists, skips it but creates path2 config.prepare_folders(conf, "path1", "path2") assert os.path.exists(conf["path1"]), "path1 should still exist!" assert os.path.exists(conf["path2"]), "path2 should now exist." def test_merge_config(): cfg_a = { "a": 42, "b": [1, 2, 3], "c": None, "d": {"gheez": 27}, "e": { "ea": "Mr. 
Bungle", "eb": None, "ec": [11, 12, 13], "ed": {"eda": "Secret Chief 3", "edb": "Faith No More"}, "ee": 451, }, "f": "Janis", } cfg_b = { "a": 43, "b": [41, 42, 43], "c": "Tom Waits", "d": None, "e": { "ea": "Igorrr", "ec": [51, 52], "ed": {"edb": "Sleepytime Gorilla Museum", "edc": "Nils Peter Molvaer"}, }, "g": "Hüsker Dü", } # merge A, B cfg_m = config.merge_configs(cfg_a, cfg_b) assert cfg_m == { "a": 43, # b takes precedence "b": [41, 42, 43], # b takes precedence "c": "Tom Waits", # b takes precedence "d": None, # b['d'] takes precedence (explicit None) "e": { "ea": "Igorrr", # a takes precedence "eb": None, # only in a "ec": [51, 52], # b takes precedence "ed": { "eda": "Secret Chief 3", # only in a "edb": "Sleepytime Gorilla Museum", # b takes precedence "edc": "Nils Peter Molvaer", }, # only defined in b "ee": 451, }, "f": "Janis", # only defined in a "g": "Hüsker Dü", # only defined in b } # merge B, A cfg_m = config.merge_configs(cfg_b, cfg_a) assert cfg_m == { "a": 42, # a takes precedence "b": [1, 2, 3], # a takes precedence "c": None, # a takes precedence "d": {"gheez": 27}, # a takes precedence "e": { "ea": "Mr. Bungle", # a takes precedence "eb": None, # only defined in a "ec": [11, 12, 13], # a takes precedence "ed": { "eda": "Secret Chief 3", # only in a "edb": "Faith No More", # a takes precedence "edc": "Nils Peter Molvaer", }, # only in b "ee": 451, }, "f": "Janis", # only in a "g": "Hüsker Dü", # only in b } def test_merge_config_type_error(): for v in (1, "str", None): with pytest.raises(TypeError): config.merge_configs(v, {}) with pytest.raises(TypeError): config.merge_configs({}, v) for v in (1, "str"): with pytest.raises(TypeError): config.merge_configs({"a": v}, {"a": {}}) with pytest.raises(TypeError): config.merge_configs({"a": {}}, {"a": v})