Changeset View
Changeset View
Standalone View
Standalone View
swh/model/identifiers.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import binascii | import binascii | ||||
import datetime | import datetime | ||||
from functools import lru_cache | from functools import lru_cache | ||||
import hashlib | import hashlib | ||||
import re | from typing import Iterable, List, Optional, Tuple | ||||
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union | |||||
import attr | # for bw compat | ||||
from swh.model.swhid import SWHID, SWHID_RE, SWHID_TYPES, parse_swhid, swhid # noqa | |||||
from .collections import ImmutableDict | from .hashutil import MultiHash, hash_git_data | ||||
from .exceptions import ValidationError | |||||
from .fields.hashes import validate_sha1 | |||||
from .hashutil import MultiHash, hash_git_data, hash_to_hex | |||||
# Long names of the object types a SWHID can point to.
ORIGIN = "origin"
SNAPSHOT = "snapshot"
REVISION = "revision"
RELEASE = "release"
DIRECTORY = "directory"
CONTENT = "content"

# Building blocks of the textual SWHID syntax:
#   <namespace>:<version>:<type>:<hex id>[;<qualifier>=<value>...]
SWHID_NAMESPACE = "swh"
SWHID_VERSION = 1
SWHID_TYPES = ["ori", "snp", "rel", "rev", "dir", "cnt"]
SWHID_SEP = ":"
SWHID_CTXT_SEP = ";"
SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"}

# Regular expression for a whole SWHID. The qualifiers group captures
# everything after the first context separator as one non-whitespace run;
# it is not split into individual qualifiers here.
SWHID_RE_RAW = (
    f"(?P<scheme>{SWHID_NAMESPACE})"
    f"{SWHID_SEP}(?P<version>{SWHID_VERSION})"
    f"{SWHID_SEP}(?P<object_type>{'|'.join(SWHID_TYPES)})"
    f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})"
    f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?"
)
SWHID_RE = re.compile(SWHID_RE_RAW)
@lru_cache() | @lru_cache() | ||||
def identifier_to_bytes(identifier): | def identifier_to_bytes(identifier): | ||||
"""Convert a text identifier to bytes. | """Convert a text identifier to bytes. | ||||
Args: | Args: | ||||
identifier: an identifier, either a 40-char hexadecimal string or a | identifier: an identifier, either a 40-char hexadecimal string or a | ||||
▲ Show 20 Lines • Show All 621 Lines • ▼ Show 20 Lines | |||||
def origin_identifier(origin):
    """Return the intrinsic identifier for an origin.

    An origin's identifier is the sha1 checksum of the entire origin URL.

    Args:
        origin: a mapping whose ``"url"`` entry is the origin URL as a string

    Returns:
        the hexadecimal sha1 digest of the UTF-8 encoded URL
    """
    return hashlib.sha1(origin["url"].encode("utf-8")).hexdigest()
# For each object type (long name): the short name used inside a SWHID
# string, and the key holding the object's identifier when the object is
# given as a dict.
_object_type_map = {
    ORIGIN: {"short_name": "ori", "key_id": "id"},
    SNAPSHOT: {"short_name": "snp", "key_id": "id"},
    RELEASE: {"short_name": "rel", "key_id": "id"},
    REVISION: {"short_name": "rev", "key_id": "id"},
    DIRECTORY: {"short_name": "dir", "key_id": "id"},
    CONTENT: {"short_name": "cnt", "key_id": "sha1_git"},
}

