Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124856
D6323.id22977.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
31 KB
Subscribers
None
D6323.id22977.diff
View Options
diff --git a/swh/model/identifiers.py b/swh/model/identifiers.py
--- a/swh/model/identifiers.py
+++ b/swh/model/identifiers.py
@@ -7,56 +7,14 @@
import binascii
import datetime
-import enum
from functools import lru_cache
import hashlib
-import re
-from typing import (
- Any,
- Dict,
- Generic,
- Iterable,
- List,
- Optional,
- Tuple,
- Type,
- TypeVar,
- Union,
-)
-import urllib.parse
+from typing import Any, Dict, Iterable, List, Optional, Tuple
-import attr
-from attrs_strict import type_validator
-
-from .exceptions import ValidationError
-from .hashutil import MultiHash, git_object_header, hash_to_bytes, hash_to_hex
-
-
-class ObjectType(enum.Enum):
- """Possible object types of a QualifiedSWHID or CoreSWHID.
-
- The values of each variant is what is used in the SWHID's string representation."""
-
- SNAPSHOT = "snp"
- REVISION = "rev"
- RELEASE = "rel"
- DIRECTORY = "dir"
- CONTENT = "cnt"
-
-
-class ExtendedObjectType(enum.Enum):
- """Possible object types of an ExtendedSWHID.
-
- The variants are a superset of :class:`ObjectType`'s"""
-
- SNAPSHOT = "snp"
- REVISION = "rev"
- RELEASE = "rel"
- DIRECTORY = "dir"
- CONTENT = "cnt"
- ORIGIN = "ori"
- RAW_EXTRINSIC_METADATA = "emd"
+from .hashutil import MultiHash, git_object_header
+# Reexport for backward compatibility
+from .swhids import * # noqa
# The following are deprecated aliases of the variants defined in ObjectType
# while transitioning from SWHID to QualifiedSWHID
@@ -69,24 +27,6 @@
RAW_EXTRINSIC_METADATA = "raw_extrinsic_metadata"
-SWHID_NAMESPACE = "swh"
-SWHID_VERSION = 1
-SWHID_TYPES = ["snp", "rel", "rev", "dir", "cnt"]
-EXTENDED_SWHID_TYPES = SWHID_TYPES + ["ori", "emd"]
-SWHID_SEP = ":"
-SWHID_CTXT_SEP = ";"
-SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"}
-
-SWHID_RE_RAW = (
- f"(?P<namespace>{SWHID_NAMESPACE})"
- f"{SWHID_SEP}(?P<scheme_version>{SWHID_VERSION})"
- f"{SWHID_SEP}(?P<object_type>{'|'.join(EXTENDED_SWHID_TYPES)})"
- f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})"
- f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?"
-)
-SWHID_RE = re.compile(SWHID_RE_RAW)
-
-
@lru_cache()
def identifier_to_bytes(identifier):
"""Convert a text identifier to bytes.
@@ -887,390 +827,3 @@
git_object = format_git_object_from_headers("extid", headers)
return hashlib.new("sha1", git_object).hexdigest()
-
-
-# type of the "object_type" attribute of the SWHID class; either
-# ObjectType or ExtendedObjectType
-_TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType)
-
-# the SWHID class itself (this is used so that X.from_string() can return X
-# for all X subclass of _BaseSWHID)
-_TSWHID = TypeVar("_TSWHID", bound="_BaseSWHID")
-
-
-@attr.s(frozen=True, kw_only=True)
-class _BaseSWHID(Generic[_TObjectType]):
- """Common base class for CoreSWHID, QualifiedSWHID, and ExtendedSWHID.
-
- This is an "abstract" class and should not be instantiated directly;
- it only exists to deduplicate code between these three SWHID classes."""
-
- namespace = attr.ib(type=str, default=SWHID_NAMESPACE)
- """the namespace of the identifier, defaults to ``swh``"""
-
- scheme_version = attr.ib(type=int, default=SWHID_VERSION)
- """the scheme version of the identifier, defaults to 1"""
-
- # overridden by subclasses
- object_type: _TObjectType
- """the type of object the identifier points to"""
-
- object_id = attr.ib(type=bytes, validator=type_validator())
- """object's identifier"""
-
- @namespace.validator
- def check_namespace(self, attribute, value):
- if value != SWHID_NAMESPACE:
- raise ValidationError(
- "Invalid SWHID: invalid namespace: %(namespace)s",
- params={"namespace": value},
- )
-
- @scheme_version.validator
- def check_scheme_version(self, attribute, value):
- if value != SWHID_VERSION:
- raise ValidationError(
- "Invalid SWHID: invalid version: %(version)s", params={"version": value}
- )
-
- @object_id.validator
- def check_object_id(self, attribute, value):
- if len(value) != 20:
- raise ValidationError(
- "Invalid SWHID: invalid checksum: %(object_id)s",
- params={"object_id": hash_to_hex(value)},
- )
-
- def __str__(self) -> str:
- return SWHID_SEP.join(
- [
- self.namespace,
- str(self.scheme_version),
- self.object_type.value,
- hash_to_hex(self.object_id),
- ]
- )
-
- @classmethod
- def from_string(cls: Type[_TSWHID], s: str) -> _TSWHID:
- parts = _parse_swhid(s)
- if parts.pop("qualifiers"):
- raise ValidationError(f"{cls.__name__} does not support qualifiers.")
- try:
- return cls(**parts)
- except ValueError as e:
- raise ValidationError(
- "ValueError: %(args)s", params={"args": e.args}
- ) from None
-
-
-@attr.s(frozen=True, kw_only=True)
-class CoreSWHID(_BaseSWHID[ObjectType]):
- """
- Dataclass holding the relevant info associated to a SoftWare Heritage
- persistent IDentifier (SWHID).
-
- Unlike `QualifiedSWHID`, it is restricted to core SWHIDs, ie. SWHIDs
- with no qualifiers.
-
- Raises:
- swh.model.exceptions.ValidationError: In case of invalid object type or id
-
- To get the raw SWHID string from an instance of this class,
- use the :func:`str` function:
-
- >>> swhid = CoreSWHID(
- ... object_type=ObjectType.CONTENT,
- ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
- ... )
- >>> str(swhid)
- 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'
-
- And vice-versa with :meth:`CoreSWHID.from_string`:
-
- >>> swhid == CoreSWHID.from_string(
- ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0"
- ... )
- True
- """
-
- object_type = attr.ib(
- type=ObjectType, validator=type_validator(), converter=ObjectType
- )
- """the type of object the identifier points to"""
-
- def to_extended(self) -> ExtendedSWHID:
- """Converts this CoreSWHID into an ExtendedSWHID.
-
- As ExtendedSWHID is a superset of CoreSWHID, this is lossless."""
- return ExtendedSWHID(
- namespace=self.namespace,
- scheme_version=self.scheme_version,
- object_type=ExtendedObjectType(self.object_type.value),
- object_id=self.object_id,
- )
-
-
-def _parse_core_swhid(swhid: Union[str, CoreSWHID, None]) -> Optional[CoreSWHID]:
- if swhid is None or isinstance(swhid, CoreSWHID):
- return swhid
- else:
- return CoreSWHID.from_string(swhid)
-
-
-def _parse_lines_qualifier(
- lines: Union[str, Tuple[int, Optional[int]], None]
-) -> Optional[Tuple[int, Optional[int]]]:
- try:
- if lines is None or isinstance(lines, tuple):
- return lines
- elif "-" in lines:
- (from_, to) = lines.split("-", 2)
- return (int(from_), int(to))
- else:
- return (int(lines), None)
- except ValueError:
- raise ValidationError(
- "Invalid format for the lines qualifier: %(lines)s", params={"lines": lines}
- )
-
-
-def _parse_path_qualifier(path: Union[str, bytes, None]) -> Optional[bytes]:
- if path is None or isinstance(path, bytes):
- return path
- else:
- return urllib.parse.unquote_to_bytes(path)
-
-
-@attr.s(frozen=True, kw_only=True)
-class QualifiedSWHID(_BaseSWHID[ObjectType]):
- """
- Dataclass holding the relevant info associated to a SoftWare Heritage
- persistent IDentifier (SWHID)
-
- Raises:
- swh.model.exceptions.ValidationError: In case of invalid object type or id
-
- To get the raw SWHID string from an instance of this class,
- use the :func:`str` function:
-
- >>> swhid = QualifiedSWHID(
- ... object_type=ObjectType.CONTENT,
- ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
- ... lines=(5, 10),
- ... )
- >>> str(swhid)
- 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10'
-
- And vice-versa with :meth:`QualifiedSWHID.from_string`:
-
- >>> swhid == QualifiedSWHID.from_string(
- ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10"
- ... )
- True
- """
-
- object_type = attr.ib(
- type=ObjectType, validator=type_validator(), converter=ObjectType
- )
- """the type of object the identifier points to"""
-
- # qualifiers:
-
- origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
- """the software origin where an object has been found or observed in the wild,
- as an URI"""
-
- visit = attr.ib(type=Optional[CoreSWHID], default=None, converter=_parse_core_swhid)
- """the core identifier of a snapshot corresponding to a specific visit
- of a repository containing the designated object"""
-
- anchor = attr.ib(
- type=Optional[CoreSWHID],
- default=None,
- validator=type_validator(),
- converter=_parse_core_swhid,
- )
- """a designated node in the Merkle DAG relative to which a path to the object
- is specified, as the core identifier of a directory, a revision, a release,
- or a snapshot"""
-
- path = attr.ib(
- type=Optional[bytes],
- default=None,
- validator=type_validator(),
- converter=_parse_path_qualifier,
- )
- """the absolute file path, from the root directory associated to the anchor node,
- to the object; when the anchor denotes a directory or a revision, and almost always
- when it’s a release, the root directory is uniquely determined;
- when the anchor denotes a snapshot, the root directory is the one pointed to by HEAD
- (possibly indirectly), and undefined if such a reference is missing"""
-
- lines = attr.ib(
- type=Optional[Tuple[int, Optional[int]]],
- default=None,
- validator=type_validator(),
- converter=_parse_lines_qualifier,
- )
- """lines: line number(s) of interest, usually within a content object"""
-
- @visit.validator
- def check_visit(self, attribute, value):
- if value and value.object_type != ObjectType.SNAPSHOT:
- raise ValidationError(
- "The 'visit' qualifier must be a 'snp' SWHID, not '%(type)s'",
- params={"type": value.object_type.value},
- )
-
- @anchor.validator
- def check_anchor(self, attribute, value):
- if value and value.object_type not in (
- ObjectType.DIRECTORY,
- ObjectType.REVISION,
- ObjectType.RELEASE,
- ObjectType.SNAPSHOT,
- ):
- raise ValidationError(
- "The 'visit' qualifier must be a 'dir', 'rev', 'rel', or 'snp' SWHID, "
- "not '%s(type)s'",
- params={"type": value.object_type.value},
- )
-
- def qualifiers(self) -> Dict[str, str]:
- origin = self.origin
- if origin:
- unescaped_origin = origin
- origin = origin.replace(";", "%3B")
- assert urllib.parse.unquote_to_bytes(
- origin
- ) == urllib.parse.unquote_to_bytes(
- unescaped_origin
- ), "Escaping ';' in the origin qualifier corrupted the origin URL."
-
- d: Dict[str, Optional[str]] = {
- "origin": origin,
- "visit": str(self.visit) if self.visit else None,
- "anchor": str(self.anchor) if self.anchor else None,
- "path": (
- urllib.parse.quote_from_bytes(self.path)
- if self.path is not None
- else None
- ),
- "lines": (
- "-".join(str(line) for line in self.lines if line is not None)
- if self.lines
- else None
- ),
- }
- return {k: v for (k, v) in d.items() if v is not None}
-
- def __str__(self) -> str:
- swhid = SWHID_SEP.join(
- [
- self.namespace,
- str(self.scheme_version),
- self.object_type.value,
- hash_to_hex(self.object_id),
- ]
- )
- qualifiers = self.qualifiers()
- if qualifiers:
- for k, v in qualifiers.items():
- swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v)
- return swhid
-
- @classmethod
- def from_string(cls, s: str) -> QualifiedSWHID:
- parts = _parse_swhid(s)
- qualifiers = parts.pop("qualifiers")
- invalid_qualifiers = set(qualifiers) - SWHID_QUALIFIERS
- if invalid_qualifiers:
- raise ValidationError(
- "Invalid qualifier(s): %(qualifiers)s",
- params={"qualifiers": ", ".join(invalid_qualifiers)},
- )
- try:
- return QualifiedSWHID(**parts, **qualifiers)
- except ValueError as e:
- raise ValidationError(
- "ValueError: %(args)s", params={"args": e.args}
- ) from None
-
-
-@attr.s(frozen=True, kw_only=True)
-class ExtendedSWHID(_BaseSWHID[ExtendedObjectType]):
- """
- Dataclass holding the relevant info associated to a SoftWare Heritage
- persistent IDentifier (SWHID).
-
- It extends `CoreSWHID`, by allowing non-standard object types; and should
- only be used internally to Software Heritage.
-
- Raises:
- swh.model.exceptions.ValidationError: In case of invalid object type or id
-
- To get the raw SWHID string from an instance of this class,
- use the :func:`str` function:
-
- >>> swhid = ExtendedSWHID(
- ... object_type=ExtendedObjectType.CONTENT,
- ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
- ... )
- >>> str(swhid)
- 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'
-
- And vice-versa with :meth:`CoreSWHID.from_string`:
-
- >>> swhid == ExtendedSWHID.from_string(
- ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0"
- ... )
- True
- """
-
- object_type = attr.ib(
- type=ExtendedObjectType,
- validator=type_validator(),
- converter=ExtendedObjectType,
- )
- """the type of object the identifier points to"""
-
-
-def _parse_swhid(swhid: str) -> Dict[str, Any]:
- """Parse a Software Heritage identifier (SWHID) from string (see:
- :ref:`persistent-identifiers`.)
-
- This is for internal use; use :meth:`CoreSWHID.from_string`,
- :meth:`QualifiedSWHID.from_string`, or :meth:`ExtendedSWHID.from_string` instead,
- as they perform validation and build a dataclass.
-
- Args:
- swhid (str): A persistent identifier
-
- Raises:
- swh.model.exceptions.ValidationError: if passed string is not a valid SWHID
-
- """
- m = SWHID_RE.fullmatch(swhid)
- if not m:
- raise ValidationError(
- "Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid}
- )
- parts: Dict[str, Any] = m.groupdict()
-
- qualifiers_raw = parts["qualifiers"]
- parts["qualifiers"] = {}
- if qualifiers_raw:
- for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP):
- try:
- k, v = qualifier.split("=", maxsplit=1)
- parts["qualifiers"][k] = v
- except ValueError:
- raise ValidationError(
- "Invalid SWHID: invalid qualifier: %(qualifier)s",
- params={"qualifier": qualifier},
- )
-
- parts["scheme_version"] = int(parts["scheme_version"])
- parts["object_id"] = hash_to_bytes(parts["object_id"])
- return parts
diff --git a/swh/model/model.py b/swh/model/model.py
--- a/swh/model/model.py
+++ b/swh/model/model.py
@@ -27,10 +27,10 @@
revision_identifier,
snapshot_identifier,
)
-from .identifiers import CoreSWHID
-from .identifiers import ExtendedObjectType as SwhidExtendedObjectType
-from .identifiers import ExtendedSWHID
-from .identifiers import ObjectType as SwhidObjectType
+from .swhids import CoreSWHID
+from .swhids import ExtendedObjectType as SwhidExtendedObjectType
+from .swhids import ExtendedSWHID
+from .swhids import ObjectType as SwhidObjectType
class MissingData(Exception):
diff --git a/swh/model/swhids.py b/swh/model/swhids.py
new file mode 100644
--- /dev/null
+++ b/swh/model/swhids.py
@@ -0,0 +1,448 @@
+# Copyright (C) 2015-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from __future__ import annotations
+
+import enum
+import re
+from typing import Any, Dict, Generic, Optional, Tuple, Type, TypeVar, Union
+import urllib.parse
+
+import attr
+from attrs_strict import type_validator
+
+from .exceptions import ValidationError
+from .hashutil import hash_to_bytes, hash_to_hex
+
+
+class ObjectType(enum.Enum):
+ """Possible object types of a QualifiedSWHID or CoreSWHID.
+
+ The value of each variant is what is used in the SWHID's string representation."""
+
+ SNAPSHOT = "snp"
+ REVISION = "rev"
+ RELEASE = "rel"
+ DIRECTORY = "dir"
+ CONTENT = "cnt"
+
+
+class ExtendedObjectType(enum.Enum):
+ """Possible object types of an ExtendedSWHID.
+
+ The variants are a superset of :class:`ObjectType`'s"""
+
+ SNAPSHOT = "snp"
+ REVISION = "rev"
+ RELEASE = "rel"
+ DIRECTORY = "dir"
+ CONTENT = "cnt"
+ ORIGIN = "ori"
+ RAW_EXTRINSIC_METADATA = "emd"
+
+
+SWHID_NAMESPACE = "swh"
+SWHID_VERSION = 1
+SWHID_TYPES = ["snp", "rel", "rev", "dir", "cnt"]
+EXTENDED_SWHID_TYPES = SWHID_TYPES + ["ori", "emd"]
+SWHID_SEP = ":"
+SWHID_CTXT_SEP = ";"
+SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"}
+
+SWHID_RE_RAW = (
+ f"(?P<namespace>{SWHID_NAMESPACE})"
+ f"{SWHID_SEP}(?P<scheme_version>{SWHID_VERSION})"
+ f"{SWHID_SEP}(?P<object_type>{'|'.join(EXTENDED_SWHID_TYPES)})"
+ f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})"
+ f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?"
+)
+SWHID_RE = re.compile(SWHID_RE_RAW)
+
+
+# type of the "object_type" attribute of the SWHID class; either
+# ObjectType or ExtendedObjectType
+_TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType)
+
+# the SWHID class itself (this is used so that X.from_string() can return X
+# for all X subclass of _BaseSWHID)
+_TSWHID = TypeVar("_TSWHID", bound="_BaseSWHID")
+
+
+@attr.s(frozen=True, kw_only=True)
+class _BaseSWHID(Generic[_TObjectType]):
+ """Common base class for CoreSWHID, QualifiedSWHID, and ExtendedSWHID.
+
+ This is an "abstract" class and should not be instantiated directly;
+ it only exists to deduplicate code between these three SWHID classes."""
+
+ namespace = attr.ib(type=str, default=SWHID_NAMESPACE)
+ """the namespace of the identifier, defaults to ``swh``"""
+
+ scheme_version = attr.ib(type=int, default=SWHID_VERSION)
+ """the scheme version of the identifier, defaults to 1"""
+
+ # overridden by subclasses
+ object_type: _TObjectType
+ """the type of object the identifier points to"""
+
+ object_id = attr.ib(type=bytes, validator=type_validator())
+ """object's identifier"""
+
+ @namespace.validator
+ def check_namespace(self, attribute, value):
+ if value != SWHID_NAMESPACE:
+ raise ValidationError(
+ "Invalid SWHID: invalid namespace: %(namespace)s",
+ params={"namespace": value},
+ )
+
+ @scheme_version.validator
+ def check_scheme_version(self, attribute, value):
+ if value != SWHID_VERSION:
+ raise ValidationError(
+ "Invalid SWHID: invalid version: %(version)s", params={"version": value}
+ )
+
+ @object_id.validator
+ def check_object_id(self, attribute, value):
+ if len(value) != 20:
+ raise ValidationError(
+ "Invalid SWHID: invalid checksum: %(object_id)s",
+ params={"object_id": hash_to_hex(value)},
+ )
+
+ def __str__(self) -> str:
+ return SWHID_SEP.join(
+ [
+ self.namespace,
+ str(self.scheme_version),
+ self.object_type.value,
+ hash_to_hex(self.object_id),
+ ]
+ )
+
+ @classmethod
+ def from_string(cls: Type[_TSWHID], s: str) -> _TSWHID:
+ parts = _parse_swhid(s)
+ if parts.pop("qualifiers"):
+ raise ValidationError(f"{cls.__name__} does not support qualifiers.")
+ try:
+ return cls(**parts)
+ except ValueError as e:
+ raise ValidationError(
+ "ValueError: %(args)s", params={"args": e.args}
+ ) from None
+
+
+@attr.s(frozen=True, kw_only=True)
+class CoreSWHID(_BaseSWHID[ObjectType]):
+ """
+ Dataclass holding the relevant info associated to a SoftWare Heritage
+ persistent IDentifier (SWHID).
+
+ Unlike `QualifiedSWHID`, it is restricted to core SWHIDs, i.e. SWHIDs
+ with no qualifiers.
+
+ Raises:
+ swh.model.exceptions.ValidationError: In case of invalid object type or id
+
+ To get the raw SWHID string from an instance of this class,
+ use the :func:`str` function:
+
+ >>> swhid = CoreSWHID(
+ ... object_type=ObjectType.CONTENT,
+ ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
+ ... )
+ >>> str(swhid)
+ 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'
+
+ And vice-versa with :meth:`CoreSWHID.from_string`:
+
+ >>> swhid == CoreSWHID.from_string(
+ ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0"
+ ... )
+ True
+ """
+
+ object_type = attr.ib(
+ type=ObjectType, validator=type_validator(), converter=ObjectType
+ )
+ """the type of object the identifier points to"""
+
+ def to_extended(self) -> ExtendedSWHID:
+ """Converts this CoreSWHID into an ExtendedSWHID.
+
+ As ExtendedSWHID is a superset of CoreSWHID, this is lossless."""
+ return ExtendedSWHID(
+ namespace=self.namespace,
+ scheme_version=self.scheme_version,
+ object_type=ExtendedObjectType(self.object_type.value),
+ object_id=self.object_id,
+ )
+
+
+def _parse_core_swhid(swhid: Union[str, CoreSWHID, None]) -> Optional[CoreSWHID]:
+ if swhid is None or isinstance(swhid, CoreSWHID):
+ return swhid
+ else:
+ return CoreSWHID.from_string(swhid)
+
+
+def _parse_lines_qualifier(
+ lines: Union[str, Tuple[int, Optional[int]], None]
+) -> Optional[Tuple[int, Optional[int]]]:
+ try:
+ if lines is None or isinstance(lines, tuple):
+ return lines
+ elif "-" in lines:
+ (from_, to) = lines.split("-", 2)
+ return (int(from_), int(to))
+ else:
+ return (int(lines), None)
+ except ValueError:
+ raise ValidationError(
+ "Invalid format for the lines qualifier: %(lines)s", params={"lines": lines}
+ )
+
+
+def _parse_path_qualifier(path: Union[str, bytes, None]) -> Optional[bytes]:
+ if path is None or isinstance(path, bytes):
+ return path
+ else:
+ return urllib.parse.unquote_to_bytes(path)
+
+
+@attr.s(frozen=True, kw_only=True)
+class QualifiedSWHID(_BaseSWHID[ObjectType]):
+ """
+ Dataclass holding the relevant info associated to a SoftWare Heritage
+ persistent IDentifier (SWHID)
+
+ Raises:
+ swh.model.exceptions.ValidationError: In case of invalid object type or id
+
+ To get the raw SWHID string from an instance of this class,
+ use the :func:`str` function:
+
+ >>> swhid = QualifiedSWHID(
+ ... object_type=ObjectType.CONTENT,
+ ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
+ ... lines=(5, 10),
+ ... )
+ >>> str(swhid)
+ 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10'
+
+ And vice-versa with :meth:`QualifiedSWHID.from_string`:
+
+ >>> swhid == QualifiedSWHID.from_string(
+ ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10"
+ ... )
+ True
+ """
+
+ object_type = attr.ib(
+ type=ObjectType, validator=type_validator(), converter=ObjectType
+ )
+ """the type of object the identifier points to"""
+
+ # qualifiers:
+
+ origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
+ """the software origin where an object has been found or observed in the wild,
+ as a URI"""
+
+ visit = attr.ib(type=Optional[CoreSWHID], default=None, converter=_parse_core_swhid)
+ """the core identifier of a snapshot corresponding to a specific visit
+ of a repository containing the designated object"""
+
+ anchor = attr.ib(
+ type=Optional[CoreSWHID],
+ default=None,
+ validator=type_validator(),
+ converter=_parse_core_swhid,
+ )
+ """a designated node in the Merkle DAG relative to which a path to the object
+ is specified, as the core identifier of a directory, a revision, a release,
+ or a snapshot"""
+
+ path = attr.ib(
+ type=Optional[bytes],
+ default=None,
+ validator=type_validator(),
+ converter=_parse_path_qualifier,
+ )
+ """the absolute file path, from the root directory associated to the anchor node,
+ to the object; when the anchor denotes a directory or a revision, and almost always
+ when it’s a release, the root directory is uniquely determined;
+ when the anchor denotes a snapshot, the root directory is the one pointed to by HEAD
+ (possibly indirectly), and undefined if such a reference is missing"""
+
+ lines = attr.ib(
+ type=Optional[Tuple[int, Optional[int]]],
+ default=None,
+ validator=type_validator(),
+ converter=_parse_lines_qualifier,
+ )
+ """lines: line number(s) of interest, usually within a content object"""
+
+ @visit.validator
+ def check_visit(self, attribute, value):
+ if value and value.object_type != ObjectType.SNAPSHOT:
+ raise ValidationError(
+ "The 'visit' qualifier must be a 'snp' SWHID, not '%(type)s'",
+ params={"type": value.object_type.value},
+ )
+
+ @anchor.validator
+ def check_anchor(self, attribute, value):
+ if value and value.object_type not in (
+ ObjectType.DIRECTORY,
+ ObjectType.REVISION,
+ ObjectType.RELEASE,
+ ObjectType.SNAPSHOT,
+ ):
+ raise ValidationError(
+ "The 'anchor' qualifier must be a 'dir', 'rev', 'rel', or 'snp' "
+ "SWHID, not '%(type)s'",
+ params={"type": value.object_type.value},
+ )
+
+ def qualifiers(self) -> Dict[str, str]:
+ origin = self.origin
+ if origin:
+ unescaped_origin = origin
+ origin = origin.replace(";", "%3B")
+ assert urllib.parse.unquote_to_bytes(
+ origin
+ ) == urllib.parse.unquote_to_bytes(
+ unescaped_origin
+ ), "Escaping ';' in the origin qualifier corrupted the origin URL."
+
+ d: Dict[str, Optional[str]] = {
+ "origin": origin,
+ "visit": str(self.visit) if self.visit else None,
+ "anchor": str(self.anchor) if self.anchor else None,
+ "path": (
+ urllib.parse.quote_from_bytes(self.path)
+ if self.path is not None
+ else None
+ ),
+ "lines": (
+ "-".join(str(line) for line in self.lines if line is not None)
+ if self.lines
+ else None
+ ),
+ }
+ return {k: v for (k, v) in d.items() if v is not None}
+
+ def __str__(self) -> str:
+ swhid = SWHID_SEP.join(
+ [
+ self.namespace,
+ str(self.scheme_version),
+ self.object_type.value,
+ hash_to_hex(self.object_id),
+ ]
+ )
+ qualifiers = self.qualifiers()
+ if qualifiers:
+ for k, v in qualifiers.items():
+ swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v)
+ return swhid
+
+ @classmethod
+ def from_string(cls, s: str) -> QualifiedSWHID:
+ parts = _parse_swhid(s)
+ qualifiers = parts.pop("qualifiers")
+ invalid_qualifiers = set(qualifiers) - SWHID_QUALIFIERS
+ if invalid_qualifiers:
+ raise ValidationError(
+ "Invalid qualifier(s): %(qualifiers)s",
+ params={"qualifiers": ", ".join(invalid_qualifiers)},
+ )
+ try:
+ return QualifiedSWHID(**parts, **qualifiers)
+ except ValueError as e:
+ raise ValidationError(
+ "ValueError: %(args)s", params={"args": e.args}
+ ) from None
+
+
+@attr.s(frozen=True, kw_only=True)
+class ExtendedSWHID(_BaseSWHID[ExtendedObjectType]):
+ """
+ Dataclass holding the relevant info associated to a SoftWare Heritage
+ persistent IDentifier (SWHID).
+
+ It extends `CoreSWHID`, by allowing non-standard object types; and should
+ only be used internally to Software Heritage.
+
+ Raises:
+ swh.model.exceptions.ValidationError: In case of invalid object type or id
+
+ To get the raw SWHID string from an instance of this class,
+ use the :func:`str` function:
+
+ >>> swhid = ExtendedSWHID(
+ ... object_type=ExtendedObjectType.CONTENT,
+ ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
+ ... )
+ >>> str(swhid)
+ 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'
+
+ And vice-versa with :meth:`ExtendedSWHID.from_string`:
+
+ >>> swhid == ExtendedSWHID.from_string(
+ ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0"
+ ... )
+ True
+ """
+
+ object_type = attr.ib(
+ type=ExtendedObjectType,
+ validator=type_validator(),
+ converter=ExtendedObjectType,
+ )
+ """the type of object the identifier points to"""
+
+
+def _parse_swhid(swhid: str) -> Dict[str, Any]:
+ """Parse a Software Heritage identifier (SWHID) from string (see:
+ :ref:`persistent-identifiers`.)
+
+ This is for internal use; use :meth:`CoreSWHID.from_string`,
+ :meth:`QualifiedSWHID.from_string`, or :meth:`ExtendedSWHID.from_string` instead,
+ as they perform validation and build a dataclass.
+
+ Args:
+ swhid (str): A persistent identifier
+
+ Raises:
+ swh.model.exceptions.ValidationError: if passed string is not a valid SWHID
+
+ """
+ m = SWHID_RE.fullmatch(swhid)
+ if not m:
+ raise ValidationError(
+ "Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid}
+ )
+ parts: Dict[str, Any] = m.groupdict()
+
+ qualifiers_raw = parts["qualifiers"]
+ parts["qualifiers"] = {}
+ if qualifiers_raw:
+ for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP):
+ try:
+ k, v = qualifier.split("=", maxsplit=1)
+ parts["qualifiers"][k] = v
+ except ValueError:
+ raise ValidationError(
+ "Invalid SWHID: invalid qualifier: %(qualifier)s",
+ params={"qualifier": qualifier},
+ )
+
+ parts["scheme_version"] = int(parts["scheme_version"])
+ parts["object_id"] = hash_to_bytes(parts["object_id"])
+ return parts
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 21 2024, 9:54 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226811
Attached To
D6323: Move SWHID classes and functions from identifiers.py to swhids.py
Event Timeline
Log In to Comment