diff --git a/swh/model/identifiers.py b/swh/model/identifiers.py
--- a/swh/model/identifiers.py
+++ b/swh/model/identifiers.py
@@ -1,22 +1,40 @@
-# Copyright (C) 2015-2020 The Software Heritage developers
+# Copyright (C) 2015-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from __future__ import annotations
+
 import binascii
 import datetime
+import enum
 from functools import lru_cache
 import hashlib
 import re
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
 
 import attr
+from attrs_strict import type_validator
 
 from .collections import ImmutableDict
 from .exceptions import ValidationError
 from .fields.hashes import validate_sha1
-from .hashutil import MultiHash, hash_git_data, hash_to_hex
+from .hashutil import MultiHash, hash_git_data, hash_to_bytes, hash_to_hex
+
+
+class ObjectType(enum.Enum):
+    """Object types a SWHID can point to, valued by their short textual tag."""
+
+    ORIGIN = "ori"
+    SNAPSHOT = "snp"
+    REVISION = "rev"
+    RELEASE = "rel"
+    DIRECTORY = "dir"
+    CONTENT = "cnt"
+
 
+# The following are deprecated aliases of the variants defined in ObjectType
+# while transitioning from SWHID to QualifiedSWHID
 ORIGIN = "origin"
 SNAPSHOT = "snapshot"
 REVISION = "revision"
@@ -678,6 +696,105 @@
     return hashlib.sha1(origin["url"].encode("utf-8")).hexdigest()
 
 
+def raw_extrinsic_metadata_identifier(metadata: Dict[str, Any]) -> str:
+    """Return the intrinsic identifier for a RawExtrinsicMetadata object.
+
+    A raw_extrinsic_metadata identifier is a salted sha1 (using the git
+    hashing algorithm with the ``raw_extrinsic_metadata`` object type) of
+    a manifest following the format:
+
+    ```
+    target_type: $ValueOfMetadataTargetType
+    target: $UrlOrSwhid
+    discovery_date: $Timestamp
+    authority: $StrWithoutSpaces $IRI
+    fetcher: $Str $Version
+    format: $StrWithoutSpaces
+    origin: $IRI <- optional
+    visit: $IntInDecimal <- optional
+    snapshot: $Swhid <- optional
+    release: $Swhid <- optional
+    revision: $Swhid <- optional
+    path: $Bytes <- optional
+    directory: $Swhid <- optional
+
+    $MetadataBytes
+    ```
+
+    $IRI must be RFC 3987 IRIs (so they may contain newlines, that are escaped as
+    described below)
+
+    $StrWithoutSpaces and $Version are ASCII strings, and may not contain spaces.
+
+    $Str is an UTF-8 string.
+
+    $Swhid are core SWHIDs, as defined in :ref:`persistent-identifiers`.
+
+    $Timestamp is a decimal representation of the rounded-down integer number of
+    seconds since the UNIX epoch (1970-01-01 00:00:00 UTC),
+    with no leading '0' (unless the timestamp value is zero) and no timezone.
+    It may be negative by prefixing it with a '-', which must not be followed
+    by a '0'.
+
+    Newlines in $Bytes, $Str, and $IRI are escaped as with other git fields,
+    ie. by adding a space after them.
+
+    Returns:
+      str: the intrinsic identifier for `metadata`
+
+    """
+    # equivalent to using math.floor(dt.timestamp()) to round down,
+    # as int(dt.timestamp()) rounds toward zero,
+    # which would map two seconds on the 0 timestamp.
+    #
+    # This should never be an issue in practice as Software Heritage didn't
+    # start collecting metadata before 2015.
+    timestamp = (
+        metadata["discovery_date"]
+        .astimezone(datetime.timezone.utc)
+        .replace(microsecond=0)
+        .timestamp()
+    )
+    assert timestamp.is_integer()
+
+    headers = [
+        (b"target_type", metadata["type"].encode("ascii")),
+        (b"target", str(metadata["target"]).encode()),
+        (b"discovery_date", str(int(timestamp)).encode("ascii")),
+        (
+            b"authority",
+            f"{metadata['authority']['type']} {metadata['authority']['url']}".encode(),
+        ),
+        (
+            b"fetcher",
+            f"{metadata['fetcher']['name']} {metadata['fetcher']['version']}".encode(),
+        ),
+        (b"format", metadata["format"].encode()),
+    ]
+
+    for key in (
+        "origin",
+        "visit",
+        "snapshot",
+        "release",
+        "revision",
+        "path",
+        "directory",
+    ):
+        if metadata.get(key) is not None:
+            value: bytes
+            if key == "path":
+                value = metadata[key]
+            else:
+                value = str(metadata[key]).encode()
+
+            headers.append((key.encode("ascii"), value))
+
+    return identifier_to_str(
+        hash_manifest("raw_extrinsic_metadata", headers, metadata["metadata"])
+    )
+
+
 _object_type_map = {
     ORIGIN: {"short_name": "ori", "key_id": "id"},
     SNAPSHOT: {"short_name": "snp", "key_id": "id"},
@@ -697,11 +814,109 @@
 }
 
 
