diff --git a/swh/core/config.py b/swh/core/config.py index d53aaa5..ac0ac4d 100644 --- a/swh/core/config.py +++ b/swh/core/config.py @@ -1,284 +1,357 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import configparser import logging import os import yaml +from itertools import chain +from copy import deepcopy logger = logging.getLogger(__name__) SWH_CONFIG_DIRECTORIES = [ '~/.config/swh', '~/.swh', '/etc/softwareheritage', ] SWH_GLOBAL_CONFIG = 'global.ini' SWH_DEFAULT_GLOBAL_CONFIG = { 'content_size_limit': ('int', 100 * 1024 * 1024), 'log_db': ('str', 'dbname=softwareheritage-log'), } SWH_CONFIG_EXTENSIONS = [ '.yml', '.ini', ] # conversion per type _map_convert_fn = { 'int': int, 'bool': lambda x: x.lower() == 'true', 'list[str]': lambda x: [value.strip() for value in x.split(',')], 'list[int]': lambda x: [int(value.strip()) for value in x.split(',')], } _map_check_fn = { 'int': lambda x: isinstance(x, int), 'bool': lambda x: isinstance(x, bool), 'list[str]': lambda x: (isinstance(x, list) and all(isinstance(y, str) for y in x)), 'list[int]': lambda x: (isinstance(x, list) and all(isinstance(y, int) for y in x)), } def exists_accessible(file): """Check whether a file exists, and is accessible. Returns: True if the file exists and is accessible False if the file does not exist Raises: PermissionError if the file cannot be read. 
def read_raw_config(base_config_path):
    """Read the raw config corresponding to base_config_path.

    Can read yml or ini files. The ``.yml`` variant is tried first; the
    ``.ini`` variant is only looked at when no yml file exists.

    Args:
        base_config_path (str): path of the configuration file, without
            its extension.

    Returns:
        The parsed yaml document, or the contents of the ``[main]``
        section of the ini file. An empty dict is returned when neither
        file exists, when the yml file is empty, or when the ini file has
        no ``[main]`` section.

    Raises:
        PermissionError: if a candidate file exists but cannot be read
            (propagated from exists_accessible).

    """
    yml_file = base_config_path + '.yml'
    if exists_accessible(yml_file):
        logger.info('Loading config file %s', yml_file)
        with open(yml_file) as f:
            loaded = yaml.safe_load(f)
        # safe_load yields None for an empty document; callers expect a
        # mapping (they call .get() / .update() on the result).
        return {} if loaded is None else loaded

    ini_file = base_config_path + '.ini'
    if exists_accessible(ini_file):
        config = configparser.ConfigParser()
        config.read(ini_file)
        # NOTE(review): _sections is a private ConfigParser attribute; it
        # is kept because it exposes the raw, non-interpolated values.
        if 'main' in config._sections:
            logger.info('Loading config file %s', ini_file)
            return config._sections['main']
        else:
            logger.warning('Ignoring config file %s (no [main] section)',
                           ini_file)

    return {}
