Changeset View
Changeset View
Standalone View
Standalone View
swh/model/identifiers.py
Show First 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | |||||
SWHID_NAMESPACE = "swh" | SWHID_NAMESPACE = "swh" | ||||
SWHID_VERSION = 1 | SWHID_VERSION = 1 | ||||
SWHID_TYPES = ["ori", "snp", "rel", "rev", "dir", "cnt"] | SWHID_TYPES = ["ori", "snp", "rel", "rev", "dir", "cnt"] | ||||
SWHID_SEP = ":" | SWHID_SEP = ":" | ||||
SWHID_CTXT_SEP = ";" | SWHID_CTXT_SEP = ";" | ||||
SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"} | SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"} | ||||
SWHID_RE_RAW = ( | SWHID_RE_RAW = ( | ||||
f"(?P<scheme>{SWHID_NAMESPACE})" | f"(?P<namespace>{SWHID_NAMESPACE})" | ||||
f"{SWHID_SEP}(?P<version>{SWHID_VERSION})" | f"{SWHID_SEP}(?P<scheme_version>{SWHID_VERSION})" | ||||
f"{SWHID_SEP}(?P<object_type>{'|'.join(SWHID_TYPES)})" | f"{SWHID_SEP}(?P<object_type>{'|'.join(SWHID_TYPES)})" | ||||
f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})" | f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})" | ||||
f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?" | f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?" | ||||
) | ) | ||||
SWHID_RE = re.compile(SWHID_RE_RAW) | SWHID_RE = re.compile(SWHID_RE_RAW) | ||||
@lru_cache() | @lru_cache() | ||||
▲ Show 20 Lines • Show All 683 Lines • ▼ Show 20 Lines | class CoreSWHID: | ||||
""" | """ | ||||
namespace = attr.ib(type=str, default=SWHID_NAMESPACE) | namespace = attr.ib(type=str, default=SWHID_NAMESPACE) | ||||
"""the namespace of the identifier, defaults to ``swh``""" | """the namespace of the identifier, defaults to ``swh``""" | ||||
scheme_version = attr.ib(type=int, default=SWHID_VERSION) | scheme_version = attr.ib(type=int, default=SWHID_VERSION) | ||||
"""the scheme version of the identifier, defaults to 1""" | """the scheme version of the identifier, defaults to 1""" | ||||
object_type = attr.ib(type=ObjectType, validator=type_validator()) | object_type = attr.ib( | ||||
type=ObjectType, validator=type_validator(), converter=ObjectType | |||||
) | |||||
"""the type of object the identifier points to""" | """the type of object the identifier points to""" | ||||
object_id = attr.ib(type=bytes, validator=type_validator()) | object_id = attr.ib(type=bytes, validator=type_validator()) | ||||
"""object's identifier""" | """object's identifier""" | ||||
@namespace.validator | @namespace.validator | ||||
def check_namespace(self, attribute, value): | def check_namespace(self, attribute, value): | ||||
if value != SWHID_NAMESPACE: | if value != SWHID_NAMESPACE: | ||||
Show All 24 Lines | def __str__(self) -> str: | ||||
str(self.scheme_version), | str(self.scheme_version), | ||||
self.object_type.value, | self.object_type.value, | ||||
hash_to_hex(self.object_id), | hash_to_hex(self.object_id), | ||||
] | ] | ||||
) | ) | ||||
@classmethod | @classmethod | ||||
def from_string(cls, s: str) -> CoreSWHID: | def from_string(cls, s: str) -> CoreSWHID: | ||||
with warnings.catch_warnings(): | parts = _parse_swhid(s) | ||||
olasd: Not sure if that gets refactored later? | |||||
Done Inline Actions — vlorentz: It doesn't, why? | |||||
warnings.simplefilter("ignore") | if parts.pop("qualifiers"): | ||||
old_swhid = parse_swhid(s) | |||||
object_type = ObjectType(_object_type_map[old_swhid.object_type]["short_name"]) | |||||
if old_swhid.metadata: | |||||
raise ValidationError("CoreSWHID does not support qualifiers.") | raise ValidationError("CoreSWHID does not support qualifiers.") | ||||
return CoreSWHID( | try: | ||||
namespace=old_swhid.namespace, | return CoreSWHID(**parts) | ||||
scheme_version=old_swhid.scheme_version, | except ValueError as e: | ||||
object_type=object_type, | raise ValidationError(*e.args) from None | ||||
object_id=hash_to_bytes(old_swhid.object_id), | |||||
) | |||||
def _parse_core_swhid(swhid: Union[str, CoreSWHID, None]) -> Optional[CoreSWHID]: | def _parse_core_swhid(swhid: Union[str, CoreSWHID, None]) -> Optional[CoreSWHID]: | ||||
"""Alias of CoreSWHID.from_string to make mypy happy...... | """Alias of CoreSWHID.from_string to make mypy happy...... | ||||
https://github.com/python/mypy/issues/6172""" | https://github.com/python/mypy/issues/6172""" | ||||
if swhid is None or isinstance(swhid, CoreSWHID): | if swhid is None or isinstance(swhid, CoreSWHID): | ||||
return swhid | return swhid | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | class QualifiedSWHID: | ||||
""" | """ | ||||
namespace = attr.ib(type=str, default=SWHID_NAMESPACE) | namespace = attr.ib(type=str, default=SWHID_NAMESPACE) | ||||
"""the namespace of the identifier, defaults to ``swh``""" | """the namespace of the identifier, defaults to ``swh``""" | ||||
scheme_version = attr.ib(type=int, default=SWHID_VERSION) | scheme_version = attr.ib(type=int, default=SWHID_VERSION) | ||||
"""the scheme version of the identifier, defaults to 1""" | """the scheme version of the identifier, defaults to 1""" | ||||
object_type = attr.ib(type=ObjectType, validator=type_validator()) | object_type = attr.ib( | ||||
type=ObjectType, validator=type_validator(), converter=ObjectType | |||||
) | |||||
"""the type of object the identifier points to""" | """the type of object the identifier points to""" | ||||
Not Done Inline Actions — olasd: I guess this converter is getting tested via the new `from_string`? Maybe it'd make sense to have a simple unit test for it. | |||||
Done Inline Actions — vlorentz: Yes, and so do the other ones. I'll add another diff for that. | |||||
Done Inline Actions — vlorentz: D5128 | |||||
object_id = attr.ib(type=bytes, validator=type_validator()) | object_id = attr.ib(type=bytes, validator=type_validator()) | ||||
"""object's identifier""" | """object's identifier""" | ||||
# qualifiers: | # qualifiers: | ||||
origin = attr.ib(type=Optional[str], default=None, validator=type_validator()) | origin = attr.ib(type=Optional[str], default=None, validator=type_validator()) | ||||
"""the software origin where an object has been found or observed in the wild, | """the software origin where an object has been found or observed in the wild, | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | def __str__(self) -> str: | ||||
qualifiers = self.qualifiers() | qualifiers = self.qualifiers() | ||||
if qualifiers: | if qualifiers: | ||||
for k, v in qualifiers.items(): | for k, v in qualifiers.items(): | ||||
swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v) | swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v) | ||||
return swhid | return swhid | ||||
@classmethod | @classmethod | ||||
def from_string(cls, s: str) -> QualifiedSWHID: | def from_string(cls, s: str) -> QualifiedSWHID: | ||||
with warnings.catch_warnings(): | parts = _parse_swhid(s) | ||||
warnings.simplefilter("ignore") | qualifiers = parts.pop("qualifiers") | ||||
old_swhid = parse_swhid(s) | invalid_qualifiers = set(qualifiers) - SWHID_QUALIFIERS | ||||
object_type = ObjectType(_object_type_map[old_swhid.object_type]["short_name"]) | if invalid_qualifiers: | ||||
return QualifiedSWHID( | raise ValidationError( | ||||
namespace=old_swhid.namespace, | "Invalid qualifier(s): {', '.join(invalid_qualifiers)}" | ||||
scheme_version=old_swhid.scheme_version, | |||||
object_type=object_type, | |||||
object_id=hash_to_bytes(old_swhid.object_id), | |||||
**old_swhid.metadata, | |||||
) | ) | ||||
try: | |||||
return QualifiedSWHID(**parts, **qualifiers) | |||||
except ValueError as e: | |||||
raise ValidationError(*e.args) from None | |||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class SWHID: | class SWHID: | ||||
""" | """ | ||||
Deprecated alternative to QualifiedSWHID. | Deprecated alternative to QualifiedSWHID. | ||||
Args: | Args: | ||||
▲ Show 20 Lines • Show All 130 Lines • ▼ Show 20 Lines | swhid = SWHID( | ||||
scheme_version=scheme_version, | scheme_version=scheme_version, | ||||
object_type=object_type, | object_type=object_type, | ||||
object_id=object_id, | object_id=object_id, | ||||
metadata=metadata, # type: ignore # mypy can't properly unify types | metadata=metadata, # type: ignore # mypy can't properly unify types | ||||
) | ) | ||||
return str(swhid) | return str(swhid) | ||||
def parse_swhid(swhid: str) -> SWHID: | def _parse_swhid(swhid: str) -> Dict[str, Any]: | ||||
"""Parse a Software Heritage identifier (SWHID) from string (see: | """Parse a Software Heritage identifier (SWHID) from string (see: | ||||
:ref:`persistent-identifiers`.) | :ref:`persistent-identifiers`.) | ||||
This is for internal use; use :meth:`CoreSWHID.from_string`, | |||||
:meth:`QualifiedSWHID.from_string`, or :meth:`ExtendedSWHID.from_string` instead, | |||||
as they perform validation and build a dataclass. | |||||
Args: | Args: | ||||
swhid (str): A persistent identifier | swhid (str): A persistent identifier | ||||
Returns: | |||||
a named tuple holding the parsing result | |||||
Raises: | Raises: | ||||
swh.model.exceptions.ValidationError: if passed string is not a valid SWHID | swh.model.exceptions.ValidationError: if passed string is not a valid SWHID | ||||
""" | """ | ||||
m = SWHID_RE.fullmatch(swhid) | m = SWHID_RE.fullmatch(swhid) | ||||
if not m: | if not m: | ||||
raise ValidationError( | raise ValidationError( | ||||
"Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid} | "Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid} | ||||
) | ) | ||||
parts = m.groupdict() | parts: Dict[str, Any] = m.groupdict() | ||||
_qualifiers = {} | |||||
qualifiers_raw = parts["qualifiers"] | qualifiers_raw = parts["qualifiers"] | ||||
parts["qualifiers"] = {} | |||||
if qualifiers_raw: | if qualifiers_raw: | ||||
for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP): | for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP): | ||||
try: | try: | ||||
k, v = qualifier.split("=") | k, v = qualifier.split("=") | ||||
except ValueError: | except ValueError: | ||||
raise ValidationError( | raise ValidationError( | ||||
"Invalid SWHID: invalid qualifier: %(qualifier)s", | "Invalid SWHID: invalid qualifier: %(qualifier)s", | ||||
params={"qualifier": qualifier}, | params={"qualifier": qualifier}, | ||||
) | ) | ||||
_qualifiers[k] = v | parts["qualifiers"][k] = v | ||||
parts["scheme_version"] = int(parts["scheme_version"]) | |||||
parts["object_id"] = hash_to_bytes(parts["object_id"]) | |||||
return parts | |||||
def parse_swhid(swhid: str) -> SWHID: | |||||
"""Parse a Software Heritage identifier (SWHID) from string (see: | |||||
:ref:`persistent-identifiers`.) | |||||
Args: | |||||
swhid (str): A persistent identifier | |||||
Raises: | |||||
swh.model.exceptions.ValidationError: if passed string is not a valid SWHID | |||||
""" | |||||
parts = _parse_swhid(swhid) | |||||
return SWHID( | return SWHID( | ||||
parts["scheme"], | parts["namespace"], | ||||
int(parts["version"]), | parts["scheme_version"], | ||||
_swhid_type_map[parts["object_type"]], | _swhid_type_map[parts["object_type"]], | ||||
parts["object_id"], | hash_to_hex(parts["object_id"]), | ||||
_qualifiers, # type: ignore # mypy can't properly unify types | parts["qualifiers"], | ||||
) | ) |
Not sure if that gets refactored later?