# Copyright (C) 2015-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from __future__ import annotations

from copy import deepcopy
from collections import UserDict
from dataclasses import dataclass, field
from itertools import chain
import io
import logging
import os
from pathlib import Path
import re
from typing import (Any, Callable, Collection, Dict, IO, List, Mapping, NewType,
                    Optional, Pattern as Regexp, Sequence, Set, Tuple, Union)

import yaml


logger = logging.getLogger(__name__)


# NOTE: pathlib does not expand "~" implicitly; this path must go through
# ``Path.expanduser()`` before being handed to the filesystem.
SWH_CONFIG_DIRECTORY = Path("~/.config/swh")
SWH_GLOBAL_CONFIG = "global.yml"
SWH_CONFIG_PATH_ENVVAR = "SWH_CONFIG_FILENAME"

PathLike = Union[str, bytes, os.PathLike]


class Envvar(str):
    """A ``str`` subclass tagging a value as an environment variable name."""

    __slots__ = ()


class ConfigName(str):
    """A ``str`` subclass tagging a value as a configuration name."""

    __slots__ = ()


### Loading API ###

def loadable(filepath: PathLike) -> bool:
    """Check whether a file exists, and is readable.

    Args:
        filepath: path to check (``str``, ``bytes`` or ``os.PathLike``)

    Returns:
        True if the file exists and is accessible
        False if the file does not exist (this includes a path nested under
        something that is not a directory)

    Raises:
        PermissionError if the file cannot be read.
    """
    try:
        # A PermissionError raised while traversing the path propagates as is.
        os.stat(filepath)
    except (FileNotFoundError, NotADirectoryError):
        # NotADirectoryError: a parent component is a regular file, so the
        # target cannot exist either.
        return False

    if not os.access(filepath, os.R_OK):
        raise PermissionError(f"Permission denied: {filepath!r}")
    return True


# Use single dispatch for load()?
# IO[] OR io.IOBase
def load_from_file(file: IO[str], defaults: Optional[Config] = None) -> Config:
    """Load a configuration from an open YAML text stream.

    Args:
        file: text stream holding a YAML document
        defaults: optional default values; values read from the stream take
            precedence over them (merged with :func:`merge_dicts`)

    Returns:
        The loaded configuration wrapped in a Config object

    Raises:
        ValueError if the top-level YAML node is not a mapping
    """
    yamldata = yaml.safe_load(file)
    if yamldata is None:  # empty document
        yamldata = {}
    if not isinstance(yamldata, dict):
        raise ValueError("Invalid configuration definition: not a mapping")
    if defaults:
        yamldata = merge_dicts(dict(defaults), yamldata)
    return Config(yamldata)


def load_from_path(path: PathLike, defaults: Optional[Config] = None) -> Config:
    """Load a configuration from the YAML file located at ``path``.

    ``defaults`` is forwarded to :func:`load_from_file`.
    """
    logger.debug("Loading config file %s", path)
    with open(path) as file:
        return load_from_file(file, defaults)


def load_from_name(configname: str, defaults: Optional[Config] = None) -> Config:
    """Load the configuration named ``configname`` from the SWH config directory.

    ``defaults`` is forwarded to :func:`load_from_path`.
    """
    return load_from_path(name_to_path(configname), defaults)


def name_to_path(configname: str) -> Path:
    """Return the path of the ``<configname>.yml`` file in the SWH config directory."""
    # The directory constant starts with "~", which open() would take literally.
    return (SWH_CONFIG_DIRECTORY / (configname + ".yml")).expanduser()


def load_from_envvar(defaults: Optional[Config] = None) -> Config:
    """Load configuration yaml file from the environment variable
    SWH_CONFIG_FILENAME, eventually enriched with default configuration
    key/value from the defaults dict if provided.

    Returns:
        Configuration dict

    Raises:
        AssertionError if SWH_CONFIG_FILENAME is undefined
    """
    # Raised explicitly so that the check survives ``python -O``.
    if SWH_CONFIG_PATH_ENVVAR not in os.environ:
        raise AssertionError(
            f"{SWH_CONFIG_PATH_ENVVAR} environment variable is undefined."
        )
    # do at import time?

    return load_from_path(os.environ[SWH_CONFIG_PATH_ENVVAR], defaults)