def merge_configs(base, other):
    """Merge two config dictionaries

    This does merge config dicts recursively, with the rules, for every value
    of the dicts (with 'val' not being a dict):

    - None + type -> type
    - type + None -> None
    - dict + dict -> dict (merged)
    - val + dict -> TypeError
    - dict + val -> TypeError
    - val + val -> val (other)

    so merging

    {
      'key1': {
        'skey1': value1,
        'skey2': {'sskey1': value2},
      },
      'key2': value3,
    }

    with

    {
      'key1': {
        'skey1': value4,
        'skey2': {'sskey2': value5},
      },
      'key3': value6,
    }

    will give:

    {
      'key1': {
        'skey1': value4,  # <-- note this
        'skey2': {
          'sskey1': value2,
          'sskey2': value5,
        },
      },
      'key2': value3,
      'key3': value6,
    }

    Note that no type checking is done for anything but dicts. Falsy
    non-dict values (0, '', False, []) count as 'val' like any other: only
    None (or a missing key) may be combined with a dict.

    Args:
        base (dict): the configuration to start from
        other (dict): the configuration whose values take precedence

    Returns:
        A new dict; the values it holds are deep copies, so neither input
        is modified nor shared with the result.

    Raises:
        TypeError: if base or other is not a dict, or if a dict value has
            to be merged with a non-dict, non-None value.

    """
    if not isinstance(base, dict) or not isinstance(other, dict):
        raise TypeError(
            'Cannot merge a %s with a %s' % (type(base), type(other)))

    output = {}
    for k in set(base) | set(other):
        vb = base.get(k)
        vo = other.get(k)

        if isinstance(vo, dict):
            # Only None (or an absent key) is treated as an empty dict; any
            # other non-dict base value is rejected by the recursive call.
            output[k] = merge_configs({} if vb is None else vb, vo)
        elif isinstance(vb, dict) and vo is not None:
            # dict + val: refuse, whatever the truthiness of val.
            raise TypeError(
                'Cannot merge a %s with a %s' % (type(vb), type(vo)))
        elif k in other:
            # Includes an explicit None in other, which overrides base.
            output[k] = deepcopy(vo)
        else:
            output[k] = deepcopy(vb)

    return output
class SWHConfig:
    """Mixin giving classes the ability to load their configuration.

    Subclasses are expected to override two class attributes:

    - DEFAULT_CONFIG: the default configuration to be parsed
    - CONFIG_BASE_FILENAME: the filename of the configuration to be used

    A single classmethod is provided, parse_config_file, which parses a
    configuration file against the default config held in the class
    attribute.

    """

    DEFAULT_CONFIG = {}
    CONFIG_BASE_FILENAME = ''

    @classmethod
    def parse_config_file(cls, base_filename=None, config_filename=None,
                          additional_configs=None, global_config=True):
        """Parse the configuration file associated to the current class.

        Unless told otherwise, the configuration named
        cls.CONFIG_BASE_FILENAME is looked up, in order, in the Software
        Heritage configuration directories. Passing base_filename changes
        the name being looked up; passing config_filename skips the lookup
        entirely.

        Args:
            - base_filename (str): replaces cls.CONFIG_BASE_FILENAME
            - config_filename (str): the exact file to parse, instead of
              the candidates derived from cls.CONFIG_BASE_FILENAME
            - additional_configs (list of default configuration dicts):
              merged, in order, on top of cls.DEFAULT_CONFIG
            - global_config (bool): whether to load the global
              configuration first (default: True)

        Returns:
            The configuration dict; values read for this class are applied
            on top of the global configuration when it is loaded.

        """
        if config_filename:
            candidates = [config_filename]
        else:
            candidates = swh_config_paths(
                base_filename or cls.CONFIG_BASE_FILENAME)

        defaults = merge_default_configs(cls.DEFAULT_CONFIG,
                                         *(additional_configs or []))

        result = load_global_config() if global_config else {}
        result.update(priority_read(candidates, defaults))
        return result
@pytest.fixture
def swh_config(tmp_path):
    """Write a sample ini configuration file and return its path.

    The file is created as ``config.ini`` inside the per-test temporary
    directory provided by the ``tmp_path`` fixture.
    """
    conffile = tmp_path / 'config.ini'
    conf_contents = """[main]
a = 1
b = this is a string
c = true
h = false
ls = list, of, strings
li = 1, 2, 3, 4
"""
    # write_text opens, writes and closes the file; the former
    # open('w').write(...) left the file handle to be closed by the
    # garbage collector.
    conffile.write_text(conf_contents)
    return conffile
