Changeset View
Changeset View
Standalone View
Standalone View
swh/core/config.py
Show All 10 Lines | |||||
from copy import deepcopy | from copy import deepcopy | ||||
from typing import Any, Dict, Optional, Tuple | from typing import Any, Dict, Optional, Tuple | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
SWH_CONFIG_DIRECTORIES = [ | SWH_CONFIG_DIRECTORIES = ['~/.config/swh', '~/.swh', '/etc/softwareheritage'] | ||||
'~/.config/swh', | |||||
'~/.swh', | |||||
'/etc/softwareheritage', | |||||
] | |||||
SWH_GLOBAL_CONFIG = 'global.ini' | SWH_GLOBAL_CONFIG = 'global.ini' | ||||
SWH_DEFAULT_GLOBAL_CONFIG = { | SWH_DEFAULT_GLOBAL_CONFIG = { | ||||
'max_content_size': ('int', 100 * 1024 * 1024), | 'max_content_size': ('int', 100 * 1024 * 1024), | ||||
'log_db': ('str', 'dbname=softwareheritage-log'), | 'log_db': ('str', 'dbname=softwareheritage-log'), | ||||
} | } | ||||
SWH_CONFIG_EXTENSIONS = [ | SWH_CONFIG_EXTENSIONS = ['.yml', '.ini'] | ||||
'.yml', | |||||
'.ini', | |||||
] | |||||
# conversion per type | # conversion per type | ||||
_map_convert_fn = { | _map_convert_fn = { | ||||
'int': int, | 'int': int, | ||||
'bool': lambda x: x.lower() == 'true', | 'bool': lambda x: x.lower() == 'true', | ||||
'list[str]': lambda x: [value.strip() for value in x.split(',')], | 'list[str]': lambda x: [value.strip() for value in x.split(',')], | ||||
'list[int]': lambda x: [int(value.strip()) for value in x.split(',')], | 'list[int]': lambda x: [int(value.strip()) for value in x.split(',')], | ||||
} | } | ||||
_map_check_fn = { | _map_check_fn = { | ||||
'int': lambda x: isinstance(x, int), | 'int': lambda x: isinstance(x, int), | ||||
'bool': lambda x: isinstance(x, bool), | 'bool': lambda x: isinstance(x, bool), | ||||
'list[str]': lambda x: (isinstance(x, list) and | 'list[str]': lambda x: (isinstance(x, list) and all(isinstance(y, str) for y in x)), | ||||
all(isinstance(y, str) for y in x)), | 'list[int]': lambda x: (isinstance(x, list) and all(isinstance(y, int) for y in x)), | ||||
'list[int]': lambda x: (isinstance(x, list) and | |||||
all(isinstance(y, int) for y in x)), | |||||
} | } | ||||
def exists_accessible(file): | def exists_accessible(file): | ||||
"""Check whether a file exists, and is accessible. | """Check whether a file exists, and is accessible. | ||||
Returns: | Returns: | ||||
True if the file exists and is accessible | True if the file exists and is accessible | ||||
Show All 38 Lines | def read_raw_config(base_config_path): | ||||
ini_file = base_config_path + '.ini' | ini_file = base_config_path + '.ini' | ||||
if exists_accessible(ini_file): | if exists_accessible(ini_file): | ||||
config = configparser.ConfigParser() | config = configparser.ConfigParser() | ||||
config.read(ini_file) | config.read(ini_file) | ||||
if 'main' in config._sections: | if 'main' in config._sections: | ||||
logger.info('Loading config file %s', ini_file) | logger.info('Loading config file %s', ini_file) | ||||
return config._sections['main'] | return config._sections['main'] | ||||
else: | else: | ||||
logger.warning('Ignoring config file %s (no [main] section)', | logger.warning('Ignoring config file %s (no [main] section)', ini_file) | ||||
ini_file) | |||||
return {} | return {} | ||||
def config_exists(config_path): | def config_exists(config_path): | ||||
"""Check whether the given config exists""" | """Check whether the given config exists""" | ||||
basepath = config_basepath(config_path) | basepath = config_basepath(config_path) | ||||
return any(exists_accessible(basepath + extension) | return any( | ||||
for extension in SWH_CONFIG_EXTENSIONS) | exists_accessible(basepath + extension) for extension in SWH_CONFIG_EXTENSIONS | ||||
) | |||||
def read(conf_file=None, default_conf=None): | def read(conf_file=None, default_conf=None): | ||||
"""Read the user's configuration file. | """Read the user's configuration file. | ||||
Fill in the gap using `default_conf`. `default_conf` is similar to this:: | Fill in the gap using `default_conf`. `default_conf` is similar to this:: | ||||
DEFAULT_CONF = { | DEFAULT_CONF = { | ||||
▲ Show 20 Lines • Show All 103 Lines • ▼ Show 20 Lines | def merge_configs(base, other): | ||||
... 'key2': 'value3', | ... 'key2': 'value3', | ||||
... 'key3': 'value6', | ... 'key3': 'value6', | ||||
... } | ... } | ||||
>>> assert merge_configs(d1, d2) == d3 | >>> assert merge_configs(d1, d2) == d3 | ||||
Note that no type checking is done for anything but dicts. | Note that no type checking is done for anything but dicts. | ||||
""" | """ | ||||
if not isinstance(base, dict) or not isinstance(other, dict): | if not isinstance(base, dict) or not isinstance(other, dict): | ||||
raise TypeError( | raise TypeError('Cannot merge a %s with a %s' % (type(base), type(other))) | ||||
'Cannot merge a %s with a %s' % (type(base), type(other))) | |||||
output = {} | output = {} | ||||
allkeys = set(chain(base.keys(), other.keys())) | allkeys = set(chain(base.keys(), other.keys())) | ||||
for k in allkeys: | for k in allkeys: | ||||
vb = base.get(k) | vb = base.get(k) | ||||
vo = other.get(k) | vo = other.get(k) | ||||
if isinstance(vo, dict): | if isinstance(vo, dict): | ||||
output[k] = merge_configs(vb is not None and vb or {}, vo) | output[k] = merge_configs(vb is not None and vb or {}, vo) | ||||
elif isinstance(vb, dict) and k in other and other[k] is not None: | elif isinstance(vb, dict) and k in other and other[k] is not None: | ||||
output[k] = merge_configs(vb, vo is not None and vo or {}) | output[k] = merge_configs(vb, vo is not None and vo or {}) | ||||
elif k in other: | elif k in other: | ||||
output[k] = deepcopy(vo) | output[k] = deepcopy(vo) | ||||
else: | else: | ||||
output[k] = deepcopy(vb) | output[k] = deepcopy(vb) | ||||
return output | return output | ||||
def swh_config_paths(base_filename): | def swh_config_paths(base_filename): | ||||
"""Return the Software Heritage specific configuration paths for the given | """Return the Software Heritage specific configuration paths for the given | ||||
filename.""" | filename.""" | ||||
return [os.path.join(dirname, base_filename) | return [os.path.join(dirname, base_filename) for dirname in SWH_CONFIG_DIRECTORIES] | ||||
for dirname in SWH_CONFIG_DIRECTORIES] | |||||
def prepare_folders(conf, *keys): | def prepare_folders(conf, *keys): | ||||
"""Prepare the folder mentioned in config under keys. | """Prepare the folder mentioned in config under keys. | ||||
""" | """ | ||||
def makedir(folder): | def makedir(folder): | ||||
if not os.path.exists(folder): | if not os.path.exists(folder): | ||||
os.makedirs(folder) | os.makedirs(folder) | ||||
for key in keys: | for key in keys: | ||||
makedir(conf[key]) | makedir(conf[key]) | ||||
def load_global_config(): | def load_global_config(): | ||||
"""Load the global Software Heritage config""" | """Load the global Software Heritage config""" | ||||
return priority_read( | return priority_read(swh_config_paths(SWH_GLOBAL_CONFIG), SWH_DEFAULT_GLOBAL_CONFIG) | ||||
swh_config_paths(SWH_GLOBAL_CONFIG), | |||||
SWH_DEFAULT_GLOBAL_CONFIG, | |||||
) | |||||
def load_named_config(name, default_conf=None, global_conf=True): | def load_named_config(name, default_conf=None, global_conf=True): | ||||
"""Load the config named `name` from the Software Heritage | """Load the config named `name` from the Software Heritage | ||||
configuration paths. | configuration paths. | ||||
If global_conf is True (default), read the global configuration | If global_conf is True (default), read the global configuration | ||||
too. | too. | ||||
Show All 21 Lines | class SWHConfig: | ||||
class attribute. | class attribute. | ||||
""" | """ | ||||
DEFAULT_CONFIG = {} # type: Dict[str, Tuple[str, Any]] | DEFAULT_CONFIG = {} # type: Dict[str, Tuple[str, Any]] | ||||
CONFIG_BASE_FILENAME = '' # type: Optional[str] | CONFIG_BASE_FILENAME = '' # type: Optional[str] | ||||
@classmethod | @classmethod | ||||
def parse_config_file(cls, base_filename=None, config_filename=None, | def parse_config_file( | ||||
additional_configs=None, global_config=True): | cls, | ||||
base_filename=None, | |||||
config_filename=None, | |||||
additional_configs=None, | |||||
global_config=True, | |||||
): | |||||
"""Parse the configuration file associated to the current class. | """Parse the configuration file associated to the current class. | ||||
By default, parse_config_file will load the configuration | By default, parse_config_file will load the configuration | ||||
cls.CONFIG_BASE_FILENAME from one of the Software Heritage | cls.CONFIG_BASE_FILENAME from one of the Software Heritage | ||||
configuration directories, in order, unless it is overridden | configuration directories, in order, unless it is overridden | ||||
by base_filename or config_filename (which shortcuts the file | by base_filename or config_filename (which shortcuts the file | ||||
lookup completely). | lookup completely). | ||||
Show All 15 Lines | ): | ||||
config_filenames = [os.environ['SWH_CONFIG_FILENAME']] | config_filenames = [os.environ['SWH_CONFIG_FILENAME']] | ||||
else: | else: | ||||
if not base_filename: | if not base_filename: | ||||
base_filename = cls.CONFIG_BASE_FILENAME | base_filename = cls.CONFIG_BASE_FILENAME | ||||
config_filenames = swh_config_paths(base_filename) | config_filenames = swh_config_paths(base_filename) | ||||
if not additional_configs: | if not additional_configs: | ||||
additional_configs = [] | additional_configs = [] | ||||
full_default_config = merge_default_configs(cls.DEFAULT_CONFIG, | full_default_config = merge_default_configs( | ||||
*additional_configs) | cls.DEFAULT_CONFIG, *additional_configs | ||||
) | |||||
config = {} | config = {} | ||||
if global_config: | if global_config: | ||||
config = load_global_config() | config = load_global_config() | ||||
config.update(priority_read(config_filenames, full_default_config)) | config.update(priority_read(config_filenames, full_default_config)) | ||||
return config | return config |