def merge_dicts(
    base: Optional[Dict[str, Any]], other: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
    """Merge two config dictionaries

    Merge dicts recursively, using the following rules:

    - None + type -> type
    - type + None -> None
    - dict + dict -> dict (merged)
    - val + dict -> TypeError
    - dict + val -> TypeError
    - val + val -> val (other)

    for instance:

    >>> d1 = {
    ...   'key1': {
    ...     'skey1': 'value1',
    ...     'skey2': {'sskey1': 'value2'},
    ...   },
    ...   'key2': 'value3',
    ... }

    with

    >>> d2 = {
    ...   'key1': {
    ...     'skey1': 'value4',
    ...     'skey2': {'sskey2': 'value5'},
    ...   },
    ...   'key3': 'value6',
    ... }

    will give:

    >>> d3 = {
    ...   'key1': {
    ...     'skey1': 'value4',  # <-- note this
    ...     'skey2': {
    ...       'sskey1': 'value2',
    ...       'sskey2': 'value5',
    ...     },
    ...   },
    ...   'key2': 'value3',
    ...   'key3': 'value6',
    ... }
    >>> assert merge_dicts(d1, d2) == d3

    Note that no type checking is done for anything but dicts.

    Raises:
        TypeError if ``base`` or ``other`` is not a dict
    """
    if not isinstance(base, dict) or not isinstance(other, dict):
        raise TypeError("Cannot merge a %s with a %s" % (type(base), type(other)))

    output = {}
    # dict.fromkeys() de-duplicates while keeping a deterministic key order.
    for k in dict.fromkeys(chain(base, other)):
        vb = base.get(k)
        vo = other.get(k)

        if isinstance(vo, dict):
            # A missing/None (or any falsy) base value is treated as empty.
            output[k] = merge_dicts(vb or {}, vo)
        elif isinstance(vb, dict) and k in other and other[k] is not None:
            output[k] = merge_dicts(vb, vo or {})
        elif k in other:
            output[k] = deepcopy(vo)
        else:
            output[k] = deepcopy(vb)

    return output


### Language ###

# Original type sketch, kept for reference:
# CID = NewType("CID", str)
# QID is (TID, IID)
# TID = NewType("TID", CID)
# IID = NewType("IID", CID)
# AID = NewType("AID", CID)
# AnyCID = Union[TID, IID, AID]
# QID = Sequence[CID]  # Qualified configuration identifier
# Config = Mapping[CID, Any]  # ADT please?
# AttrKey = CID
# AttrValue = Any
# Attribute = (AttrKey, AttrValue)

Component = type
ComponentConstructor = Callable[[type], Component]  # OR Callable[[], Component] ?


# Identifiers

class CID:
    """Configuration identifier: a single validated token.

    Two CIDs compare equal when their strings are equal, whatever their
    concrete subclass, so that identifiers parsed from a string match the
    typed ones built while parsing a configuration.
    """

    __slots__ = ("str",)
    REGEXP = re.compile(r"[A-Za-z0-9_\-]+")

    def __init__(self, token: str):
        """Wrap ``token``.

        Raises:
            ValueError if ``token`` is not a string fully matching CID.REGEXP
        """
        if not self.parses(token):
            raise ValueError(f"CID must match {CID.REGEXP!r}")
        self.str = token

    @staticmethod
    def parses(token: str) -> bool:
        """Tell whether ``token`` is a string that fully matches CID.REGEXP."""
        return isinstance(token, str) and bool(CID.REGEXP.fullmatch(token))

    def __eq__(self, other: object) -> bool:
        if isinstance(other, CID):
            return self.str == other.str
        return NotImplemented

    def __hash__(self) -> int:
        return hash(self.str)

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.str!r})"

    def __str__(self) -> str:
        return self.str


class TID(CID):
    """Type identifier."""

    __slots__ = ()


class IID(CID):
    """Instance identifier."""

    __slots__ = ()


class AID(CID):
    """Attribute identifier."""

    __slots__ = ()


# Aliases taken from the type sketch above; they are used in annotations below.
AttrKey = CID
AttrValue = Any


def _cid_str(token: Union[str, CID]) -> str:
    """Return the plain string behind ``token``, be it a CID or already a str."""
    return token.str if isinstance(token, CID) else token


