Page MenuHomeSoftware Heritage
Paste P878

Config library WIP (does not run for now)
ActivePublic

Authored by tenma on Nov 24 2020, 2:50 PM.
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from copy import deepcopy
from collections import UserDict
from dataclasses import dataclass, field
from itertools import chain
import io
import logging
import os
from pathlib import Path
import re
from typing import (Any, Callable, Collection, Dict, IO, List, Mapping, NewType,
Optional, Pattern as Regexp, Sequence, Set, Tuple, Union)
import yaml
logger = logging.getLogger(__name__)
SWH_CONFIG_DIRECTORY = Path("~/.config/swh")
SWH_GLOBAL_CONFIG = "global.yml"
SWH_CONFIG_PATH_ENVVAR = "SWH_CONFIG_FILENAME"
PathLike = Union[str, bytes, os.PathLike]
class Envvar(str):
__slots__ = ()
class ConfigName(str):
__slots__ = ()
### Loading API ###
def loadable(filepath: os.PathLike) -> bool:
"""Check whether a file exists, and is readable.
Returns:
True if the file exists and is accessible
False if the file does not exist
Raises:
PermissionError if the file cannot be read.
"""
try:
os.stat(filepath)
except PermissionError:
raise
except FileNotFoundError:
return False
else:
if os.access(filepath, os.R_OK):
return True
else:
raise PermissionError(f"Permission denied: {filepath!r}")
# Use single dispatch for load()?
# IO[] OR io.IOBase
def load_from_file(file: IO[str], defaults: Optional[Config] = None) -> Config:
yamldata = yaml.safe_load(file)
config = Config(yamldata)
return yamldata
def load_from_path(path: PathLike, defaults: Optional[Config] = None) -> Config:
logger.debug("Loading config file %s", path)
with open(path) as file:
return load_from_file(file)
def load_from_name(configname: str, defaults: Optional[Config] = None) -> Config:
path = name_to_path(configname)
return load_from_path(path)
def name_to_path(configname: str) -> Path:
return SWH_CONFIG_DIRECTORY / (configname + ".yml")
def load_from_envvar(defaults: Optional[Config] = None) -> Config:
"""Load configuration yaml file from the environment variable SWH_CONFIG_FILENAME,
eventually enriched with default configuration key/value from the defaults
dict if provided.
Returns:
Configuration dict
Raises:
AssertionError if SWH_CONFIG_FILENAME is undefined
"""
assert (
"SWH_CONFIG_FILENAME" in os.environ
), "SWH_CONFIG_FILENAME environment variable is undefined." # do at import time?
cfg_path = os.environ["SWH_CONFIG_FILENAME"]
cfg = load_from_path(cfg_path)
cfg = merge_dicts(defaults or Config(), cfg)
return cfg
def merge_dicts(base: Optional[Dict[str, Any]], other: Optional[Dict[str, Any]]):
"""Merge two config dictionaries
Merge dicts recursively, using the following rules:
- None + type -> type
- type + None -> None
- dict + dict -> dict (merged)
- val + dict -> TypeError
- dict + val -> TypeError
- val + val -> val (other)
for instance:
>>> d1 = {
... 'key1': {
... 'skey1': 'value1',
... 'skey2': {'sskey1': 'value2'},
... },
... 'key2': 'value3',
... }
with
>>> d2 = {
... 'key1': {
... 'skey1': 'value4',
... 'skey2': {'sskey2': 'value5'},
... },
... 'key3': 'value6',
... }
will give:
>>> d3 = {
... 'key1': {
... 'skey1': 'value4', # <-- note this
... 'skey2': {
... 'sskey1': 'value2',
... 'sskey2': 'value5',
... },
... },
... 'key2': 'value3',
... 'key3': 'value6',
... }
>>> assert merge_dicts(d1, d2) == d3
Note that no type checking is done for anything but dicts.
"""
if not isinstance(base, dict) or not isinstance(other, dict):
raise TypeError("Cannot merge a %s with a %s" % (type(base), type(other)))
output = {}
allkeys = set(chain(base.keys(), other.keys()))
for k in allkeys:
vb = base.get(k)
vo = other.get(k)
if isinstance(vo, dict):
output[k] = merge_dicts(vb is not None and vb or {}, vo)
elif isinstance(vb, dict) and k in other and other[k] is not None:
output[k] = merge_dicts(vb, vo is not None and vo or {})
elif k in other:
output[k] = deepcopy(vo)
else:
output[k] = deepcopy(vb)
return output
### Language ###
# CID = NewType("CID", str) # QID is (TID, IID)
# TID = NewType("TID", CID)
# IID = NewType("IID", CID)
# AID = NewType("AID", CID)
# AnyCID = Union[TID, IID, AID]
# QID = Sequence[CID] # Qualified configuration identifier
# Config = Mapping[CID, Any] # ADT please?
# AttrKey = CID
# AttrValue = Any
# Attribute = (AttrKey, AttrValue)
Component = type
ComponentConstructor = Callable[[type], Component] # OR Callable[[], Component] ?
# Identifiers
class CID:
__slots__ = ("str",)
REGEXP = re.compile("[A-Za-z0-9_\-]+")
def __init__(self, token: str):
assert self.parses(token), f"CID must match {CID.REGEXP!r}"
self.str = token
@staticmethod
def parses(token: str) -> bool:
return isinstance(token, str) and bool(CID.REGEXP.fullmatch(token))
class TID(CID): pass
class IID(CID): pass
"""
QID must have following features:
- type-strict version of the QID string spec, here `QID = ("." TID)? IID`
- have access to both string form and individual fields
- constructor taking both string form or individual fields (string or CID)
- enforce semantics of its components: TID, IID
"""
class QID: # (CID) # FIXME: settle str/cid/kwargs/resolved forms...
__slots__ = ("str", "fields")
SEP = "."
REGEXP: Regexp = re.compile(f"{CID.REGEXP}(?:\.{CID.REGEXP})*")
def __init__(config: StandaloneConfig, *tokens: Union[str, CID]):
print(f"QID({tokens!r})")
all_str = all(isinstance(tokens, str) for t in tokens)
all_cid = all(isinstance(tokens, CID) for t in tokens)
assert all_str or all_cid, "QID tokens must all either be str or CID"
qualified_str, *multiple_fields = tokens # Do we have 1 or more elements?
if all_str and not multiple_fields:
assert QID.parses(qualified_str), f"QID in string form must match {QID.REGEXP.pattern!r}"
self.str = qualified_str
self.fields = tuple(map(CID, qualified_str.split(QID_SEP))
else:
if all_str:
self.str = QID.SEP.join(tokens)
self.fields = tuple(map(CID, tokens))
elif all_cid:
self.str = QID.SEP.join(t.str for t in tokens)
self.fields = tokens
assert all(hasattr(self, attr) for attr in self.__slots__), "QID parsing failed"
def get_TID(self): # FIXME: no TID in QID
return self.fields[0] if isinstance(self.fields[0], TID) else None
@staticmethod
def parses(token: str) -> bool:
return re.fullmatch(QID.REGEXP, token)
def TID_exists(token: str, register = _ComponentRegister) -> bool: # Keep?
return token in register.keys()
def IID_exists(config: StandaloneConfig, iid: IID) -> bool:
return iid in self._qids
def QID_exists(config: StandaloneConfig, qid: QID) -> bool:
try:
self.get_obj(qid)
return True
except ValueError:
return False
def QID_exists(config: StandaloneConfig, qid: QID) -> bool: # FIXME tokens or cids?
tokens = qid.str
types = []
conf_it = config
for tok in tokens:
if tok not in conf_it:
raise ValueError()
conf_it = conf_it[tok]
if TID.exists(tok) and len(types) == 0:
typ = TID
elif config.IID_exists(config, tok) and types[-1:] == TID:
typ = IID
else:
raise ValueError()
types.append(typ)
# References
class Reference:
REGEXP: Regexp = re.compile(f"\s+<({QID.REGEXP.pattern})>\s+")
def __init__(self, target: QID, source: AttrKey):
if not QID.parses(qid):
raise ValueError(f"invalid QID format: {target!r}")
self.target = target
self.source = source
@classmethod
def get_target(cls, token: AttrValue) -> Optional[str]:
m = Reference.REGEXP.fullmatch(token)
return m.group(1) if m else None
@classmethod
def parses(cls, token: str) -> bool:
return bool(Reference.REGEXP.fullmatch(token))
# Config objects
class Config(UserDict): # XXX: should Config be OR own a dict?
def __new__(cls, *args) -> StandaloneConfig:
return StandaloneConfig.__new__(*args)
def get_obj(self, qid: QID) -> PartialConfig:
if not QID.parses(qid): # Works also for AID
raise ValueError(f"invalid QID format: {qid!r}")
conf_it = self.data
try:
for cid in qid.fields:
if isinstance(conf_it, list):
cid = int(cid) # Throws ValueError
conf_it[cid] # Works for dict and list
except KeyError:
raise ValueError(f"QID unknown for config: {qid!r}")
return conf_it
@dataclass
class StandaloneConfig(Config):
# Cache qids and refs
_qids: Set[QID] = field(init=False)
_refs: Dict[AID, QID] = field(init=False)
def __post_init__(self):
self.parse()
def parse(self, config: Dict):
# assert isinstance(config, Dict), "Invalid configuration definition: not a mapping"
paths = set()
# parse 2 levels, validate TIDs, store QIDs
for cid, obj in self.data.items():
if TID.exists(cid):
tid = cid
if not isinstance(obj, Dict):
raise ValueError("mapping not found under TID:", type(obj), obj)
for cid, obj in self[tid].items():
self._parse_instance_item(cid, obj)
paths.add(QID(TID(tid), IID(cid)))
else:
self._parse_instance_item(cid, obj)
paths.add(QID(IID(cid)))
self._qids = paths
def _parse_instance_item(self, cid: str, obj: Collection) -> Collection:
if not CID.parses(cid):
raise ValueError(f"invalid CID format: {cid!r}")
if not isinstance(obj, Dict):
raise ValueError(f"mapping not found under IID: {type(obj)}, {obj}")
# inst = InstanceConfig(obj, self, qid)
# refs = inst.find_references() # parses references
# self._refs.update(refs)
def get_instance(self, qid: QID) -> InstanceConfig:
return InstanceConfig(self.get_obj(qid), self, qid)
def resolve_reference(self, qid: QID) -> InstanceConfig:
try:
return self._refs[QID]
except KeyError:
raise ValueError(f"reference not found at {QID!r}")
@dataclass
class PartialConfig(Config):
root: StandaloneConfig
qid: QID
class InstanceConfig(PartialConfig):
@classmethod
def prepare(self) -> None: # WIP
inst.root.find_references(self)
def find_subreferences(self) -> Dict[AID, QID]: # FIXME: identity disorder
refs = {}
for key, val in self.items():
if Reference.parses(val):
aid = QID(*self.qid.fields, key) # AID(key)
refs[aid] = Reference.get_target(val)
return refs
def find_subinstances(self) -> Dict[AID, InstanceConfig]: # WIP
instances = {}
refs = self.find_subreferences()
for source, target in refs:
instconfig = self.root.get_instance(target)
instances[source] = instconfig
for key, val in self.items():
if isinstance(val, Dict):
aid = QID(*self.qid.fields, key)
instances[aid] = InstanceConfig(val, self.root, aid)
# elif isinstance(val, List): # TODO: recursion
# subinstances = self._list_find_subinstances(val)
# for key, val in enumerate(subinstances):
# aid = QID(*self.qid.fields, key)
# instances[aid] = InstanceConfig(val, self.root, aid)
return instances
### Instantiation API ###
_ComponentRegister: Final[Dict[TID, Component]] = {}
def register_component(tid: TID, comp: Component) -> None:
_ComponentRegister[tid] = comp
def resolve_component(tid: TID) -> ComponentConstructor:
return _ComponentRegister.get(tid)
def instantiate_component(instconfig: InstanceConfig, ctor: ComponentConstructor) -> Component:
inst = ctor(instdef)
# TODO: error handling
return inst
def create_component(config: StandaloneConfig, qid: QID) -> Component:
instconfig, ctor = _prepare_component(config, qid)
# instance composition, subinst but not anoninst
subrefs = instconfig.find_subreferences()
for source_qid, target_qid in subrefs.items():
subinstconfig, subctor = _prepare_component(config, target_qid)
subinst = instantiate_component(subinstconfig, subctor)
aid = source_qid.fields[-1].str
instconfig[aid] = subinst
inst = instantiate_component(instconfig, ctor)
return inst
def _prepare_component(config: StandaloneConfig, qid: QID) -> Component:
instconfig = config.get_instance(qid)
tid = qid.get_TID()
if tid is None:
raise ValueError("Component QID must contain TID")
ctor = resolve_component(tid)
if ctor is None:
raise ValueError(f"TID unknown in config: {tid}")
return instconfig, ctor
if __name__ == "__main__":
from swh.loader.git.loader import GitLoader
from swh.storage import get_storage
_ComponentRegister = {
"loader-git": GitLoader,
"storage": get_storage,
}
conf = load_from_path(Path("~/Dev/tests/data/conf.yml").expanduser())
storage = create_component(conf, QID("loader-git.default"))

Event Timeline

tenma changed the title of this paste from Config library WIP to Config library WIP (does not run for now).Nov 24 2020, 2:54 PM