+@attr.s(frozen=True, kw_only=True)
+class QualifiedSWHID:
+    """
+    Dataclass holding the relevant info associated to a SoftWare Heritage
+    persistent IDentifier (SWHID)
+
+    Raises:
+        swh.model.exceptions.ValidationError: In case of invalid namespace,
+            scheme version, object id length or qualifier name
+
+    To get the raw QualifiedSWHID string from an instance of this class,
+    use the :func:`str` function::
+
+        swhid = QualifiedSWHID(
+            object_type=ObjectType.CONTENT,
+            object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
+        )
+        swhid_str = str(swhid)
+        # 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'
+    """
+
+    namespace = attr.ib(type=str, default=SWHID_NAMESPACE)
+    """the namespace of the identifier, defaults to ``swh``"""
+    scheme_version = attr.ib(type=int, default=SWHID_VERSION)
+    """the scheme version of the identifier, defaults to 1"""
+    object_type = attr.ib(type=ObjectType, validator=type_validator())
+    """the type of object the identifier points to"""
+    object_id = attr.ib(type=bytes, validator=type_validator())
+    """object's identifier"""
+    qualifiers = attr.ib(
+        type=ImmutableDict[str, Any], converter=ImmutableDict, default=ImmutableDict()
+    )
+    """optional dict filled with metadata related to pointed object"""
+
+    @namespace.validator
+    def check_namespace(self, attribute, value):
+        if value != SWHID_NAMESPACE:
+            raise ValidationError(
+                "Invalid SWHID: invalid namespace: %(namespace)s",
+                params={"namespace": value},
+            )
+
+    @scheme_version.validator
+    def check_scheme_version(self, attribute, value):
+        if value != SWHID_VERSION:
+            raise ValidationError(
+                "Invalid SWHID: invalid version: %(version)s", params={"version": value}
+            )
+
+    @object_id.validator
+    def check_object_id(self, attribute, value):
+        if len(value) != 20:
+            raise ValidationError(
+                "Invalid SWHID: invalid checksum: %(object_id)s",
+                params={"object_id": hash_to_hex(value)},
+            )
+
+    @qualifiers.validator
+    def check_qualifiers(self, attribute, value):
+        for k in value:
+            if k not in SWHID_QUALIFIERS:
+                raise ValidationError(
+                    "Invalid SWHID: unknown qualifier: %(qualifier)s",
+                    params={"qualifier": k},
+                )
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Return the attributes of this identifier as a dict (``attr.asdict``)."""
+        return attr.asdict(self)
+
+    def __str__(self) -> str:
+        """Return the textual form: core identifier, then one part per qualifier."""
+        swhid = SWHID_SEP.join(
+            [
+                self.namespace,
+                str(self.scheme_version),
+                self.object_type.value,
+                hash_to_hex(self.object_id),
+            ]
+        )
+        if self.qualifiers:
+            for k, v in self.qualifiers.items():
+                swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v)
+        return swhid
+
+    @classmethod
+    def from_string(cls, s: str) -> QualifiedSWHID:
+        """Parse a SWHID string into a QualifiedSWHID, using :func:`parse_swhid`."""
+        old_swhid = parse_swhid(s)
+        object_type = ObjectType(_object_type_map[old_swhid.object_type]["short_name"])
+        return cls(
+            namespace=old_swhid.namespace,
+            scheme_version=old_swhid.scheme_version,
+            object_type=object_type,
+            object_id=hash_to_bytes(old_swhid.object_id),
+            qualifiers=old_swhid.metadata,
+        )
+
+
 @attr.s(frozen=True)
 class SWHID:
     """
