Changeset View
Changeset View
Standalone View
Standalone View
swh/model/identifiers.py
Show First 20 Lines • Show All 803 Lines • ▼ Show 20 Lines | def from_string(cls, s: str) -> CoreSWHID: | ||||
return CoreSWHID( | return CoreSWHID( | ||||
namespace=old_swhid.namespace, | namespace=old_swhid.namespace, | ||||
scheme_version=old_swhid.scheme_version, | scheme_version=old_swhid.scheme_version, | ||||
object_type=object_type, | object_type=object_type, | ||||
object_id=hash_to_bytes(old_swhid.object_id), | object_id=hash_to_bytes(old_swhid.object_id), | ||||
) | ) | ||||
def _parse_core_swhid(swhid: Union[str, CoreSWHID, None]) -> Optional[CoreSWHID]: | |||||
"""Alias of CoreSWHID.from_string to make mypy happy...... | |||||
https://github.com/python/mypy/issues/6172""" | |||||
if swhid is None or isinstance(swhid, CoreSWHID): | |||||
return swhid | |||||
else: | |||||
return CoreSWHID.from_string(swhid) | |||||
def _parse_lines_qualifier( | |||||
lines: Union[str, Tuple[int, Optional[int]], None] | |||||
) -> Optional[Tuple[int, Optional[int]]]: | |||||
if lines is None or isinstance(lines, tuple): | |||||
return lines | |||||
elif "-" in lines: | |||||
olasd: Instead of `tuple(map())` you can probably write this in a way that doesn't need a type ignore. | |||||
return tuple(map(int, lines.split("-", 2))) # type: ignore | |||||
else: | |||||
return (int(lines), None) | |||||
@attr.s(frozen=True, kw_only=True) | @attr.s(frozen=True, kw_only=True) | ||||
class QualifiedSWHID: | class QualifiedSWHID: | ||||
""" | """ | ||||
Dataclass holding the relevant info associated to a SoftWare Heritage | Dataclass holding the relevant info associated to a SoftWare Heritage | ||||
persistent IDentifier (SWHID) | persistent IDentifier (SWHID) | ||||
Raises: | Raises: | ||||
swh.model.exceptions.ValidationError: In case of invalid object type or id | swh.model.exceptions.ValidationError: In case of invalid object type or id | ||||
To get the raw SWHID string from an instance of this class, | To get the raw SWHID string from an instance of this class, | ||||
use the :func:`str` function: | use the :func:`str` function: | ||||
>>> swhid = QualifiedSWHID( | >>> swhid = QualifiedSWHID( | ||||
... object_type=ObjectType.CONTENT, | ... object_type=ObjectType.CONTENT, | ||||
... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'), | ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'), | ||||
... qualifiers={"lines": "5-10"}, | ... lines=(5, 10), | ||||
... ) | ... ) | ||||
>>> str(swhid) | >>> str(swhid) | ||||
'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10' | 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10' | ||||
And vice-versa with :meth:`QualifiedSWHID.from_string`: | And vice-versa with :meth:`QualifiedSWHID.from_string`: | ||||
>>> swhid == QualifiedSWHID.from_string( | >>> swhid == QualifiedSWHID.from_string( | ||||
... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10" | ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10" | ||||
... ) | ... ) | ||||
True | True | ||||
""" | """ | ||||
namespace = attr.ib(type=str, default=SWHID_NAMESPACE) | namespace = attr.ib(type=str, default=SWHID_NAMESPACE) | ||||
"""the namespace of the identifier, defaults to ``swh``""" | """the namespace of the identifier, defaults to ``swh``""" | ||||
scheme_version = attr.ib(type=int, default=SWHID_VERSION) | scheme_version = attr.ib(type=int, default=SWHID_VERSION) | ||||
"""the scheme version of the identifier, defaults to 1""" | """the scheme version of the identifier, defaults to 1""" | ||||
object_type = attr.ib(type=ObjectType, validator=type_validator()) | object_type = attr.ib(type=ObjectType, validator=type_validator()) | ||||
"""the type of object the identifier points to""" | """the type of object the identifier points to""" | ||||
object_id = attr.ib(type=bytes, validator=type_validator()) | object_id = attr.ib(type=bytes, validator=type_validator()) | ||||
"""object's identifier""" | """object's identifier""" | ||||
qualifiers = attr.ib( | # qualifiers: | ||||
type=ImmutableDict[str, Any], converter=ImmutableDict, default=ImmutableDict() | |||||
origin = attr.ib(type=Optional[str], default=None, validator=type_validator()) | |||||
"""the software origin where an object has been found or observed in the wild, | |||||
as an URI""" | |||||
visit = attr.ib(type=Optional[CoreSWHID], default=None, converter=_parse_core_swhid) | |||||
"""the core identifier of a snapshot corresponding to a specific visit | |||||
of a repository containing the designated object""" | |||||
anchor = attr.ib( | |||||
type=Optional[CoreSWHID], | |||||
default=None, | |||||
validator=type_validator(), | |||||
converter=_parse_core_swhid, | |||||
) | |||||
"""a designated node in the Merkle DAG relative to which a path to the object | |||||
is specified, as the core identifier of a directory, a revision, a release, | |||||
or a snapshot""" | |||||
path = attr.ib(type=Optional[bytes], default=None, validator=type_validator()) | |||||
"""the absolute file path, from the root directory associated to the anchor node, | |||||
to the object; when the anchor denotes a directory or a revision, and almost always | |||||
when it’s a release, the root directory is uniquely determined; | |||||
when the anchor denotes a snapshot, the root directory is the one pointed to by HEAD | |||||
(possibly indirectly), and undefined if such a reference is missing""" | |||||
lines = attr.ib( | |||||
type=Optional[Tuple[int, Optional[int]]], | |||||
default=None, | |||||
validator=type_validator(), | |||||
converter=_parse_lines_qualifier, | |||||
) | ) | ||||
"""optional dict filled with metadata related to pointed object""" | """lines: line number(s) of interest, usually within a content object""" | ||||
@namespace.validator | @namespace.validator | ||||
def check_namespace(self, attribute, value): | def check_namespace(self, attribute, value): | ||||
if value != SWHID_NAMESPACE: | if value != SWHID_NAMESPACE: | ||||
raise ValidationError( | raise ValidationError( | ||||
"Invalid SWHID: invalid namespace: %(namespace)s", | "Invalid SWHID: invalid namespace: %(namespace)s", | ||||
params={"namespace": value}, | params={"namespace": value}, | ||||
) | ) | ||||
@scheme_version.validator | @scheme_version.validator | ||||
def check_scheme_version(self, attribute, value): | def check_scheme_version(self, attribute, value): | ||||
if value != SWHID_VERSION: | if value != SWHID_VERSION: | ||||
raise ValidationError( | raise ValidationError( | ||||
"Invalid SWHID: invalid version: %(version)s", params={"version": value} | "Invalid SWHID: invalid version: %(version)s", params={"version": value} | ||||
) | ) | ||||
@object_id.validator | @object_id.validator | ||||
def check_object_id(self, attribute, value): | def check_object_id(self, attribute, value): | ||||
if len(value) != 20: | if len(value) != 20: | ||||
raise ValidationError( | raise ValidationError( | ||||
"Invalid SWHID: invalid checksum: %(object_id)s", | "Invalid SWHID: invalid checksum: %(object_id)s", | ||||
params={"object_id": hash_to_hex(value)}, | params={"object_id": hash_to_hex(value)}, | ||||
) | ) | ||||
@qualifiers.validator | @visit.validator | ||||
def check_qualifiers(self, attribute, value): | def check_visit(self, attribute, value): | ||||
for k in value: | if value and value.object_type != ObjectType.SNAPSHOT: | ||||
if k not in SWHID_QUALIFIERS: | |||||
raise ValidationError( | raise ValidationError( | ||||
"Invalid SWHID: unknown qualifier: %(qualifier)s", | f"The 'visit' qualifier must be a 'snp' SWHID, " | ||||
params={"qualifier": k}, | f"not '{value.object_type.value}'" | ||||
) | |||||
@anchor.validator | |||||
def check_anchor(self, attribute, value): | |||||
if value and value.object_type not in ( | |||||
ObjectType.DIRECTORY, | |||||
ObjectType.REVISION, | |||||
ObjectType.RELEASE, | |||||
ObjectType.SNAPSHOT, | |||||
): | |||||
raise ValidationError( | |||||
f"The 'visit' qualifier must be a 'dir', 'rev', 'rel', or 'snp' SWHID, " | |||||
f"not '{value.object_type.value}'" | |||||
) | ) | ||||
def qualifiers(self) -> Dict[str, str]: | |||||
d: Dict[str, Optional[str]] = { | |||||
"origin": self.origin, | |||||
"visit": str(self.visit) if self.visit else None, | |||||
"anchor": str(self.anchor) if self.anchor else None, | |||||
"path": self.path.decode() if self.path is not None else None, | |||||
[Not Done] olasd: Doesn't that return "1-None" for a single line? | |||||
"lines": "-".join(map(str, self.lines)) if self.lines else None, | |||||
} | |||||
return {k: v for (k, v) in d.items() if v is not None} | |||||
def __str__(self) -> str: | def __str__(self) -> str: | ||||
swhid = SWHID_SEP.join( | swhid = SWHID_SEP.join( | ||||
[ | [ | ||||
self.namespace, | self.namespace, | ||||
str(self.scheme_version), | str(self.scheme_version), | ||||
self.object_type.value, | self.object_type.value, | ||||
hash_to_hex(self.object_id), | hash_to_hex(self.object_id), | ||||
] | ] | ||||
) | ) | ||||
if self.qualifiers: | qualifiers = self.qualifiers() | ||||
for k, v in self.qualifiers.items(): | if qualifiers: | ||||
for k, v in qualifiers.items(): | |||||
swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v) | swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v) | ||||
return swhid | return swhid | ||||
@classmethod | @classmethod | ||||
def from_string(cls, s: str) -> QualifiedSWHID: | def from_string(cls, s: str) -> QualifiedSWHID: | ||||
with warnings.catch_warnings(): | with warnings.catch_warnings(): | ||||
warnings.simplefilter("ignore") | warnings.simplefilter("ignore") | ||||
old_swhid = parse_swhid(s) | old_swhid = parse_swhid(s) | ||||
object_type = ObjectType(_object_type_map[old_swhid.object_type]["short_name"]) | object_type = ObjectType(_object_type_map[old_swhid.object_type]["short_name"]) | ||||
return QualifiedSWHID( | return QualifiedSWHID( | ||||
namespace=old_swhid.namespace, | namespace=old_swhid.namespace, | ||||
scheme_version=old_swhid.scheme_version, | scheme_version=old_swhid.scheme_version, | ||||
object_type=object_type, | object_type=object_type, | ||||
object_id=hash_to_bytes(old_swhid.object_id), | object_id=hash_to_bytes(old_swhid.object_id), | ||||
qualifiers=old_swhid.metadata, | **old_swhid.metadata, | ||||
) | ) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class SWHID: | class SWHID: | ||||
""" | """ | ||||
Deprecated alternative to QualifiedSWHID. | Deprecated alternative to QualifiedSWHID. | ||||
▲ Show 20 Lines • Show All 180 Lines • Show Last 20 Lines |
olasd: Instead of `tuple(map())` you can probably write this in a way that doesn't need a type ignore.