diff --git a/swh/core/config.py b/swh/core/config.py index 7673252..f1aebe6 100644 --- a/swh/core/config.py +++ b/swh/core/config.py @@ -1,78 +1,95 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import configparser import os # conversion per type _map_convert_fn = { 'int': int, 'bool': lambda x: x.lower() == 'true', 'list[str]': lambda x: [value.strip() for value in x.split(',')], 'list[int]': lambda x: [int(value.strip()) for value in x.split(',')], } def read(conf_file=None, default_conf=None): """Read the user's configuration file. Fill in the gap using `default_conf`. `default_conf` is similar to this: DEFAULT_CONF = { 'a': ('string', '/tmp/swh-loader-git/log'), 'b': ('string', 'dbname=swhloadergit') 'c': ('bool', true) 'e': ('bool', None) 'd': ('int', 10) } If conf_file is None, return the default config. """ conf = {} if conf_file: config_path = os.path.expanduser(conf_file) if os.path.exists(config_path): config = configparser.ConfigParser(defaults=default_conf) config.read(os.path.expanduser(conf_file)) if 'main' in config._sections: conf = config._sections['main'] if not default_conf: default_conf = {} # remaining missing default configuration key are set # also type conversion is enforced for underneath layer for key in default_conf: nature_type, default_value = default_conf[key] val = conf.get(key, None) if not val: # fallback to default value conf[key] = default_value else: # value present but in string format, force type conversion conf[key] = _map_convert_fn.get(nature_type, lambda x: x)(val) return conf +def priority_read(conf_filenames, default_conf=None): + """Try reading the configuration files from conf_filenames, in order, + and return the configuration from the first one that exists. + + default_conf has the same specification as it does in read. + """ + + # Try all the files in order + for filename in conf_filenames: + full_filename = os.path.expanduser(filename) + if os.path.exists(full_filename): + return read(full_filename, default_conf) + + # Else, return the default configuration + return read(None, default_conf) + + def merge_default_configs(base_config, *other_configs): """Merge several default config dictionaries, from left to right""" full_config = base_config.copy() for config in other_configs: full_config.update(config) return full_config def prepare_folders(conf, *keys): """Prepare the folder mentioned in config under keys. """ def makedir(folder): if not os.path.exists(folder): os.makedirs(folder) for key in keys: makedir(conf[key]) diff --git a/swh/core/tests/test_config.py b/swh/core/tests/test_config.py index 504a057..610148e 100644 --- a/swh/core/tests/test_config.py +++ b/swh/core/tests/test_config.py @@ -1,145 +1,176 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest import os import shutil from nose.tools import istest from swh.core import config class ConfReaderTest(unittest.TestCase): @classmethod def setUpClass(cls): # create a temporary folder cls.tmpdir = tempfile.mkdtemp(prefix='test-swh-core.') cls.conffile = os.path.join(cls.tmpdir, 'config.ini') with open(cls.conffile, 'w') as conf: conf.write("""[main] a = 1 b = this is a string c = true ls = list, of, strings li = 1, 2, 3, 4 """) cls.non_existing_conffile = os.path.join(cls.tmpdir, 'config-nonexisting.ini') cls.empty_conffile = os.path.join(cls.tmpdir, 'empty.ini') open(cls.empty_conffile, 'w').close() cls.default_conf = { 'a': ('int', 2), 'b': ('string', 'default-string'), 'c': ('bool', True), 'd': ('int', 10), 'e': ('int', None), 'f': ('bool', None), 'g': ('string', None), 'ls': ('list[str]', ['a', 'b', 'c']), 'li': ('list[int]', [42, 43]), } cls.other_default_conf = { 'a': ('int', 3), } cls.full_default_conf = cls.default_conf.copy() cls.full_default_conf['a'] = cls.other_default_conf['a'] cls.parsed_default_conf = { key: value for key, (type, value) in cls.default_conf.items() } + cls.parsed_conffile = { + 'a': 1, + 'b': 'this is a string', + 'c': True, + 'd': 10, + 'e': None, + 'f': None, + 'g': None, + 'ls': ['list', 'of', 'strings'], + 'li': [1, 2, 3, 4], + } + @classmethod def tearDownClass(cls): shutil.rmtree(cls.tmpdir) @istest def read(self): # when res = config.read(self.conffile, self.default_conf) # then - self.assertEquals(res, { - 'a': 1, - 'b': 'this is a string', - 'c': True, - 'd': 10, - 'e': None, - 'f': None, - 'g': None, - 'ls': ['list', 'of', 'strings'], - 'li': [1, 2, 3, 4], - }) + self.assertEquals(res, self.parsed_conffile) @istest def read_empty_file(self): # when res = config.read(None, self.default_conf) # then self.assertEquals(res, self.parsed_default_conf) @istest def support_non_existing_conffile(self): # when res = config.read(self.non_existing_conffile, self.default_conf) # then self.assertEquals(res, self.parsed_default_conf) @istest def support_empty_conffile(self): # when res = config.read(self.empty_conffile, self.default_conf) # then self.assertEquals(res, self.parsed_default_conf) @istest def merge_default_configs(self): # when res = config.merge_default_configs(self.default_conf, self.other_default_conf) # then self.assertEquals(res, self.full_default_conf) + @istest + def priority_read(self): + # when + res = config.priority_read([self.non_existing_conffile, self.conffile], + self.default_conf) + + # then + self.assertEquals(res, self.parsed_conffile) + + # when + res = config.priority_read([ + self.conffile, + self.non_existing_conffile, + self.empty_conffile, + ], self.default_conf) + + # then + self.assertEquals(res, self.parsed_conffile) + + # when + res = config.priority_read([ + self.empty_conffile, + self.conffile, + self.non_existing_conffile, + ], self.default_conf) + + # then + self.assertEquals(res, self.parsed_default_conf) + @istest def prepare_folder(self): # given conf = {'path1': os.path.join(self.tmpdir, 'path1'), 'path2': os.path.join(self.tmpdir, 'path2', 'depth1')} # the folders does not exists self.assertFalse(os.path.exists(conf['path1']), "path1 should not exist.") self.assertFalse(os.path.exists(conf['path2']), "path2 should not exist.") # when config.prepare_folders(conf, 'path1') # path1 exists but not path2 self.assertTrue(os.path.exists(conf['path1']), "path1 should now exist!") self.assertFalse(os.path.exists(conf['path2']), "path2 should not exist.") # path1 already exists, skips it but creates path2 config.prepare_folders(conf, 'path1', 'path2') self.assertTrue(os.path.exists(conf['path1']), "path1 should still exist!") self.assertTrue(os.path.exists(conf['path2']), "path2 should now exist.")