-    Named tuple holding the relevant info associated to a SoftWare Heritage
-    persistent IDentifier (SWHID)
+    Deprecated alternative to QualifiedSWHID.
 
     Args:
         namespace (str): the namespace of the identifier, defaults to ``swh``
diff --git a/swh/model/tests/test_identifiers.py b/swh/model/tests/test_identifiers.py
--- a/swh/model/tests/test_identifiers.py
+++ b/swh/model/tests/test_identifiers.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
+# Copyright (C) 2015-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -20,6 +20,8 @@
     REVISION,
     SNAPSHOT,
     SWHID,
+    ObjectType,
+    QualifiedSWHID,
     normalize_timestamp,
 )
 
@@ -1138,7 +1140,7 @@
     )
 
 
-def test_swhid_hash():
+def test_SWHID_hash():
     object_id = "94a9ed024d3859793618152ea559a168bbcbb5e2"
 
     assert hash(SWHID(object_type="directory", object_id=object_id)) == hash(
@@ -1168,7 +1170,7 @@
     )
 
 
-def test_swhid_eq():
+def test_SWHID_eq():
     object_id = "94a9ed024d3859793618152ea559a168bbcbb5e2"
 
     assert SWHID(object_type="directory", object_id=object_id) == SWHID(
@@ -1182,3 +1184,77 @@
     assert SWHID(
         object_type="directory", object_id=object_id, metadata=dummy_qualifiers,
     ) == SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,)
+
+
+@pytest.mark.parametrize(
+    "ns,version,object_type,object_id",
+    [
+        ("foo", 1, ObjectType.CONTENT, "abc8bc9d7a6bcf6db04f476d29314f157507d505",),
+        ("swh", 2, ObjectType.DIRECTORY, "def8bc9d7a6bcf6db04f476d29314f157507d505",),
+    ],
+)
+def test_QualifiedSWHID_validation_error(ns, version, object_type, object_id):
+    with pytest.raises(ValidationError):
+        QualifiedSWHID(
+            namespace=ns,
+            scheme_version=version,
+            object_type=object_type,
+            object_id=_x(object_id),
+        )
+
+
+def test_QualifiedSWHID_hash():
+    object_id = _x("94a9ed024d3859793618152ea559a168bbcbb5e2")
+
+    assert hash(
+        QualifiedSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id)
+    ) == hash(QualifiedSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id))
+
+    assert hash(
+        QualifiedSWHID(
+            object_type=ObjectType.DIRECTORY,
+            object_id=object_id,
+            qualifiers=dummy_qualifiers,
+        )
+    ) == hash(
+        QualifiedSWHID(
+            object_type=ObjectType.DIRECTORY,
+            object_id=object_id,
+            qualifiers=dummy_qualifiers,
+        )
+    )
+
+    # Different order of the dictionary, so the underlying order of the tuple in
+    # ImmutableDict is different.
+    assert hash(
+        QualifiedSWHID(
+            object_type=ObjectType.DIRECTORY,
+            object_id=object_id,
+            qualifiers={"origin": "https://example.com", "lines": "42"},
+        )
+    ) == hash(
+        QualifiedSWHID(
+            object_type=ObjectType.DIRECTORY,
+            object_id=object_id,
+            qualifiers={"lines": "42", "origin": "https://example.com"},
+        )
+    )
+
+
+def test_QualifiedSWHID_eq():
+    object_id = _x("94a9ed024d3859793618152ea559a168bbcbb5e2")
+
+    assert QualifiedSWHID(
+        object_type=ObjectType.DIRECTORY, object_id=object_id
+    ) == QualifiedSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id)
+
+    assert QualifiedSWHID(
+        object_type=ObjectType.DIRECTORY,
+        object_id=object_id,
+        qualifiers=dummy_qualifiers,
+    ) == QualifiedSWHID(
+        object_type=ObjectType.DIRECTORY,
+        object_id=object_id,
+        qualifiers=dummy_qualifiers,
+    )
+
+    assert QualifiedSWHID(
+        object_type=ObjectType.DIRECTORY, object_id=object_id
+    ) != QualifiedSWHID(object_type=ObjectType.CONTENT, object_id=object_id)