class QID:  # (CID)  # FIXME: settle str/cid/kwargs/resolved forms...
    """Qualified identifier: a dot-separated sequence of CIDs.

    QID must have following features:
    - type-strict version of the QID string spec, here `QID = (TID ".")? IID`
    - have access to both string form and individual fields
    - constructor taking both string form or individual fields (string or CID)
    - enforce semantics of its components: TID, IID

    Attributes:
        str: the dotted string form
        fields: tuple of CID, one per component. Components parsed from
            strings are plain CID objects; typed components (TID, IID...) are
            only present when the QID was built from such objects.

    Two QIDs compare equal when their string forms are equal.
    """

    __slots__ = ("str", "fields")
    SEP = "."
    REGEXP: Regexp = re.compile(
        rf"{CID.REGEXP.pattern}(?:\.{CID.REGEXP.pattern})*"
    )

    def __init__(self, *tokens: Union[str, CID]):
        """Build a QID from one dotted string, several strings, or several CIDs.

        Raises:
            ValueError if no token is given, if tokens mix str and CID, or if
                a string token is malformed
        """
        if not tokens:
            raise ValueError("QID requires at least one token")
        all_str = all(isinstance(t, str) for t in tokens)
        all_cid = all(isinstance(t, CID) for t in tokens)
        if not (all_str or all_cid):
            raise ValueError("QID tokens must all either be str or CID")

        if all_cid:
            self.str = QID.SEP.join(t.str for t in tokens)
            self.fields = tuple(tokens)
        elif len(tokens) == 1:
            # Single string: qualified (dotted) form.
            (qualified_str,) = tokens
            if not QID.parses(qualified_str):
                raise ValueError(
                    f"QID in string form must match {QID.REGEXP.pattern!r}"
                )
            self.str = qualified_str
            self.fields = tuple(map(CID, qualified_str.split(QID.SEP)))
        else:
            # Several strings: one per field, each validated by CID().
            self.fields = tuple(map(CID, tokens))
            self.str = QID.SEP.join(tokens)

    def get_TID(self):  # FIXME: no TID in QID
        """Return the first field if it is a TID instance, None otherwise."""
        return self.fields[0] if isinstance(self.fields[0], TID) else None

    @staticmethod
    def parses(token: Union[str, QID]) -> bool:
        """Tell whether ``token`` is a QID, or a string fully matching QID.REGEXP."""
        if isinstance(token, QID):
            return True
        return isinstance(token, str) and bool(QID.REGEXP.fullmatch(token))

    def __eq__(self, other: object) -> bool:
        if isinstance(other, QID):
            return self.str == other.str
        return NotImplemented

    def __hash__(self) -> int:
        return hash(self.str)

    def __repr__(self) -> str:
        return f"QID({self.str!r})"

    def __str__(self) -> str:
        return self.str


def TID_exists(token: str, register=None) -> bool:  # Keep?
    """Tell whether ``token`` names a registered component type.

    Args:
        token: type identifier, as a string or a CID
        register: mapping to look into; defaults to the module-level component
            register, resolved at call time
    """
    if register is None:
        register = _ComponentRegister
    return _cid_str(token) in register


def IID_exists(config: StandaloneConfig, iid: IID) -> bool:
    """Tell whether ``config`` has cached a QID whose last field is ``iid``."""
    wanted = _cid_str(iid)
    return any(qid.fields[-1].str == wanted for qid in config._qids)


def QID_exists(config: StandaloneConfig, qid: QID) -> bool:
    """Tell whether ``qid`` designates an object reachable in ``config``.

    NOTE(review): the file used to carry a second, unfinished definition of
    this function that shadowed this one and attempted to check the TID/IID
    typing of each token; that check is not implemented here.
    """
    try:
        config.get_obj(qid)
    except ValueError:
        return False
    return True


# References

class Reference:
    """A reference from an attribute (``source``) to a QID (``target``).

    In string form a reference is a QID enclosed in angle brackets, e.g.
    ``<tid.iid>``, optionally surrounded by whitespace.
    """

    # NOTE(review): the pattern used to require whitespace on both sides of
    # the brackets, which a plain "<a.b>" value never satisfied; presumably
    # optional whitespace was meant -- confirm.
    REGEXP: Regexp = re.compile(rf"\s*<({QID.REGEXP.pattern})>\s*")

    def __init__(self, target: QID, source: AttrKey):
        """
        Raises:
            ValueError if ``target`` is neither a QID nor a well-formed QID string
        """
        if not QID.parses(target):
            raise ValueError(f"invalid QID format: {target!r}")
        self.target = target
        self.source = source

    @classmethod
    def get_target(cls, token: AttrValue) -> Optional[str]:
        """Return the QID string referenced by ``token``.

        Returns None when ``token`` is not a string or is not a reference.
        """
        if not isinstance(token, str):
            return None
        m = cls.REGEXP.fullmatch(token)
        return m.group(1) if m else None

    @classmethod
    def parses(cls, token: str) -> bool:
        """Tell whether ``token`` is a reference in string form."""
        return cls.get_target(token) is not None


