Changeset View
Changeset View
Standalone View
Standalone View
swh/model/identifiers.py
Show All 18 Lines | from typing import ( | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Tuple, | Tuple, | ||||
Type, | Type, | ||||
TypeVar, | TypeVar, | ||||
Union, | Union, | ||||
) | ) | ||||
import urllib.parse | import urllib.parse | ||||
import warnings | |||||
import attr | import attr | ||||
from attrs_strict import type_validator | from attrs_strict import type_validator | ||||
from .collections import ImmutableDict | |||||
from .exceptions import ValidationError | from .exceptions import ValidationError | ||||
from .fields.hashes import validate_sha1 | |||||
from .hashutil import MultiHash, hash_git_data, hash_to_bytes, hash_to_hex | from .hashutil import MultiHash, hash_git_data, hash_to_bytes, hash_to_hex | ||||
class ObjectType(enum.Enum): | class ObjectType(enum.Enum): | ||||
"""Possible object types of a QualifiedSWHID or CoreSWHID. | """Possible object types of a QualifiedSWHID or CoreSWHID. | ||||
The values of each variant is what is used in the SWHID's string representation.""" | The values of each variant is what is used in the SWHID's string representation.""" | ||||
Show All 23 Lines | |||||
ORIGIN = "origin" | ORIGIN = "origin" | ||||
SNAPSHOT = "snapshot" | SNAPSHOT = "snapshot" | ||||
REVISION = "revision" | REVISION = "revision" | ||||
RELEASE = "release" | RELEASE = "release" | ||||
DIRECTORY = "directory" | DIRECTORY = "directory" | ||||
CONTENT = "content" | CONTENT = "content" | ||||
RAW_EXTRINSIC_METADATA = "raw_extrinsic_metadata" | RAW_EXTRINSIC_METADATA = "raw_extrinsic_metadata" | ||||
SWHID_NAMESPACE = "swh" | SWHID_NAMESPACE = "swh" | ||||
SWHID_VERSION = 1 | SWHID_VERSION = 1 | ||||
SWHID_TYPES = ["snp", "rel", "rev", "dir", "cnt"] | SWHID_TYPES = ["snp", "rel", "rev", "dir", "cnt"] | ||||
EXTENDED_SWHID_TYPES = SWHID_TYPES + ["ori", "emd"] | EXTENDED_SWHID_TYPES = SWHID_TYPES + ["ori", "emd"] | ||||
SWHID_SEP = ":" | SWHID_SEP = ":" | ||||
SWHID_CTXT_SEP = ";" | SWHID_CTXT_SEP = ";" | ||||
SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"} | SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"} | ||||
▲ Show 20 Lines • Show All 639 Lines • ▼ Show 20 Lines | def origin_identifier(origin): | ||||
"""Return the intrinsic identifier for an origin. | """Return the intrinsic identifier for an origin. | ||||
An origin's identifier is the sha1 checksum of the entire origin URL | An origin's identifier is the sha1 checksum of the entire origin URL | ||||
""" | """ | ||||
return hashlib.sha1(origin["url"].encode("utf-8")).hexdigest() | return hashlib.sha1(origin["url"].encode("utf-8")).hexdigest() | ||||
_object_type_map = { | |||||
ORIGIN: {"short_name": "ori", "key_id": "id"}, | |||||
SNAPSHOT: {"short_name": "snp", "key_id": "id"}, | |||||
RELEASE: {"short_name": "rel", "key_id": "id"}, | |||||
REVISION: {"short_name": "rev", "key_id": "id"}, | |||||
DIRECTORY: {"short_name": "dir", "key_id": "id"}, | |||||
CONTENT: {"short_name": "cnt", "key_id": "sha1_git"}, | |||||
RAW_EXTRINSIC_METADATA: {"short_name": "emd", "key_id": "id"}, | |||||
} | |||||
_swhid_type_map = { | |||||
"ori": ORIGIN, | |||||
"snp": SNAPSHOT, | |||||
"rel": RELEASE, | |||||
"rev": REVISION, | |||||
"dir": DIRECTORY, | |||||
"cnt": CONTENT, | |||||
"emd": RAW_EXTRINSIC_METADATA, | |||||
} | |||||
# type of the "object_type" attribute of the SWHID class; either | # type of the "object_type" attribute of the SWHID class; either | ||||
# ObjectType or ExtendedObjectType | # ObjectType or ExtendedObjectType | ||||
_TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType) | _TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType) | ||||
# the SWHID class itself (this is used so that X.from_string() can return X | # the SWHID class itself (this is used so that X.from_string() can return X | ||||
# for all X subclass of _BaseSWHID) | # for all X subclass of _BaseSWHID) | ||||
_TSWHID = TypeVar("_TSWHID", bound="_BaseSWHID") | _TSWHID = TypeVar("_TSWHID", bound="_BaseSWHID") | ||||
▲ Show 20 Lines • Show All 331 Lines • ▼ Show 20 Lines | class ExtendedSWHID(_BaseSWHID[ExtendedObjectType]): | ||||
object_type = attr.ib( | object_type = attr.ib( | ||||
type=ExtendedObjectType, | type=ExtendedObjectType, | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=ExtendedObjectType, | converter=ExtendedObjectType, | ||||
) | ) | ||||
"""the type of object the identifier points to""" | """the type of object the identifier points to""" | ||||
@attr.s(frozen=True) | |||||
class SWHID: | |||||
""" | |||||
Deprecated alternative to QualifiedSWHID. | |||||
Args: | |||||
namespace (str): the namespace of the identifier, defaults to ``swh`` | |||||
scheme_version (int): the scheme version of the identifier, | |||||
defaults to 1 | |||||
object_type (str): the type of object the identifier points to, | |||||
either ``content``, ``directory``, ``release``, ``revision`` or ``snapshot`` | |||||
object_id (str): object's identifier | |||||
metadata (dict): optional dict filled with metadata related to | |||||
pointed object | |||||
Raises: | |||||
swh.model.exceptions.ValidationError: In case of invalid object type or id | |||||
Once created, it contains the following attributes: | |||||
Attributes: | |||||
namespace (str): the namespace of the identifier | |||||
scheme_version (int): the scheme version of the identifier | |||||
object_type (str): the type of object the identifier points to | |||||
object_id (str): hexadecimal representation of the object hash | |||||
metadata (dict): metadata related to the pointed object | |||||
To get the raw SWHID string from an instance of this named tuple, | |||||
use the :func:`str` function:: | |||||
swhid = SWHID( | |||||
object_type='content', | |||||
object_id='8ff44f081d43176474b267de5451f2c2e88089d0' | |||||
) | |||||
swhid_str = str(swhid) | |||||
# 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0' | |||||
""" | |||||
namespace = attr.ib(type=str, default=SWHID_NAMESPACE) | |||||
scheme_version = attr.ib(type=int, default=SWHID_VERSION) | |||||
object_type = attr.ib(type=str, default="") | |||||
object_id = attr.ib(type=str, converter=hash_to_hex, default="") # type: ignore | |||||
metadata = attr.ib( | |||||
type=ImmutableDict[str, Any], converter=ImmutableDict, default=ImmutableDict() | |||||
) | |||||
def __attrs_post_init__(self): | |||||
warnings.warn( | |||||
"swh.model.identifiers.SWHID is deprecated; " | |||||
"use swh.model.identifiers.QualifiedSWHID instead.", | |||||
DeprecationWarning, | |||||
) | |||||
@namespace.validator | |||||
def check_namespace(self, attribute, value): | |||||
if value != SWHID_NAMESPACE: | |||||
raise ValidationError( | |||||
"Invalid SWHID: invalid namespace: %(namespace)s", | |||||
params={"namespace": value}, | |||||
) | |||||
@scheme_version.validator | |||||
def check_scheme_version(self, attribute, value): | |||||
if value != SWHID_VERSION: | |||||
raise ValidationError( | |||||
"Invalid SWHID: invalid version: %(version)s", params={"version": value} | |||||
) | |||||
@object_type.validator | |||||
def check_object_type(self, attribute, value): | |||||
if value not in _object_type_map: | |||||
raise ValidationError( | |||||
"Invalid SWHID: invalid type: %(object_type)s)", | |||||
params={"object_type": value}, | |||||
) | |||||
@object_id.validator | |||||
def check_object_id(self, attribute, value): | |||||
try: | |||||
validate_sha1(value) # can raise if invalid hash | |||||
except ValidationError: | |||||
raise ValidationError( | |||||
"Invalid SWHID: invalid checksum: %(object_id)s", | |||||
params={"object_id": value}, | |||||
) from None | |||||
@metadata.validator | |||||
def check_qualifiers(self, attribute, value): | |||||
for k in value: | |||||
if k not in SWHID_QUALIFIERS: | |||||
raise ValidationError( | |||||
"Invalid SWHID: unknown qualifier: %(qualifier)s", | |||||
params={"qualifier": k}, | |||||
) | |||||
def to_dict(self) -> Dict[str, Any]: | |||||
return attr.asdict(self) | |||||
def __str__(self) -> str: | |||||
o = _object_type_map.get(self.object_type) | |||||
assert o | |||||
swhid = SWHID_SEP.join( | |||||
[self.namespace, str(self.scheme_version), o["short_name"], self.object_id] | |||||
) | |||||
if self.metadata: | |||||
for k, v in self.metadata.items(): | |||||
swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v) | |||||
return swhid | |||||
def swhid( | |||||
object_type: str, | |||||
object_id: Union[str, Dict[str, Any]], | |||||
scheme_version: int = 1, | |||||
metadata: Union[ImmutableDict[str, Any], Dict[str, Any]] = ImmutableDict(), | |||||
) -> str: | |||||
"""Compute :ref:`persistent-identifiers` | |||||
Args: | |||||
object_type: object's type, either ``content``, ``directory``, | |||||
``release``, ``revision`` or ``snapshot`` | |||||
object_id: object's identifier | |||||
scheme_version: SWHID scheme version, defaults to 1 | |||||
metadata: metadata related to the pointed object | |||||
Raises: | |||||
swh.model.exceptions.ValidationError: In case of invalid object type or id | |||||
Returns: | |||||
the SWHID of the object | |||||
""" | |||||
if isinstance(object_id, dict): | |||||
o = _object_type_map[object_type] | |||||
object_id = object_id[o["key_id"]] | |||||
swhid = SWHID( | |||||
scheme_version=scheme_version, | |||||
object_type=object_type, | |||||
object_id=object_id, | |||||
metadata=metadata, # type: ignore # mypy can't properly unify types | |||||
) | |||||
return str(swhid) | |||||
def _parse_swhid(swhid: str) -> Dict[str, Any]: | def _parse_swhid(swhid: str) -> Dict[str, Any]: | ||||
"""Parse a Software Heritage identifier (SWHID) from string (see: | """Parse a Software Heritage identifier (SWHID) from string (see: | ||||
:ref:`persistent-identifiers`.) | :ref:`persistent-identifiers`.) | ||||
This is for internal use; use :meth:`CoreSWHID.from_string`, | This is for internal use; use :meth:`CoreSWHID.from_string`, | ||||
:meth:`QualifiedSWHID.from_string`, or :meth:`ExtendedSWHID.from_string` instead, | :meth:`QualifiedSWHID.from_string`, or :meth:`ExtendedSWHID.from_string` instead, | ||||
as they perform validation and build a dataclass. | as they perform validation and build a dataclass. | ||||
Show All 22 Lines | if qualifiers_raw: | ||||
"Invalid SWHID: invalid qualifier: %(qualifier)s", | "Invalid SWHID: invalid qualifier: %(qualifier)s", | ||||
params={"qualifier": qualifier}, | params={"qualifier": qualifier}, | ||||
) | ) | ||||
parts["qualifiers"][k] = v | parts["qualifiers"][k] = v | ||||
parts["scheme_version"] = int(parts["scheme_version"]) | parts["scheme_version"] = int(parts["scheme_version"]) | ||||
parts["object_id"] = hash_to_bytes(parts["object_id"]) | parts["object_id"] = hash_to_bytes(parts["object_id"]) | ||||
return parts | return parts | ||||
def parse_swhid(swhid: str) -> SWHID: | |||||
"""Parse a Software Heritage identifier (SWHID) from string (see: | |||||
:ref:`persistent-identifiers`.) | |||||
Args: | |||||
swhid (str): A persistent identifier | |||||
Raises: | |||||
swh.model.exceptions.ValidationError: if passed string is not a valid SWHID | |||||
""" | |||||
parts = _parse_swhid(swhid) | |||||
return SWHID( | |||||
parts["namespace"], | |||||
parts["scheme_version"], | |||||
_swhid_type_map[parts["object_type"]], | |||||
hash_to_hex(parts["object_id"]), | |||||
parts["qualifiers"], | |||||
) |