config.read(str(swh_config_unreadable), default_conf) def test_merge_default_configs(): # when res = config.merge_default_configs(default_conf, other_default_conf) # then assert res == full_default_conf def test_priority_read_nonexist_conf(swh_config): noexist = str(swh_config.parent / 'void.ini') # when res = config.priority_read([noexist, str(swh_config)], default_conf) # then assert res == parsed_conffile def test_priority_read_conf_nonexist_empty(swh_config): noexist = swh_config.parent / 'void.ini' empty = swh_config.parent / 'empty.ini' empty.touch() # when res = config.priority_read([str(p) for p in ( swh_config, noexist, empty)], default_conf) # then assert res == parsed_conffile def test_priority_read_empty_conf_nonexist(swh_config): noexist = swh_config.parent / 'void.ini' empty = swh_config.parent / 'empty.ini' empty.touch() # when res = config.priority_read([str(p) for p in ( empty, swh_config, noexist)], default_conf) # then assert res == parsed_default_conf def test_swh_config_paths(): res = config.swh_config_paths('foo/bar.ini') assert res == [ '~/.config/swh/foo/bar.ini', '~/.swh/foo/bar.ini', '/etc/softwareheritage/foo/bar.ini', ] def test_prepare_folder(tmp_path): # given conf = {'path1': str(tmp_path / 'path1'), 'path2': str(tmp_path / 'path2' / 'depth1')} # the folders does not exists assert not os.path.exists(conf['path1']), "path1 should not exist." assert not os.path.exists(conf['path2']), "path2 should not exist." # when config.prepare_folders(conf, 'path1') # path1 exists but not path2 assert os.path.exists(conf['path1']), "path1 should now exist!" assert not os.path.exists(conf['path2']), "path2 should not exist." # path1 already exists, skips it but creates path2 config.prepare_folders(conf, 'path1', 'path2') assert os.path.exists(conf['path1']), "path1 should still exist!" assert os.path.exists(conf['path2']), "path2 should now exist." 
def test_merge_config():
    # Merge two nested configurations in both directions and check that
    # the second argument of merge_configs wins on conflicting keys.
    cfg_a = {
        'a': 42,
        'b': [1, 2, 3],
        'c': None,
        'd': {'gheez': 27},
        'e': {
            'ea': 'Mr. Bungle',
            'eb': None,
            'ec': [11, 12, 13],
            'ed': {'eda': 'Secret Chief 3',
                   'edb': 'Faith No More'},
            'ee': 451,
        },
        'f': 'Janis',
    }
    cfg_b = {
        'a': 43,
        'b': [41, 42, 43],
        'c': 'Tom Waits',
        'd': None,
        'e': {
            'ea': 'Igorrr',
            'ec': [51, 52],
            'ed': {'edb': 'Sleepytime Gorilla Museum',
                   'edc': 'Nils Peter Molvaer'},
        },
        'g': 'Hüsker Dü',
    }

    # merge A, B
    cfg_m = config.merge_configs(cfg_a, cfg_b)
    assert cfg_m == {
        'a': 43,  # b takes precedence
        'b': [41, 42, 43],  # b takes precedence
        'c': 'Tom Waits',  # b takes precedence
        'd': None,  # b['d'] takes precedence (explicit None)
        'e': {
            'ea': 'Igorrr',  # b takes precedence
            'eb': None,  # only in a
            'ec': [51, 52],  # b takes precedence
            'ed': {
                'eda': 'Secret Chief 3',  # only in a
                'edb': 'Sleepytime Gorilla Museum',  # b takes precedence
                'edc': 'Nils Peter Molvaer'},  # only defined in b
            'ee': 451,  # only in a
        },
        'f': 'Janis',  # only defined in a
        'g': 'Hüsker Dü',  # only defined in b
    }

    # merge B, A
    cfg_m = config.merge_configs(cfg_b, cfg_a)
    assert cfg_m == {
        'a': 42,  # a takes precedence
        'b': [1, 2, 3],  # a takes precedence
        'c': None,  # a takes precedence
        'd': {'gheez': 27},  # a takes precedence
        'e': {
            'ea': 'Mr. Bungle',  # a takes precedence
            'eb': None,  # only defined in a
            'ec': [11, 12, 13],  # a takes precedence
            'ed': {
                'eda': 'Secret Chief 3',  # only in a
                'edb': 'Faith No More',  # a takes precedence
                'edc': 'Nils Peter Molvaer'},  # only in b
            'ee': 451,  # only in a
        },
        'f': 'Janis',  # only in a
        'g': 'Hüsker Dü',  # only in b
    }