Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Paste
P878
Config library WIP (does not run for now)
Active
Public
Actions
Authored by
tenma
on Nov 24 2020, 2:50 PM.
Edit Paste
Archive Paste
View Raw File
Subscribe
Mute Notifications
Award Token
Flag For Later
Tags
Core & foundations
Subscribers
None
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations

from collections import UserDict
from copy import deepcopy
from dataclasses import dataclass, field
import io
from itertools import chain
import logging
import os
from pathlib import Path
import re
from typing import (
    Any,
    Callable,
    Collection,
    Dict,
    Final,
    IO,
    List,
    Mapping,
    NewType,
    Optional,
    Pattern as Regexp,
    Sequence,
    Set,
    Tuple,
    Union,
)

import yaml
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory in which named configuration files are looked up (see
# name_to_path).  The leading "~" is stored as-is, not expanded.
SWH_CONFIG_DIRECTORY = Path("~/.config/swh")

# NOTE(review): presumably the file name of a global configuration file;
# nothing in this module reads it yet.
SWH_GLOBAL_CONFIG = "global.yml"

# Name of the environment variable that points at a configuration file.
SWH_CONFIG_PATH_ENVVAR = "SWH_CONFIG_FILENAME"

# Any value accepted where a filesystem path is expected.
PathLike = Union[str, bytes, os.PathLike]
class Envvar(str):
    """Marker subclass of ``str`` that adds neither state nor behavior.

    NOTE(review): judging by the name, it presumably tags a string as the
    name of an environment variable.
    """

    # Empty slots: instances stay as light as plain strings (no __dict__).
    __slots__ = ()
class ConfigName(str):
    """Marker subclass of ``str`` that adds neither state nor behavior.

    NOTE(review): judging by the name, it presumably tags a string as the
    name of a configuration (as accepted by load_from_name).
    """

    # Empty slots: instances stay as light as plain strings (no __dict__).
    __slots__ = ()
### Loading API ###
def loadable(filepath: os.PathLike) -> bool:
    """Tell whether a file exists and can be read.

    Returns:
        True if the file exists and is readable,
        False if the file does not exist.

    Raises:
        PermissionError if the file (or a directory leading to it) cannot be
        accessed, or if it exists but is not readable.
    """
    try:
        os.stat(filepath)
    except FileNotFoundError:
        return False
    # A PermissionError raised by os.stat simply propagates to the caller.

    if not os.access(filepath, os.R_OK):
        raise PermissionError(f"Permission denied: {filepath!r}")
    return True
# Use single dispatch for load()?
# IO[] OR io.IOBase
def load_from_file(file: IO[str], defaults: Optional[Config] = None) -> Config:
    """Parse a YAML stream into a configuration object.

    Args:
        file: open text stream holding one YAML document.
        defaults: optional base configuration. Values read from ``file``
            take precedence over it (merged with ``merge_dicts``).

    Returns:
        The ``Config`` built from the document, merged over ``defaults``
        when given.

    Raises:
        TypeError: if ``defaults`` is given and the document's top level is
            not a mapping (raised by ``merge_dicts``).
    """
    yamldata = yaml.safe_load(file)
    if yamldata is None:
        # An empty document loads as None: treat it as an empty configuration.
        yamldata = {}
    if defaults:
        # merge_dicts only accepts plain dicts, hence the dict() conversion of
        # a possibly dict-like defaults object.
        yamldata = merge_dicts(dict(defaults), yamldata)
    return Config(yamldata)
def load_from_path(path: PathLike, defaults: Optional[Config] = None) -> Config:
    """Load the configuration stored in the YAML file at ``path``.

    Args:
        path: location of the YAML file.
        defaults: optional base configuration, forwarded to
            ``load_from_file``.

    Returns:
        Whatever ``load_from_file`` returns for the opened file.

    Raises:
        OSError: if the file cannot be opened (e.g. FileNotFoundError,
            PermissionError).
    """
    logger.debug("Loading config file %s", path)
    # Decode explicitly as UTF-8 so the result does not depend on the locale.
    with open(path, encoding="utf-8") as file:
        return load_from_file(file, defaults)
def load_from_name(configname: str, defaults: Optional[Config] = None) -> Config:
    """Load the configuration file designated by a configuration name.

    Args:
        configname: name mapped to a file path by ``name_to_path``.
        defaults: optional base configuration, forwarded to
            ``load_from_path``.

    Returns:
        Whatever ``load_from_path`` returns for the resolved path.
    """
    return load_from_path(name_to_path(configname), defaults)
def name_to_path(configname: str) -> Path:
    """Return the path of the configuration file named ``configname``.

    The file is ``<configname>.yml`` inside ``SWH_CONFIG_DIRECTORY``.

    Raises:
        RuntimeError: if the home directory cannot be determined while
            expanding a leading ``~``.
    """
    # The directory constant starts with "~", which open() would take
    # literally: expand it to the user's home directory first.
    return SWH_CONFIG_DIRECTORY.expanduser() / (configname + ".yml")
def load_from_envvar(defaults: Optional[Config] = None) -> Config:
    """Load the configuration file designated by the environment.

    The path is read from the environment variable named by
    ``SWH_CONFIG_PATH_ENVVAR`` (SWH_CONFIG_FILENAME). The loaded values are
    merged over ``defaults`` when provided.

    Args:
        defaults: optional base configuration; loaded values win over it.

    Returns:
        The merged configuration.

    Raises:
        AssertionError: if the environment variable is undefined.
    """
    cfg_path = os.environ.get(SWH_CONFIG_PATH_ENVVAR)
    if cfg_path is None:
        # Raised explicitly instead of using `assert`, so that the check
        # survives `python -O`; the exception type is kept for callers.
        raise AssertionError(
            f"{SWH_CONFIG_PATH_ENVVAR} environment variable is undefined."
        )  # do at import time?

    cfg = load_from_path(cfg_path)
    # merge_dicts only accepts plain dicts: convert both sides, whatever
    # mapping type they are.
    merged = merge_dicts(dict(defaults) if defaults else {}, dict(cfg or {}))
    return Config(merged)
def merge_dicts(base: Dict[str, Any], other: Dict[str, Any]) -> Dict[str, Any]:
    """Merge two config dictionaries

    Merge dicts recursively, using the following rules (applied to the values
    found under each key):

    - None + type -> type
    - type + None -> None
    - dict + dict -> dict (merged)
    - val + dict -> TypeError
    - dict + val -> TypeError
    - val + val -> val (other)

    for instance:

    >>> d1 = {
    ...   'key1': {
    ...     'skey1': 'value1',
    ...     'skey2': {'sskey1': 'value2'},
    ...   },
    ...   'key2': 'value3',
    ... }

    with

    >>> d2 = {
    ...   'key1': {
    ...     'skey1': 'value4',
    ...     'skey2': {'sskey2': 'value5'},
    ...   },
    ...   'key3': 'value6',
    ... }

    will give:

    >>> d3 = {
    ...   'key1': {
    ...     'skey1': 'value4',  # <-- note this
    ...     'skey2': {
    ...       'sskey1': 'value2',
    ...       'sskey2': 'value5',
    ...     },
    ...   },
    ...   'key2': 'value3',
    ...   'key3': 'value6',
    ... }

    >>> assert merge_dicts(d1, d2) == d3

    Note that no type checking is done for anything but dicts.

    The result is a new dict (values are deep-copied). Its keys come in a
    deterministic order: those of ``base`` first, then those only found in
    ``other``.

    Raises:
        TypeError: if ``base`` or ``other`` is not a dict, or if a dict has to
            be merged with a non-dict value.
    """
    if not isinstance(base, dict) or not isinstance(other, dict):
        raise TypeError("Cannot merge a %s with a %s" % (type(base), type(other)))

    output = {}
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # output order does not depend on hashing.
    for k in dict.fromkeys(chain(base, other)):
        vb = base.get(k)
        vo = other.get(k)

        if isinstance(vo, dict):
            # NOTE(review): a falsy non-dict base value (0, "", []) is treated
            # like a missing one here rather than raising TypeError.
            output[k] = merge_dicts(vb or {}, vo)
        elif isinstance(vb, dict) and vo is not None:
            # vo is a non-dict value: a truthy one makes the recursive call
            # raise TypeError, a falsy one is treated as an empty dict.
            output[k] = merge_dicts(vb, vo or {})
        elif k in other:
            # Includes an explicit None in other, which overrides base.
            output[k] = deepcopy(vo)
        else:
            output[k] = deepcopy(vb)

    return output
### Language ###
# CID = NewType("CID", str) # QID is (TID, IID)
# TID = NewType("TID", CID)
# IID = NewType("IID", CID)
# AID = NewType("AID", CID)
# AnyCID = Union[TID, IID, AID]
# QID = Sequence[CID] # Qualified configuration identifier
# Config = Mapping[CID, Any] # ADT please?
# AttrKey = CID
# AttrValue = Any
# Attribute = (AttrKey, AttrValue)
# A component is designated by the builtin ``type``.
Component = type

# Callable receiving a type and producing a component.
# OR Callable[[], Component] ?
ComponentConstructor = Callable[[type], Component]
# Identifiers
class CID:
    """Configuration identifier: one validated token of a qualified identifier.

    A token is a non-empty run of ASCII letters, digits, ``_`` and ``-``. The
    validated text is available as the ``str`` attribute. Two identifiers are
    equal (and hash alike) when their text is equal.
    """

    __slots__ = ("str",)
    # Raw string: "\-" is not a valid escape in a regular string literal.
    REGEXP = re.compile(r"[A-Za-z0-9_\-]+")

    def __init__(self, token: str):
        """Wrap ``token`` after validating it.

        Raises:
            AssertionError: if ``token`` is not a string matching ``REGEXP``.
        """
        if not self.parses(token):
            # Raised explicitly instead of using `assert`, so that validation
            # survives `python -O`; the exception type is kept for callers.
            raise AssertionError(f"CID must match {CID.REGEXP!r}")
        self.str = token

    @staticmethod
    def parses(token: str) -> bool:
        """Tell whether ``token`` is a string that is a valid identifier."""
        return isinstance(token, str) and bool(CID.REGEXP.fullmatch(token))

    def __repr__(self):
        return f"{type(self).__name__}({self.str!r})"

    def __str__(self):
        return self.str

    def __eq__(self, other):
        if not isinstance(other, CID):
            return NotImplemented
        return self.str == other.str

    def __hash__(self):
        return hash(self.str)
class TID(CID):
    """Identifier of a component type, i.e. a key of the component register."""

    # Without this, instances would get a __dict__ despite the parent's slots.
    __slots__ = ()

    @staticmethod
    def exists(token) -> bool:
        """Tell whether ``token`` (text or identifier) is a registered type."""
        # The register is looked up at call time; identifiers are reduced to
        # their text because the register is filled with string keys.
        return getattr(token, "str", token) in _ComponentRegister
class IID(CID):
    """Identifier of a configured instance (a mapping in the configuration)."""

    # Without this, instances would get a __dict__ despite the parent's slots.
    __slots__ = ()
"""
QID must have following features:
- type-strict version of the QID string spec, here `QID = ("." TID)? IID`
- have access to both string form and individual fields
- constructor taking both string form or individual fields (string or CID)
- enforce semantics of its components: TID, IID
"""
class QID:
    """Qualified configuration identifier: identifiers joined by ``.``.

    Attributes:
        str: the dotted string form, e.g. ``"loader-git.default"``.
        fields: tuple of the individual identifiers (CID instances).

    Two QIDs are equal (and hash alike) when their string forms are equal.
    """

    # (CID)  # FIXME: settle str/cid/kwargs/resolved forms...
    __slots__ = ("str", "fields")
    SEP = "."
    # Built from the *pattern text* of CID.REGEXP (not the compiled object).
    REGEXP: Regexp = re.compile(
        rf"{CID.REGEXP.pattern}(?:\.{CID.REGEXP.pattern})*"
    )

    def __init__(self, *tokens: Union[str, CID]):
        """Build a QID from its string form or from individual fields.

        Args:
            tokens: either strings (one dotted string, or several pieces that
                get joined with ``.``), or CID instances; never a mix.

        Raises:
            ValueError: if no token is given or the string form is invalid.
            TypeError: if tokens are not all str or all CID.
        """
        if not tokens:
            raise ValueError("QID requires at least one token")

        if all(isinstance(t, str) for t in tokens):
            # A single dotted string and several bare strings normalize to
            # the same dotted form, which is validated as a whole.
            qualified = QID.SEP.join(tokens)
            if not QID.parses(qualified):
                raise ValueError(
                    f"QID in string form must match {QID.REGEXP.pattern!r}"
                )
            fields = tuple(CID(tok) for tok in qualified.split(QID.SEP))
        elif all(isinstance(t, CID) for t in tokens):
            qualified = QID.SEP.join(t.str for t in tokens)
            fields = tuple(tokens)
        else:
            raise TypeError("QID tokens must all either be str or CID")

        self.str = qualified
        self.fields = fields

    def get_TID(self):
        """Return the type identifier heading this QID, or None.

        The first field is returned when it is a TID instance. A QID built
        from strings carries untyped fields: its first field is then promoted
        to a TID when it is followed by another field and names a registered
        component type.
        """
        head = self.fields[0]
        if isinstance(head, TID):
            return head
        # NOTE(review): promotion relies on the component register at call
        # time; confirm this is the intended resolution of "no TID in QID".
        if len(self.fields) > 1 and TID_exists(head.str):
            return TID(head.str)
        return None

    @staticmethod
    def parses(token: str) -> bool:
        """Tell whether ``token`` is a valid QID.

        Accepts a string in dotted form; a QID instance is valid by
        construction. Anything else is rejected.
        """
        if isinstance(token, QID):
            return True
        return isinstance(token, str) and bool(QID.REGEXP.fullmatch(token))

    def __repr__(self):
        return f"QID({self.str!r})"

    def __eq__(self, other):
        if not isinstance(other, QID):
            return NotImplemented
        return self.str == other.str

    def __hash__(self):
        return hash(self.str)
def TID_exists(token: str, register=None) -> bool:  # Keep?
    """Tell whether ``token`` names a registered component type.

    Args:
        token: type identifier, as text (an identifier object is reduced to
            its ``str`` attribute).
        register: container of known type identifiers; defaults to the
            module-level component register, looked up at call time.
    """
    if register is None:
        # Resolved lazily: the register is defined further down the module,
        # so it cannot serve as a default value evaluated at definition time.
        register = _ComponentRegister
    return getattr(token, "str", token) in register
def IID_exists(config: StandaloneConfig, iid: IID) -> bool:
    """Tell whether ``iid`` is the last field of a QID cached by ``config``.

    Args:
        config: configuration whose ``_qids`` cache is searched.
        iid: instance identifier, as text or as an identifier object.
    """
    # NOTE(review): assumes the cached QIDs end with their instance
    # identifier, as built by StandaloneConfig.parse — confirm.
    name = getattr(iid, "str", iid)
    return any(qid.fields[-1].str == name for qid in config._qids)
def QID_exists(config: StandaloneConfig, qid: QID) -> bool:
    """Tell whether ``qid`` designates an object of ``config``.

    Returns:
        True if ``config.get_obj(qid)`` succeeds, False if it raises
        ValueError.
    """
    try:
        config.get_obj(qid)
    except ValueError:
        return False
    return True
def QID_exists(config: StandaloneConfig, qid: QID) -> bool:
    """Tell whether ``qid`` designates a typed path of ``config``.

    Walks ``config`` one field at a time, checking that each field is
    present and that the fields form an optional type identifier followed by
    an instance identifier.

    Returns:
        True if the whole path exists and is well-typed, False otherwise.
    """
    # FIXME tokens or cids?
    kinds = []
    node = config
    for cid in qid.fields:
        tok = cid.str
        try:
            if tok not in node:
                return False
            node = node[tok]
        except TypeError:
            # Reached a value that cannot be addressed by name.
            return False

        if not kinds and TID_exists(tok):
            kinds.append(TID)
        elif IID_exists(config, tok) and kinds[-1:] in ([], [TID]):
            # NOTE(review): an instance identifier is accepted first or right
            # after a type identifier; fields past it are rejected — confirm.
            kinds.append(IID)
        else:
            return False
    return True
# References
class Reference:
    """Link from a configuration attribute to another configuration object.

    In attribute values, a reference is written as a QID between angle
    brackets, e.g. ``<storage.default>``.

    Attributes:
        target: QID of the referenced object.
        source: key of the attribute holding the reference.
    """

    # Surrounding whitespace is optional: requiring it would reject a bare
    # "<qid>" value.
    REGEXP: Regexp = re.compile(rf"\s*<({QID.REGEXP.pattern})>\s*")

    def __init__(self, target: QID, source: AttrKey):
        """
        Raises:
            ValueError: if ``target`` is not a valid QID.
        """
        # Validate the string form, whether target is a QID or its text.
        if not QID.parses(getattr(target, "str", target)):
            raise ValueError(f"invalid QID format: {target!r}")
        self.target = target
        self.source = source

    @classmethod
    def get_target(cls, token: AttrValue) -> Optional[str]:
        """Return the QID text referenced by ``token``, or None.

        None is returned when ``token`` is not a string or is not written as
        a reference.
        """
        if not isinstance(token, str):
            return None
        m = cls.REGEXP.fullmatch(token)
        return m.group(1) if m else None

    @classmethod
    def parses(cls, token: str) -> bool:
        """Tell whether ``token`` is a string written as a reference."""
        return cls.get_target(token) is not None
# Config objects
class Config(UserDict):
    """Dict-like configuration tree.

    Instantiating ``Config`` itself yields a ``StandaloneConfig``; subclasses
    are instantiated normally.
    """

    # XXX: should Config be OR own a dict?

    def __new__(cls, *args, **kwargs):
        # Factory behavior only for Config itself: subclasses (including
        # StandaloneConfig) must be allocated as themselves, otherwise the
        # dispatch would recurse. Python then runs the __init__ of the
        # allocated object's class with the original arguments.
        target = StandaloneConfig if cls is Config else cls
        return super().__new__(target)

    def get_obj(self, qid: QID) -> Any:
        """Return the object found at ``qid`` in this configuration.

        Mappings are indexed by the text of each field, lists by its integer
        value.

        Args:
            qid: path of the object, as a QID or as its dotted string form.

        Raises:
            ValueError: if ``qid`` is not a valid QID, or designates nothing
                in this configuration.
        """
        if isinstance(qid, str):
            if not QID.parses(qid):
                raise ValueError(f"invalid QID format: {qid!r}")
            qid = QID(qid)
        elif not isinstance(qid, QID):  # Works also for AID
            raise ValueError(f"invalid QID format: {qid!r}")

        node = self.data
        for cid in qid.fields:
            key = cid.str
            try:
                # int() raises ValueError on a non-numeric list index.
                node = node[int(key)] if isinstance(node, list) else node[key]
            except (KeyError, IndexError, TypeError, ValueError):
                raise ValueError(f"QID unknown for config: {qid!r}") from None
        return node
@dataclass(init=False, repr=False, eq=False)
class StandaloneConfig(Config):
    """Complete configuration tree, parsed on construction.

    The top level maps either a registered type identifier to a mapping of
    instances, or directly an instance identifier to its definition. The QIDs
    of all instances found are cached in ``_qids``.
    """

    # Generation of __init__/__repr__/__eq__ is disabled so that the mapping
    # behavior inherited from UserDict is kept; the dataclass only declares
    # the two cache fields below.

    # Cache qids and refs
    _qids: Set[QID] = field(init=False)
    _refs: Dict[AID, QID] = field(init=False)

    def __init__(self, *args, **kwargs):
        """Store the mapping given UserDict-style, then parse it."""
        super().__init__(*args, **kwargs)
        self._qids = set()
        self._refs = {}
        self.__post_init__()

    def __post_init__(self):
        self.parse()

    def parse(self, config: Optional[Dict] = None) -> None:
        """Validate the two top levels and cache the QIDs of all instances.

        Args:
            config: mapping to parse; defaults to this object's own content.

        Raises:
            ValueError: on an invalid identifier, or when a mapping is
                expected but something else is found.
        """
        # assert isinstance(config, Dict), "Invalid configuration definition: not a mapping"
        data = self.data if config is None else config
        paths = set()
        # parse 2 levels, validate TIDs, store QIDs
        for cid, obj in data.items():
            if TID_exists(cid):
                if not CID.parses(cid):
                    raise ValueError(f"invalid CID format: {cid!r}")
                if not isinstance(obj, dict):
                    raise ValueError(
                        f"mapping not found under TID: {type(obj)}, {obj}"
                    )
                for iid, instance in obj.items():
                    self._parse_instance_item(iid, instance)
                    paths.add(QID(TID(cid), IID(iid)))
            else:
                self._parse_instance_item(cid, obj)
                paths.add(QID(IID(cid)))
        self._qids = paths

    def _parse_instance_item(self, cid: str, obj: Collection) -> None:
        """Check that ``cid`` is a valid identifier naming a mapping.

        Raises:
            ValueError: if ``cid`` is malformed or ``obj`` is not a mapping.
        """
        if not CID.parses(cid):
            raise ValueError(f"invalid CID format: {cid!r}")
        if not isinstance(obj, dict):
            raise ValueError(f"mapping not found under IID: {type(obj)}, {obj}")
        # inst = InstanceConfig(obj, self, qid)
        # refs = inst.find_references()  # parses references
        # self._refs.update(refs)

    def get_instance(self, qid: QID) -> InstanceConfig:
        """Return the instance configuration found at ``qid``.

        Raises:
            ValueError: if ``qid`` is invalid or unknown (see ``get_obj``).
        """
        if isinstance(qid, str):
            # The instance keeps its QID and later reads its fields.
            qid = QID(qid)
        return InstanceConfig(self.get_obj(qid), self, qid)

    def resolve_reference(self, qid: QID) -> InstanceConfig:
        """Return what the reference cache holds for ``qid``.

        NOTE(review): the cache is declared as mapping to QIDs, so the return
        annotation may be inaccurate — confirm the intended return type.

        Raises:
            ValueError: if no reference is cached for ``qid``.
        """
        try:
            return self._refs[qid]
        except KeyError:
            raise ValueError(f"reference not found at {qid!r}") from None
@dataclass(init=False, repr=False, eq=False)
class PartialConfig(Config):
    """Part of a configuration tree, together with where it comes from.

    Attributes:
        root: the complete configuration this part belongs to.
        qid: path of this part inside ``root``.
    """

    # Generation of __init__/__repr__/__eq__ is disabled so that the mapping
    # behavior inherited from UserDict is kept; the dataclass only declares
    # the fields below.
    root: StandaloneConfig
    qid: QID

    def __init__(self, data, root: StandaloneConfig, qid: QID):
        """
        Args:
            data: mapping holding the content of this part (its top level is
                copied by UserDict).
            root: the complete configuration this part belongs to.
            qid: path of this part inside ``root``.
        """
        super().__init__(data)
        self.root = root
        self.qid = qid
class InstanceConfig(PartialConfig):
    """Configuration of one instance: a mapping of attributes."""

    def prepare(self) -> None:
        """Record this instance's references in the root configuration.

        NOTE(review): WIP — written after the commented-out sketch in
        StandaloneConfig._parse_instance_item; confirm the intent.
        """
        # WIP
        self.root._refs.update(self.find_subreferences())

    def _attribute_qid(self, key) -> QID:
        """Return the QID of attribute ``key`` of this instance.

        Raises:
            ValueError: if ``key`` is not a valid identifier.
        """
        if not CID.parses(key):
            raise ValueError(f"invalid CID format: {key!r}")
        # Wrap the key so that all tokens given to QID are identifiers.
        return QID(*self.qid.fields, CID(key))  # AID(key)

    def find_subreferences(self) -> Dict[AID, QID]:
        """Map each attribute holding a reference to the referenced QID.

        Only direct attributes with a string value written as a reference
        are considered.
        """
        # FIXME: identity disorder
        refs = {}
        for key, val in self.items():
            target = Reference.get_target(val) if isinstance(val, str) else None
            if target is None:
                continue
            refs[self._attribute_qid(key)] = QID(target)
        return refs

    def find_subinstances(self) -> Dict[AID, InstanceConfig]:
        """Map each attribute designating an instance to that instance.

        Covers both references (resolved through the root configuration) and
        mappings nested directly under an attribute.
        """
        # WIP
        instances = {}
        for source, target in self.find_subreferences().items():
            instances[source] = self.root.get_instance(target)
        for key, val in self.items():
            if isinstance(val, dict):
                aid = self._attribute_qid(key)
                instances[aid] = InstanceConfig(val, self.root, aid)
            # TODO: recursion into lists (one sub-instance per list index)
        return instances
### Instantiation API ###
# Register of the known component types: maps a type identifier to the
# object used to build components of that type.
_ComponentRegister: Final[Dict[TID, Component]] = {}
def register_component(tid: TID, comp: Component) -> None:
    """Record ``comp`` as the component type named ``tid``.

    Args:
        tid: type identifier, as text or as an identifier object.
        comp: object used to build components of that type.
    """
    # Keys are stored as text: lookups elsewhere in this module are made with
    # plain strings, which an identifier object used as key may not match.
    _ComponentRegister[getattr(tid, "str", tid)] = comp
def resolve_component(tid: TID) -> Optional[ComponentConstructor]:
    """Return what is registered for the component type ``tid``.

    Args:
        tid: type identifier, as text or as an identifier object.

    Returns:
        The registered object, or None if ``tid`` is not registered.
    """
    # Looked up by text: the register may be filled with plain string keys,
    # which an identifier object may not match.
    return _ComponentRegister.get(getattr(tid, "str", tid))
def instantiate_component(
    instconfig: InstanceConfig, ctor: ComponentConstructor
) -> Component:
    """Build a component by calling ``ctor`` with its configuration.

    Args:
        instconfig: configuration of the instance, passed as the only
            argument of ``ctor``.
        ctor: callable building the component.

    Returns:
        Whatever ``ctor`` returns; its exceptions propagate unchanged.
    """
    inst = ctor(instconfig)  # TODO: error handling
    return inst
def create_component(config: StandaloneConfig, qid: QID) -> Component:
    """Build the component configured at ``qid``, with its referenced parts.

    Each attribute of the instance holding a reference is replaced by the
    component built from the referenced instance, then the component itself
    is built.

    Args:
        config: complete configuration.
        qid: path of the instance to build; must lead to a component type.

    Raises:
        ValueError: if a QID is unknown or leads to no registered type (see
            ``_prepare_component``).
    """
    instconfig, ctor = _prepare_component(config, qid)

    # instance composition, subinst but not anoninst
    for source_qid, target_qid in instconfig.find_subreferences().items():
        if isinstance(target_qid, str):
            # A target may be given in string form; the lookups below need
            # its individual fields.
            target_qid = QID(target_qid)
        subinstconfig, subctor = _prepare_component(config, target_qid)
        # The last field of the source is the name of the attribute.
        aid = source_qid.fields[-1].str
        instconfig[aid] = instantiate_component(subinstconfig, subctor)

    return instantiate_component(instconfig, ctor)
def _prepare_component(
    config: StandaloneConfig, qid: QID
) -> Tuple[InstanceConfig, ComponentConstructor]:
    """Gather what is needed to build the component configured at ``qid``.

    Returns:
        The instance configuration and the constructor registered for its
        component type.

    Raises:
        ValueError: if ``qid`` carries no type identifier, or if that type is
            not registered (also whatever ``config.get_instance`` raises).
    """
    instconfig = config.get_instance(qid)

    tid = qid.get_TID()
    if tid is None:
        raise ValueError("Component QID must contain TID")

    ctor = resolve_component(tid)
    if ctor is None:
        # Report the identifier's text rather than the object's repr.
        raise ValueError(f"TID unknown in config: {getattr(tid, 'str', tid)}")

    return instconfig, ctor
if __name__ == "__main__":
    # Manual smoke test: build a git loader from a local configuration file.
    from swh.loader.git.loader import GitLoader
    from swh.storage import get_storage

    # Fill the existing register in place instead of rebinding the name, so
    # that every reference to the register sees the new entries.
    register_component("loader-git", GitLoader)
    register_component("storage", get_storage)

    conf = load_from_path(Path("~/Dev/tests/data/conf.yml").expanduser())
    loader = create_component(conf, QID("loader-git.default"))
Event Timeline
tenma
created this paste.
Nov 24 2020, 2:50 PM
2020-11-24 14:50:44 (UTC+1)
tenma
changed the title of this paste from
Config library WIP
to
Config library WIP (does not run for now)
.
Nov 24 2020, 2:54 PM
2020-11-24 14:54:48 (UTC+1)
Log In to Comment