Changeset View
Changeset View
Standalone View
Standalone View
swh/core/config.py
# Copyright (C) 2015 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import configparser | |||||
import logging | import logging | ||||
import os | import os | ||||
import yaml | import yaml | ||||
from itertools import chain | from itertools import chain | ||||
from copy import deepcopy | from copy import deepcopy | ||||
from typing import Any, Dict, Optional, Tuple | from typing import Any, Callable, Dict, Optional, Tuple | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
SWH_CONFIG_DIRECTORIES = [ | SWH_CONFIG_DIRECTORIES = [ | ||||
"~/.config/swh", | "~/.config/swh", | ||||
"~/.swh", | "~/.swh", | ||||
"/etc/softwareheritage", | "/etc/softwareheritage", | ||||
] | ] | ||||
SWH_GLOBAL_CONFIG = "global.ini" | SWH_GLOBAL_CONFIG = "global.yml" | ||||
SWH_DEFAULT_GLOBAL_CONFIG = { | SWH_DEFAULT_GLOBAL_CONFIG = { | ||||
"max_content_size": ("int", 100 * 1024 * 1024), | "max_content_size": ("int", 100 * 1024 * 1024), | ||||
"log_db": ("str", "dbname=softwareheritage-log"), | |||||
} | } | ||||
ardumont: This needs to get dropped as well indeed!
[1] D3963#97708 | |||||
SWH_CONFIG_EXTENSIONS = [ | SWH_CONFIG_EXTENSIONS = [ | ||||
".yml", | ".yml", | ||||
".ini", | |||||
] | ] | ||||
# conversion per type | # conversion per type | ||||
_map_convert_fn = { | _map_convert_fn: Dict[str, Callable] = { | ||||
"int": int, | "int": int, | ||||
"bool": lambda x: x.lower() == "true", | "bool": lambda x: x.lower() == "true", | ||||
"list[str]": lambda x: [value.strip() for value in x.split(",")], | "list[str]": lambda x: [value.strip() for value in x.split(",")], | ||||
"list[int]": lambda x: [int(value.strip()) for value in x.split(",")], | "list[int]": lambda x: [int(value.strip()) for value in x.split(",")], | ||||
} | } | ||||
_map_check_fn = { | _map_check_fn: Dict[str, Callable] = { | ||||
"int": lambda x: isinstance(x, int), | "int": lambda x: isinstance(x, int), | ||||
"bool": lambda x: isinstance(x, bool), | "bool": lambda x: isinstance(x, bool), | ||||
"list[str]": lambda x: (isinstance(x, list) and all(isinstance(y, str) for y in x)), | "list[str]": lambda x: (isinstance(x, list) and all(isinstance(y, str) for y in x)), | ||||
"list[int]": lambda x: (isinstance(x, list) and all(isinstance(y, int) for y in x)), | "list[int]": lambda x: (isinstance(x, list) and all(isinstance(y, int) for y in x)), | ||||
} | } | ||||
def exists_accessible(file): | def exists_accessible(file): | ||||
Show All 15 Lines | except FileNotFoundError: | ||||
return False | return False | ||||
else: | else: | ||||
if os.access(file, os.R_OK): | if os.access(file, os.R_OK): | ||||
return True | return True | ||||
else: | else: | ||||
raise PermissionError("Permission denied: %r" % file) | raise PermissionError("Permission denied: %r" % file) | ||||
def config_basepath(config_path): | def config_basepath(config_path: str) -> str: | ||||
"""Return the base path of a configuration file""" | """Return the base path of a configuration file""" | ||||
if config_path.endswith((".ini", ".yml")): | if config_path.endswith(".yml"): | ||||
return config_path[:-4] | return config_path[:-4] | ||||
return config_path | return config_path | ||||
def read_raw_config(base_config_path): | def read_raw_config(base_config_path: str) -> Dict[str, Any]: | ||||
"""Read the raw config corresponding to base_config_path. | """Read the raw config corresponding to base_config_path. | ||||
Can read yml or ini files. | Can read yml files. | ||||
""" | """ | ||||
yml_file = base_config_path + ".yml" | yml_file = f"{base_config_path}.yml" | ||||
if exists_accessible(yml_file): | if exists_accessible(yml_file): | ||||
logger.info("Loading config file %s", yml_file) | logger.info("Loading config file %s", yml_file) | ||||
with open(yml_file) as f: | with open(yml_file) as f: | ||||
return yaml.safe_load(f) | return yaml.safe_load(f) | ||||
ini_file = base_config_path + ".ini" | |||||
if exists_accessible(ini_file): | |||||
config = configparser.ConfigParser() | |||||
config.read(ini_file) | |||||
if "main" in config._sections: | |||||
logger.info("Loading config file %s", ini_file) | |||||
return config._sections["main"] | |||||
else: | |||||
logger.warning("Ignoring config file %s (no [main] section)", ini_file) | |||||
return {} | return {} | ||||
def config_exists(config_path): | def config_exists(config_path): | ||||
"""Check whether the given config exists""" | """Check whether the given config exists""" | ||||
basepath = config_basepath(config_path) | basepath = config_basepath(config_path) | ||||
return any( | return any( | ||||
exists_accessible(basepath + extension) for extension in SWH_CONFIG_EXTENSIONS | exists_accessible(basepath + extension) for extension in SWH_CONFIG_EXTENSIONS | ||||
) | ) | ||||
def read(conf_file=None, default_conf=None): | def read( | ||||
conf_file: Optional[str] = None, | |||||
default_conf: Optional[Dict[str, Tuple[str, Any]]] = None, | |||||
) -> Dict[str, Any]: | |||||
"""Read the user's configuration file. | """Read the user's configuration file. | ||||
Fill in the gap using `default_conf`. `default_conf` is similar to this:: | Fill in the gap using `default_conf`. `default_conf` is similar to this:: | ||||
DEFAULT_CONF = { | DEFAULT_CONF = { | ||||
'a': ('str', '/tmp/swh-loader-git/log'), | 'a': ('str', '/tmp/swh-loader-git/log'), | ||||
'b': ('str', 'dbname=swhloadergit') | 'b': ('str', 'dbname=swhloadergit') | ||||
'c': ('bool', true) | 'c': ('bool', true) | ||||
'e': ('bool', None) | 'e': ('bool', None) | ||||
'd': ('int', 10) | 'd': ('int', 10) | ||||
} | } | ||||
If conf_file is None, return the default config. | If conf_file is None, return the default config. | ||||
""" | """ | ||||
conf = {} | conf: Dict[str, Any] = {} | ||||
if conf_file: | if conf_file: | ||||
base_config_path = config_basepath(os.path.expanduser(conf_file)) | base_config_path = config_basepath(os.path.expanduser(conf_file)) | ||||
conf = read_raw_config(base_config_path) | conf = read_raw_config(base_config_path) or {} | ||||
if not default_conf: | if not default_conf: | ||||
default_conf = {} | return conf | ||||
Done Inline ActionsShould be return conf. ardumont: Should be `return conf`.
Fixing it and adding a test for it. | |||||
# remaining missing default configuration key are set | # remaining missing default configuration key are set | ||||
# also type conversion is enforced for underneath layer | # also type conversion is enforced for underneath layer | ||||
for key in default_conf: | for key, (nature_type, default_value) in default_conf.items(): | ||||
nature_type, default_value = default_conf[key] | |||||
val = conf.get(key, None) | val = conf.get(key, None) | ||||
if val is None: # fallback to default value | if val is None: # fallback to default value | ||||
conf[key] = default_value | conf[key] = default_value | ||||
elif not _map_check_fn.get(nature_type, lambda x: True)(val): | elif not _map_check_fn.get(nature_type, lambda x: True)(val): | ||||
# value present but not in the proper format, force type conversion | # value present but not in the proper format, force type conversion | ||||
conf[key] = _map_convert_fn.get(nature_type, lambda x: x)(val) | conf[key] = _map_convert_fn.get(nature_type, lambda x: x)(val) | ||||
return conf | return conf | ||||
▲ Show 20 Lines • Show All 211 Lines • Show Last 20 Lines |
This needs to get dropped as well indeed!
[1] D3963#97708