# Config objects

class Config(UserDict):  # XXX: should Config be OR own a dict?
    """Base class of configuration objects.

    Instantiating ``Config`` itself yields a :class:`StandaloneConfig`;
    subclasses are instantiated normally.
    """

    def __new__(cls, *args, **kwargs) -> StandaloneConfig:
        target = StandaloneConfig if cls is Config else cls
        # The result is an instance of ``cls``, so Python goes on to call its
        # __init__ with the original arguments.
        return super().__new__(target)

    def get_obj(self, qid: QID) -> Any:
        """Return the object found by following ``qid`` field by field.

        Mappings are indexed by the field string, lists by its integer value.

        Raises:
            ValueError if ``qid`` is malformed or does not lead to an object
        """
        if not QID.parses(qid):  # Works also for AID
            raise ValueError(f"invalid QID format: {qid!r}")
        if not isinstance(qid, QID):
            qid = QID(qid)

        conf_it = self.data
        try:
            for cid in qid.fields:
                key = cid.str
                if isinstance(conf_it, list):
                    key = int(key)  # Throws ValueError
                conf_it = conf_it[key]  # Works for dict and list
        except (KeyError, IndexError, TypeError) as err:
            raise ValueError(f"QID unknown for config: {qid!r}") from err
        return conf_it


