Changeset View
Changeset View
Standalone View
Standalone View
swh/core/config.py
Show All 10 Lines | |||||
from copy import deepcopy | from copy import deepcopy | ||||
from typing import Any, Dict, Optional, Tuple | from typing import Any, Dict, Optional, Tuple | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
SWH_CONFIG_DIRECTORIES = [ | SWH_CONFIG_DIRECTORIES = ["~/.config/swh", "~/.swh", "/etc/softwareheritage"] | ||||
'~/.config/swh', | |||||
'~/.swh', | |||||
'/etc/softwareheritage', | |||||
] | |||||
SWH_GLOBAL_CONFIG = 'global.ini' | SWH_GLOBAL_CONFIG = "global.ini" | ||||
SWH_DEFAULT_GLOBAL_CONFIG = { | SWH_DEFAULT_GLOBAL_CONFIG = { | ||||
'max_content_size': ('int', 100 * 1024 * 1024), | "max_content_size": ("int", 100 * 1024 * 1024), | ||||
'log_db': ('str', 'dbname=softwareheritage-log'), | "log_db": ("str", "dbname=softwareheritage-log"), | ||||
} | } | ||||
SWH_CONFIG_EXTENSIONS = [ | SWH_CONFIG_EXTENSIONS = [".yml", ".ini"] | ||||
'.yml', | |||||
'.ini', | |||||
] | |||||
# conversion per type | # conversion per type | ||||
_map_convert_fn = { | _map_convert_fn = { | ||||
'int': int, | "int": int, | ||||
'bool': lambda x: x.lower() == 'true', | "bool": lambda x: x.lower() == "true", | ||||
'list[str]': lambda x: [value.strip() for value in x.split(',')], | "list[str]": lambda x: [value.strip() for value in x.split(",")], | ||||
'list[int]': lambda x: [int(value.strip()) for value in x.split(',')], | "list[int]": lambda x: [int(value.strip()) for value in x.split(",")], | ||||
} | } | ||||
_map_check_fn = { | _map_check_fn = { | ||||
'int': lambda x: isinstance(x, int), | "int": lambda x: isinstance(x, int), | ||||
'bool': lambda x: isinstance(x, bool), | "bool": lambda x: isinstance(x, bool), | ||||
'list[str]': lambda x: (isinstance(x, list) and | "list[str]": lambda x: (isinstance(x, list) and all(isinstance(y, str) for y in x)), | ||||
all(isinstance(y, str) for y in x)), | "list[int]": lambda x: (isinstance(x, list) and all(isinstance(y, int) for y in x)), | ||||
'list[int]': lambda x: (isinstance(x, list) and | |||||
all(isinstance(y, int) for y in x)), | |||||
} | } | ||||
def exists_accessible(file): | def exists_accessible(file): | ||||
"""Check whether a file exists, and is accessible. | """Check whether a file exists, and is accessible. | ||||
Returns: | Returns: | ||||
True if the file exists and is accessible | True if the file exists and is accessible | ||||
Show All 13 Lines | else: | ||||
if os.access(file, os.R_OK): | if os.access(file, os.R_OK): | ||||
return True | return True | ||||
else: | else: | ||||
raise PermissionError("Permission denied: %r" % file) | raise PermissionError("Permission denied: %r" % file) | ||||
def config_basepath(config_path): | def config_basepath(config_path): | ||||
"""Return the base path of a configuration file""" | """Return the base path of a configuration file""" | ||||
if config_path.endswith(('.ini', '.yml')): | if config_path.endswith((".ini", ".yml")): | ||||
return config_path[:-4] | return config_path[:-4] | ||||
return config_path | return config_path | ||||
def read_raw_config(base_config_path): | def read_raw_config(base_config_path): | ||||
"""Read the raw config corresponding to base_config_path. | """Read the raw config corresponding to base_config_path. | ||||
Can read yml or ini files. | Can read yml or ini files. | ||||
""" | """ | ||||
yml_file = base_config_path + '.yml' | yml_file = base_config_path + ".yml" | ||||
if exists_accessible(yml_file): | if exists_accessible(yml_file): | ||||
logger.info('Loading config file %s', yml_file) | logger.info("Loading config file %s", yml_file) | ||||
with open(yml_file) as f: | with open(yml_file) as f: | ||||
return yaml.safe_load(f) | return yaml.safe_load(f) | ||||
ini_file = base_config_path + '.ini' | ini_file = base_config_path + ".ini" | ||||
if exists_accessible(ini_file): | if exists_accessible(ini_file): | ||||
config = configparser.ConfigParser() | config = configparser.ConfigParser() | ||||
config.read(ini_file) | config.read(ini_file) | ||||
if 'main' in config._sections: | if "main" in config._sections: | ||||
logger.info('Loading config file %s', ini_file) | logger.info("Loading config file %s", ini_file) | ||||
return config._sections['main'] | return config._sections["main"] | ||||
else: | else: | ||||
logger.warning('Ignoring config file %s (no [main] section)', | logger.warning("Ignoring config file %s (no [main] section)", ini_file) | ||||
ini_file) | |||||
return {} | return {} | ||||
def config_exists(config_path): | def config_exists(config_path): | ||||
"""Check whether the given config exists""" | """Check whether the given config exists""" | ||||
basepath = config_basepath(config_path) | basepath = config_basepath(config_path) | ||||
return any(exists_accessible(basepath + extension) | return any( | ||||
for extension in SWH_CONFIG_EXTENSIONS) | exists_accessible(basepath + extension) for extension in SWH_CONFIG_EXTENSIONS | ||||
) | |||||
def read(conf_file=None, default_conf=None): | def read(conf_file=None, default_conf=None): | ||||
"""Read the user's configuration file. | """Read the user's configuration file. | ||||
Fill in the gap using `default_conf`. `default_conf` is similar to this:: | Fill in the gap using `default_conf`. `default_conf` is similar to this:: | ||||
DEFAULT_CONF = { | DEFAULT_CONF = { | ||||
▲ Show 20 Lines • Show All 103 Lines • ▼ Show 20 Lines | def merge_configs(base, other): | ||||
... 'key2': 'value3', | ... 'key2': 'value3', | ||||
... 'key3': 'value6', | ... 'key3': 'value6', | ||||
... } | ... } | ||||
>>> assert merge_configs(d1, d2) == d3 | >>> assert merge_configs(d1, d2) == d3 | ||||
Note that no type checking is done for anything but dicts. | Note that no type checking is done for anything but dicts. | ||||
""" | """ | ||||
if not isinstance(base, dict) or not isinstance(other, dict): | if not isinstance(base, dict) or not isinstance(other, dict): | ||||
raise TypeError( | raise TypeError("Cannot merge a %s with a %s" % (type(base), type(other))) | ||||
'Cannot merge a %s with a %s' % (type(base), type(other))) | |||||
output = {} | output = {} | ||||
allkeys = set(chain(base.keys(), other.keys())) | allkeys = set(chain(base.keys(), other.keys())) | ||||
for k in allkeys: | for k in allkeys: | ||||
vb = base.get(k) | vb = base.get(k) | ||||
vo = other.get(k) | vo = other.get(k) | ||||
if isinstance(vo, dict): | if isinstance(vo, dict): | ||||
output[k] = merge_configs(vb is not None and vb or {}, vo) | output[k] = merge_configs(vb is not None and vb or {}, vo) | ||||
elif isinstance(vb, dict) and k in other and other[k] is not None: | elif isinstance(vb, dict) and k in other and other[k] is not None: | ||||
output[k] = merge_configs(vb, vo is not None and vo or {}) | output[k] = merge_configs(vb, vo is not None and vo or {}) | ||||
elif k in other: | elif k in other: | ||||
output[k] = deepcopy(vo) | output[k] = deepcopy(vo) | ||||
else: | else: | ||||
output[k] = deepcopy(vb) | output[k] = deepcopy(vb) | ||||
return output | return output | ||||
def swh_config_paths(base_filename): | def swh_config_paths(base_filename): | ||||
"""Return the Software Heritage specific configuration paths for the given | """Return the Software Heritage specific configuration paths for the given | ||||
filename.""" | filename.""" | ||||
return [os.path.join(dirname, base_filename) | return [os.path.join(dirname, base_filename) for dirname in SWH_CONFIG_DIRECTORIES] | ||||
for dirname in SWH_CONFIG_DIRECTORIES] | |||||
def prepare_folders(conf, *keys): | def prepare_folders(conf, *keys): | ||||
"""Prepare the folder mentioned in config under keys. | """Prepare the folder mentioned in config under keys. | ||||
""" | """ | ||||
def makedir(folder): | def makedir(folder): | ||||
if not os.path.exists(folder): | if not os.path.exists(folder): | ||||
os.makedirs(folder) | os.makedirs(folder) | ||||
for key in keys: | for key in keys: | ||||
makedir(conf[key]) | makedir(conf[key]) | ||||
def load_global_config(): | def load_global_config(): | ||||
"""Load the global Software Heritage config""" | """Load the global Software Heritage config""" | ||||
return priority_read( | return priority_read(swh_config_paths(SWH_GLOBAL_CONFIG), SWH_DEFAULT_GLOBAL_CONFIG) | ||||
swh_config_paths(SWH_GLOBAL_CONFIG), | |||||
SWH_DEFAULT_GLOBAL_CONFIG, | |||||
) | |||||
def load_named_config(name, default_conf=None, global_conf=True): | def load_named_config(name, default_conf=None, global_conf=True): | ||||
"""Load the config named `name` from the Software Heritage | """Load the config named `name` from the Software Heritage | ||||
configuration paths. | configuration paths. | ||||
If global_conf is True (default), read the global configuration | If global_conf is True (default), read the global configuration | ||||
too. | too. | ||||
Show All 18 Lines | class SWHConfig: | ||||
This class defines one classmethod, parse_config_file, which | This class defines one classmethod, parse_config_file, which | ||||
parses a configuration file using the default config as set in the | parses a configuration file using the default config as set in the | ||||
class attribute. | class attribute. | ||||
""" | """ | ||||
DEFAULT_CONFIG = {} # type: Dict[str, Tuple[str, Any]] | DEFAULT_CONFIG = {} # type: Dict[str, Tuple[str, Any]] | ||||
CONFIG_BASE_FILENAME = '' # type: Optional[str] | CONFIG_BASE_FILENAME = "" # type: Optional[str] | ||||
@classmethod | @classmethod | ||||
def parse_config_file(cls, base_filename=None, config_filename=None, | def parse_config_file( | ||||
additional_configs=None, global_config=True): | cls, | ||||
base_filename=None, | |||||
config_filename=None, | |||||
additional_configs=None, | |||||
global_config=True, | |||||
): | |||||
"""Parse the configuration file associated to the current class. | """Parse the configuration file associated to the current class. | ||||
By default, parse_config_file will load the configuration | By default, parse_config_file will load the configuration | ||||
cls.CONFIG_BASE_FILENAME from one of the Software Heritage | cls.CONFIG_BASE_FILENAME from one of the Software Heritage | ||||
configuration directories, in order, unless it is overridden | configuration directories, in order, unless it is overridden | ||||
by base_filename or config_filename (which shortcuts the file | by base_filename or config_filename (which shortcuts the file | ||||
lookup completely). | lookup completely). | ||||
Args: | Args: | ||||
- base_filename (str): overrides the default | - base_filename (str): overrides the default | ||||
cls.CONFIG_BASE_FILENAME | cls.CONFIG_BASE_FILENAME | ||||
- config_filename (str): sets the file to parse instead of | - config_filename (str): sets the file to parse instead of | ||||
the defaults set from cls.CONFIG_BASE_FILENAME | the defaults set from cls.CONFIG_BASE_FILENAME | ||||
- additional_configs: (list of default configuration dicts) | - additional_configs: (list of default configuration dicts) | ||||
allows to override or extend the configuration set in | allows to override or extend the configuration set in | ||||
cls.DEFAULT_CONFIG. | cls.DEFAULT_CONFIG. | ||||
- global_config (bool): Load the global configuration (default: | - global_config (bool): Load the global configuration (default: | ||||
True) | True) | ||||
""" | """ | ||||
if config_filename: | if config_filename: | ||||
config_filenames = [config_filename] | config_filenames = [config_filename] | ||||
elif 'SWH_CONFIG_FILENAME' in os.environ: | elif "SWH_CONFIG_FILENAME" in os.environ: | ||||
config_filenames = [os.environ['SWH_CONFIG_FILENAME']] | config_filenames = [os.environ["SWH_CONFIG_FILENAME"]] | ||||
else: | else: | ||||
if not base_filename: | if not base_filename: | ||||
base_filename = cls.CONFIG_BASE_FILENAME | base_filename = cls.CONFIG_BASE_FILENAME | ||||
config_filenames = swh_config_paths(base_filename) | config_filenames = swh_config_paths(base_filename) | ||||
if not additional_configs: | if not additional_configs: | ||||
additional_configs = [] | additional_configs = [] | ||||
full_default_config = merge_default_configs(cls.DEFAULT_CONFIG, | full_default_config = merge_default_configs( | ||||
*additional_configs) | cls.DEFAULT_CONFIG, *additional_configs | ||||
) | |||||
config = {} | config = {} | ||||
if global_config: | if global_config: | ||||
config = load_global_config() | config = load_global_config() | ||||
config.update(priority_read(config_filenames, full_default_config)) | config.update(priority_read(config_filenames, full_default_config)) | ||||
return config | return config |