# Reverse lookup (short name -> long name), derived from the table above so
# the two mappings cannot drift apart.
_swhid_type_map = {
    info["short_name"]: long_name for long_name, info in _object_type_map.items()
}
@attr.s(frozen=True)
class SWHID:
    """
    Immutable (frozen ``attrs``) class holding the relevant info associated
    to a SoftWare Heritage persistent IDentifier (SWHID)

    Args:
        namespace (str): the namespace of the identifier, defaults to ``swh``
        scheme_version (int): the scheme version of the identifier,
            defaults to 1
        object_type (str): the type of object the identifier points to,
            either ``content``, ``directory``, ``release``, ``revision``,
            ``snapshot`` or ``origin``
        object_id (str): object's identifier
        metadata (dict): optional dict filled with metadata related to
            pointed object; every key must be one of ``SWHID_QUALIFIERS``

    Raises:
        swh.model.exceptions.ValidationError: In case of invalid namespace,
            version, object type, object id or qualifier name

    Once created, it contains the following attributes:

    Attributes:
        namespace (str): the namespace of the identifier
        scheme_version (int): the scheme version of the identifier
        object_type (str): the type of object the identifier points to
        object_id (str): hexadecimal representation of the object hash
        metadata (dict): metadata related to the pointed object

    To get the raw SWHID string from an instance of this class,
    use the :func:`str` function::

        swhid = SWHID(
            object_type='content',
            object_id='8ff44f081d43176474b267de5451f2c2e88089d0'
        )
        swhid_str = str(swhid)
        # 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'
    """

    namespace = attr.ib(type=str, default=SWHID_NAMESPACE)
    scheme_version = attr.ib(type=int, default=SWHID_VERSION)
    object_type = attr.ib(type=str, default="")
    object_id = attr.ib(type=str, converter=hash_to_hex, default="")  # type: ignore
    metadata = attr.ib(
        type=ImmutableDict[str, Any], converter=ImmutableDict, default=ImmutableDict()
    )

    @namespace.validator
    def check_namespace(self, attribute, value):
        """Reject any namespace other than ``SWHID_NAMESPACE``."""
        if value != SWHID_NAMESPACE:
            raise ValidationError(
                "Invalid SWHID: invalid namespace: %(namespace)s",
                params={"namespace": value},
            )

    @scheme_version.validator
    def check_scheme_version(self, attribute, value):
        """Reject any scheme version other than ``SWHID_VERSION``."""
        if value != SWHID_VERSION:
            raise ValidationError(
                "Invalid SWHID: invalid version: %(version)s", params={"version": value}
            )

    @object_type.validator
    def check_object_type(self, attribute, value):
        """Reject object types that are not keys of ``_object_type_map``."""
        if value not in _object_type_map:
            # The message used to end with a stray ")" after the placeholder.
            raise ValidationError(
                "Invalid SWHID: invalid type: %(object_type)s",
                params={"object_type": value},
            )

    @object_id.validator
    def check_object_id(self, attribute, value):
        """Reject object ids that ``validate_sha1`` does not accept."""
        try:
            validate_sha1(value)  # can raise if invalid hash
        except ValidationError:
            # Replace the generic hash error by a SWHID-specific one and hide
            # the original exception from the traceback.
            raise ValidationError(
                "Invalid SWHID: invalid checksum: %(object_id)s",
                params={"object_id": value},
            ) from None

    @metadata.validator
    def check_qualifiers(self, attribute, value):
        """Reject metadata keys that are not in ``SWHID_QUALIFIERS``."""
        for k in value:
            if k not in SWHID_QUALIFIERS:
                raise ValidationError(
                    "Invalid SWHID: unknown qualifier: %(qualifier)s",
                    params={"qualifier": k},
                )

    def to_dict(self) -> Dict[str, Any]:
        """Return the attributes of this identifier as a dict."""
        return attr.asdict(self)

    def __str__(self) -> str:
        """Return the textual form: core identifier followed by qualifiers."""
        # object_type has been checked against _object_type_map by its
        # validator, so a plain lookup replaces the previous ``assert``.
        short_name = _object_type_map[self.object_type]["short_name"]
        core = SWHID_SEP.join(
            [self.namespace, str(self.scheme_version), short_name, self.object_id]
        )
        # Each qualifier is appended as ";key=value", in the mapping's
        # iteration order.
        qualifiers = "".join(
            f"{SWHID_CTXT_SEP}{k}={v}" for k, v in self.metadata.items()
        )
        return core + qualifiers
def swhid(
    object_type: str,
    object_id: Union[str, Dict[str, Any]],
    scheme_version: int = 1,
    metadata: Union[ImmutableDict[str, Any], Dict[str, Any]] = ImmutableDict(),
) -> str:
    """Compute :ref:`persistent-identifiers`

    Args:
        object_type: object's type, either ``content``, ``directory``,
            ``release``, ``revision``, ``snapshot`` or ``origin``
        object_id: object's identifier, or a dict describing the object, in
            which case the identifier is read from the key registered for
            ``object_type`` in ``_object_type_map``
        scheme_version: SWHID scheme version, defaults to 1
        metadata: metadata related to the pointed object

    Raises:
        swh.model.exceptions.ValidationError: In case of invalid object type or id

    Returns:
        the SWHID of the object

    """
    if isinstance(object_id, dict):
        type_info = _object_type_map.get(object_type)
        if type_info is None:
            # Without this check an unknown type surfaced as a bare KeyError
            # instead of the documented ValidationError.
            raise ValidationError(
                "Invalid SWHID: invalid type: %(object_type)s",
                params={"object_type": object_type},
            )
        object_id = object_id[type_info["key_id"]]

    # Named so that it does not shadow this function inside its own body.
    swhid_obj = SWHID(
        scheme_version=scheme_version,
        object_type=object_type,
        object_id=object_id,
        metadata=metadata,  # type: ignore  # mypy can't properly unify types
    )
    return str(swhid_obj)
def parse_swhid(swhid: str) -> SWHID:
    """Parse a Software Heritage identifier (SWHID) from string (see:
    :ref:`persistent-identifiers`.)

    Args:
        swhid (str): A persistent identifier

    Returns:
        a :class:`SWHID` instance holding the parsing result

    Raises:
        swh.model.exceptions.ValidationError: if passed string is not a valid SWHID

    """
    m = SWHID_RE.fullmatch(swhid)
    if not m:
        raise ValidationError(
            "Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid}
        )
    parts = m.groupdict()

    _qualifiers = {}
    qualifiers_raw = parts["qualifiers"]
    if qualifiers_raw:
        for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP):
            try:
                # Split on the first "=" only, so that values which contain
                # "=" themselves (e.g. an origin URL with a query string)
                # are kept whole. A qualifier without "=" still fails to
                # unpack and is rejected below.
                k, v = qualifier.split("=", 1)
            except ValueError:
                raise ValidationError(
                    "Invalid SWHID: invalid qualifier: %(qualifier)s",
                    params={"qualifier": qualifier},
                ) from None
            _qualifiers[k] = v

    return SWHID(
        parts["scheme"],
        int(parts["version"]),
        # The string carries the short type name; SWHID expects the long one.
        _swhid_type_map[parts["object_type"]],
        parts["object_id"],
        _qualifiers,  # type: ignore  # mypy can't properly unify types
    )