class StandaloneConfig(Config):
    """A whole configuration, with a cache of the QIDs it defines.

    The configuration is parsed on construction; see :meth:`parse`.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Cache qids and refs
        self._qids = set()  # Set[QID]
        self._refs = {}  # source attribute QID -> target QID
        self.parse()

    def parse(self, config: Optional[Dict] = None) -> None:
        """Validate the first two levels of the configuration and cache QIDs.

        A top-level key naming a registered component type (TID) must map to
        a mapping of instances; any other top-level key is an instance (IID).

        Args:
            config: kept for signature compatibility; parsing always operates
                on the content of this object

        Raises:
            ValueError if an identifier is malformed or if a mapping is
                missing where one is expected
        """
        paths = set()

        # parse 2 levels, validate TIDs, store QIDs
        for key, obj in self.data.items():
            if TID_exists(key):
                if not isinstance(obj, dict):
                    raise ValueError(
                        f"mapping not found under TID: {type(obj)}, {obj}"
                    )
                for iid, instobj in obj.items():
                    self._parse_instance_item(iid, instobj)
                    paths.add(QID(TID(key), IID(iid)))
            else:
                self._parse_instance_item(key, obj)
                paths.add(QID(IID(key)))

        self._qids = paths

    def _parse_instance_item(self, cid: str, obj: Collection) -> None:
        """Check that ``cid`` is a well-formed CID and ``obj`` a mapping.

        Raises:
            ValueError otherwise
        """
        if not CID.parses(cid):
            raise ValueError(f"invalid CID format: {cid!r}")
        if not isinstance(obj, dict):
            raise ValueError(f"mapping not found under IID: {type(obj)}, {obj}")
        # TODO: parse the references of each instance here and cache them in
        # self._refs (see InstanceConfig.prepare).

    def get_instance(self, qid: QID) -> InstanceConfig:
        """Return the instance configuration designated by ``qid``.

        Raises:
            ValueError if ``qid`` is unknown or does not designate a mapping
        """
        obj = self.get_obj(qid)
        if not isinstance(obj, dict):
            raise ValueError(f"mapping not found under IID: {type(obj)}, {obj}")
        return InstanceConfig(obj, self, qid)

    def resolve_reference(self, qid: QID) -> QID:
        """Return the cached target of the reference located at ``qid``.

        NOTE(review): the cache maps source QIDs to target QIDs, so a QID is
        returned; the original annotation promised an InstanceConfig --
        confirm which is wanted.

        Raises:
            ValueError if no reference is cached for ``qid``
        """
        try:
            return self._refs[qid]
        except KeyError:
            raise ValueError(f"reference not found at {qid!r}") from None


class PartialConfig(Config):
    """A copy of a part of a standalone configuration.

    Attributes:
        root: the configuration this part was taken from
        qid: the location of this part in ``root``
    """

    root: StandaloneConfig
    qid: QID

    def __init__(self, data, root: StandaloneConfig, qid: QID):
        super().__init__(data)
        self.root = root
        self.qid = qid


class InstanceConfig(PartialConfig):
    """The configuration of one component instance."""

    def prepare(self) -> None:  # WIP
        """Cache the references of this instance in the root configuration.

        NOTE(review): reconstructed from the commented-out sketch that updated
        the root's reference cache with the instance's references -- confirm.
        """
        self.root._refs.update(self.find_subreferences())

    def find_subreferences(self) -> Dict[QID, QID]:  # FIXME: identity disorder
        """Map the QID of each attribute holding a reference to its target QID.

        Raises:
            ValueError if the key of a referencing attribute is not a valid CID
        """
        refs = {}
        for key, val in self.items():
            target = Reference.get_target(val)
            if target is not None:
                refs[QID(*self.qid.fields, AID(key))] = QID(target)
        return refs

    def find_subinstances(self) -> Dict[QID, InstanceConfig]:  # WIP
        """Map attribute QIDs to the instance configurations found under them.

        Covers attributes referencing another instance and attributes whose
        value is a mapping (anonymous instance).
        """
        instances = {}
        for source, target in self.find_subreferences().items():
            instances[source] = self.root.get_instance(target)
        for key, val in self.items():
            if isinstance(val, dict):
                aid = QID(*self.qid.fields, AID(key))
                instances[aid] = InstanceConfig(val, self.root, aid)
            # TODO: recursion into list values
        return instances


### Instantiation API ###

# Registered component constructors, keyed by TID string.
_ComponentRegister: Dict[str, Component] = {}


def register_component(tid: Union[str, TID], comp: Component) -> None:
    """Register ``comp`` as the constructor for component type ``tid``."""
    _ComponentRegister[_cid_str(tid)] = comp


def resolve_component(tid: Union[str, TID]) -> Optional[ComponentConstructor]:
    """Return the constructor registered for ``tid``, or None if there is none."""
    return _ComponentRegister.get(_cid_str(tid))


def instantiate_component(
    instconfig: InstanceConfig, ctor: ComponentConstructor
) -> Component:
    """Call ``ctor`` with the instance configuration and return the result."""
    # NOTE(review): the configuration is passed as one positional argument;
    # confirm whether constructors expect it unpacked as keyword arguments.
    inst = ctor(instconfig)
    # TODO: error handling
    return inst


def create_component(config: StandaloneConfig, qid: QID) -> Component:
    """Instantiate the component designated by ``qid`` in ``config``.

    Attributes referencing another instance are replaced by the instantiated
    component they refer to; references held by those sub-instances are not
    resolved in turn.

    Raises:
        ValueError if ``qid`` (or a referenced QID) is unknown, lacks a TID,
            or names an unregistered component type
    """
    instconfig, ctor = _prepare_component(config, qid)

    # instance composition, subinst but not anoninst
    subrefs = instconfig.find_subreferences()
    for source_qid, target_qid in subrefs.items():
        subinstconfig, subctor = _prepare_component(config, target_qid)
        subinst = instantiate_component(subinstconfig, subctor)
        aid = source_qid.fields[-1].str
        # instconfig is a copy: the root configuration is left untouched.
        instconfig[aid] = subinst

    return instantiate_component(instconfig, ctor)


def _prepare_component(
    config: StandaloneConfig, qid: QID
) -> Tuple[InstanceConfig, ComponentConstructor]:
    """Return the instance configuration and the constructor for ``qid``."""
    instconfig = config.get_instance(qid)
    tid = qid.get_TID()
    if tid is None and len(qid.fields) > 1 and TID_exists(qid.fields[0]):
        # QIDs parsed from strings carry untyped CIDs: as when parsing a
        # configuration, a leading field is a TID when it names a registered
        # component type.
        tid = TID(qid.fields[0].str)
    if tid is None:
        raise ValueError("Component QID must contain TID")
    ctor = resolve_component(tid)
    if ctor is None:
        raise ValueError(f"TID unknown in config: {tid}")
    return instconfig, ctor


if __name__ == "__main__":
    from swh.loader.git.loader import GitLoader
    from swh.storage import get_storage

    # Register into the shared register rather than rebinding it.
    register_component("loader-git", GitLoader)
    register_component("storage", get_storage)

    conf = load_from_path(Path("~/Dev/tests/data/conf.yml").expanduser())
    storage = create_component(conf, QID("loader-git.default"))