Changeset View
Changeset View
Standalone View
Standalone View
swh/core/config.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
import os | import os | ||||
import yaml | import yaml | ||||
from itertools import chain | from itertools import chain | ||||
from copy import deepcopy | from copy import deepcopy | ||||
from typing import Any, Callable, Dict, Optional, Tuple | from typing import Any, Callable, Dict, List, Optional, Tuple | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
SWH_CONFIG_DIRECTORIES = [ | SWH_CONFIG_DIRECTORIES = [ | ||||
"~/.config/swh", | "~/.config/swh", | ||||
"~/.swh", | "~/.swh", | ||||
Show All 21 Lines | |||||
_map_check_fn: Dict[str, Callable] = { | _map_check_fn: Dict[str, Callable] = { | ||||
"int": lambda x: isinstance(x, int), | "int": lambda x: isinstance(x, int), | ||||
"bool": lambda x: isinstance(x, bool), | "bool": lambda x: isinstance(x, bool), | ||||
"list[str]": lambda x: (isinstance(x, list) and all(isinstance(y, str) for y in x)), | "list[str]": lambda x: (isinstance(x, list) and all(isinstance(y, str) for y in x)), | ||||
"list[int]": lambda x: (isinstance(x, list) and all(isinstance(y, int) for y in x)), | "list[int]": lambda x: (isinstance(x, list) and all(isinstance(y, int) for y in x)), | ||||
} | } | ||||
def exists_accessible(file): | def exists_accessible(filepath: str) -> bool: | ||||
"""Check whether a file exists, and is accessible. | """Check whether a file exists, and is accessible. | ||||
Returns: | Returns: | ||||
True if the file exists and is accessible | True if the file exists and is accessible | ||||
False if the file does not exist | False if the file does not exist | ||||
Raises: | Raises: | ||||
PermissionError if the file cannot be read. | PermissionError if the file cannot be read. | ||||
""" | """ | ||||
try: | try: | ||||
os.stat(file) | os.stat(filepath) | ||||
except PermissionError: | except PermissionError: | ||||
raise | raise | ||||
except FileNotFoundError: | except FileNotFoundError: | ||||
return False | return False | ||||
else: | else: | ||||
if os.access(file, os.R_OK): | if os.access(filepath, os.R_OK): | ||||
return True | return True | ||||
else: | else: | ||||
raise PermissionError("Permission denied: %r" % file) | raise PermissionError("Permission denied: {filepath!r}") | ||||
def config_basepath(config_path: str) -> str: | def config_basepath(config_path: str) -> str: | ||||
"""Return the base path of a configuration file""" | """Return the base path of a configuration file""" | ||||
if config_path.endswith(".yml"): | if config_path.endswith(".yml"): | ||||
return config_path[:-4] | return config_path[:-4] | ||||
return config_path | return config_path | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | for key, (nature_type, default_value) in default_conf.items(): | ||||
conf[key] = default_value | conf[key] = default_value | ||||
elif not _map_check_fn.get(nature_type, lambda x: True)(val): | elif not _map_check_fn.get(nature_type, lambda x: True)(val): | ||||
# value present but not in the proper format, force type conversion | # value present but not in the proper format, force type conversion | ||||
conf[key] = _map_convert_fn.get(nature_type, lambda x: x)(val) | conf[key] = _map_convert_fn.get(nature_type, lambda x: x)(val) | ||||
return conf | return conf | ||||
def priority_read(conf_filenames, default_conf=None): | def priority_read( | ||||
conf_filenames: List[str], default_conf: Optional[Dict[str, Tuple[str, Any]]] = None | |||||
): | |||||
"""Try reading the configuration files from conf_filenames, in order, | """Try reading the configuration files from conf_filenames, in order, | ||||
and return the configuration from the first one that exists. | and return the configuration from the first one that exists. | ||||
default_conf has the same specification as it does in read. | default_conf has the same specification as it does in read. | ||||
""" | """ | ||||
# Try all the files in order | # Try all the files in order | ||||
for filename in conf_filenames: | for filename in conf_filenames: | ||||
Show All 10 Lines | def merge_default_configs(base_config, *other_configs): | ||||
full_config = base_config.copy() | full_config = base_config.copy() | ||||
for config in other_configs: | for config in other_configs: | ||||
full_config.update(config) | full_config.update(config) | ||||
return full_config | return full_config | ||||
def merge_configs(base, other): | def merge_configs(base: Optional[Dict[str, Any]], other: Optional[Dict[str, Any]]): | ||||
"""Merge two config dictionaries | """Merge two config dictionaries | ||||
This does merge config dicts recursively, with the rules, for every value | This does merge config dicts recursively, with the rules, for every value | ||||
of the dicts (with 'val' not being a dict): | of the dicts (with 'val' not being a dict): | ||||
- None + type -> type | - None + type -> type | ||||
- type + None -> None | - type + None -> None | ||||
- dict + dict -> dict (merged) | - dict + dict -> dict (merged) | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | for k in allkeys: | ||||
elif k in other: | elif k in other: | ||||
output[k] = deepcopy(vo) | output[k] = deepcopy(vo) | ||||
else: | else: | ||||
output[k] = deepcopy(vb) | output[k] = deepcopy(vb) | ||||
return output | return output | ||||
def swh_config_paths(base_filename): | def swh_config_paths(base_filename: str) -> List[str]: | ||||
"""Return the Software Heritage specific configuration paths for the given | """Return the Software Heritage specific configuration paths for the given | ||||
filename.""" | filename.""" | ||||
return [os.path.join(dirname, base_filename) for dirname in SWH_CONFIG_DIRECTORIES] | return [os.path.join(dirname, base_filename) for dirname in SWH_CONFIG_DIRECTORIES] | ||||
def prepare_folders(conf, *keys): | def prepare_folders(conf, *keys): | ||||
"""Prepare the folder mentioned in config under keys. | """Prepare the folder mentioned in config under keys. | ||||
▲ Show 20 Lines • Show All 102 Lines • Show Last 20 Lines |