diff --git a/swh/model/identifiers.py b/swh/model/identifiers.py --- a/swh/model/identifiers.py +++ b/swh/model/identifiers.py @@ -827,6 +827,100 @@ ) +@attr.s(frozen=True, kw_only=True) +class CoreSWHID: + """ + Dataclass holding the relevant info associated to a SoftWare Heritage + persistent IDentifier (SWHID). + + Unlike `QualifiedSWHID`, it is restricted to core SWHIDs, ie. SWHIDs + with no qualifiers. + + Raises: + swh.model.exceptions.ValidationError: In case of invalid object type or id + + To get the raw CoreSWHID string from an instance of this class, + use the :func:`str` function:: + + >>> swhid = CoreSWHID( + ... object_type=ObjectType.CONTENT, + ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'), + ... ) + >>> str(swhid) + 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0' + + And vice-versa with :meth:`CoreSWHID.from_string`: + + >>> swhid == CoreSWHID.from_string( + ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0" + ... ) + True + """ + + namespace = attr.ib(type=str, default=SWHID_NAMESPACE) + """the namespace of the identifier, defaults to ``swh``""" + + scheme_version = attr.ib(type=int, default=SWHID_VERSION) + """the scheme version of the identifier, defaults to 1""" + + object_type = attr.ib(type=ObjectType, validator=type_validator()) + """the type of object the identifier points to""" + + object_id = attr.ib(type=bytes, validator=type_validator()) + """object's identifier""" + + @namespace.validator + def check_namespace(self, attribute, value): + if value != SWHID_NAMESPACE: + raise ValidationError( + "Invalid SWHID: invalid namespace: %(namespace)s", + params={"namespace": value}, + ) + + @scheme_version.validator + def check_scheme_version(self, attribute, value): + if value != SWHID_VERSION: + raise ValidationError( + "Invalid SWHID: invalid version: %(version)s", params={"version": value} + ) + + @object_id.validator + def check_object_id(self, attribute, value): + if len(value) != 20: + raise ValidationError( + "Invalid SWHID: invalid checksum: %(object_id)s", + params={"object_id": hash_to_hex(value)}, + ) + + def to_dict(self) -> Dict[str, Any]: + return attr.asdict(self) + + def __str__(self) -> str: + return SWHID_SEP.join( + [ + self.namespace, + str(self.scheme_version), + self.object_type.value, + hash_to_hex(self.object_id), + ] + ) + + @classmethod + def from_string(cls, s: str) -> CoreSWHID: + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + old_swhid = parse_swhid(s) + object_type = ObjectType(_object_type_map[old_swhid.object_type]["short_name"]) + if old_swhid.metadata: + raise ValidationError("CoreSWHID does not support qualifiers.") + return CoreSWHID( + namespace=old_swhid.namespace, + scheme_version=old_swhid.scheme_version, + object_type=object_type, + object_id=hash_to_bytes(old_swhid.object_id), + ) + + @attr.s(frozen=True) class SWHID: """ diff --git a/swh/model/tests/test_identifiers.py b/swh/model/tests/test_identifiers.py --- a/swh/model/tests/test_identifiers.py +++ b/swh/model/tests/test_identifiers.py @@ -20,6 +20,7 @@ REVISION, SNAPSHOT, SWHID, + CoreSWHID, ObjectType, QualifiedSWHID, normalize_timestamp, @@ -1412,3 +1413,148 @@ object_id=object_id, qualifiers=dummy_qualifiers, ) + + +def test_parse_serialize_core_swhid(): + for swhid, _type, _version, _hash in [ + ( + "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2", + ObjectType.CONTENT, + 1, + _x("94a9ed024d3859793618152ea559a168bbcbb5e2"), + ), + ( + "swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505", + ObjectType.DIRECTORY, + 1, + _x("d198bc9d7a6bcf6db04f476d29314f157507d505"), + ), + ( + "swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d", + ObjectType.REVISION, + 1, + _x("309cf2674ee7a0749978cf8265ab91a60aea0f7d"), + ), + ( + "swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f", + ObjectType.RELEASE, + 1, + _x("22ece559cc7cc2364edc5e5593d63ae8bd229f9f"), + ), + ( + "swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453", + ObjectType.SNAPSHOT, + 1, + _x("c7c108084bc0bf3d81436bf980b46e98bd338453"), + ), + ]: + expected_result = CoreSWHID( + namespace="swh", + scheme_version=_version, + object_type=_type, + object_id=_hash, + ) + actual_result = CoreSWHID.from_string(swhid) + assert actual_result == expected_result + assert str(expected_result) == str(actual_result) == swhid + + for swhid, _type, _version, _hash in [ + ( + "swh:1:cnt:9c95815d9e9d91b8dae8e05d8bbc696fe19f796b", + ObjectType.CONTENT, + 1, + _x("9c95815d9e9d91b8dae8e05d8bbc696fe19f796b"), + ), + ( + "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d", + ObjectType.DIRECTORY, + 1, + _x("0b6959356d30f1a4e9b7f6bca59b9a336464c03d"), + ), + ]: + expected_result = CoreSWHID( + namespace="swh", + scheme_version=_version, + object_type=_type, + object_id=_hash, + ) + actual_result = CoreSWHID.from_string(swhid) + assert actual_result == expected_result + assert expected_result.to_dict() == { + "namespace": "swh", + "scheme_version": _version, + "object_type": _type, + "object_id": _hash, + } + assert str(expected_result) == str(actual_result) == swhid + + +@pytest.mark.parametrize( + "invalid_swhid", + [ + "swh:1:cnt", + "swh:1:", + "swh:", + "swh:1:cnt:", + "foo:1:cnt:abc8bc9d7a6bcf6db04f476d29314f157507d505", + "swh:2:dir:def8bc9d7a6bcf6db04f476d29314f157507d505", + "swh:1:foo:fed8bc9d7a6bcf6db04f476d29314f157507d505", + "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;visit=swh:1:snp:gh6959356d30f1a4e9b7f6bca59b9a336464c03d", # noqa + "swh:1:snp:gh6959356d30f1a4e9b7f6bca59b9a336464c03d", + "swh:1:snp:foo", + "swh:1: dir: 0b6959356d30f1a4e9b7f6bca59b9a336464c03d", + ], +) +def test_parse_core_swhid_parsing_error(invalid_swhid): + with pytest.raises(ValidationError): + CoreSWHID.from_string(invalid_swhid) + + +@pytest.mark.filterwarnings("ignore:.*SWHID.*:DeprecationWarning") +@pytest.mark.parametrize( + "ns,version,type,id", + [ + ("foo", 1, ObjectType.CONTENT, "abc8bc9d7a6bcf6db04f476d29314f157507d505"), + ("swh", 2, ObjectType.CONTENT, "def8bc9d7a6bcf6db04f476d29314f157507d505"), + ("swh", 1, ObjectType.DIRECTORY, "aaaa"), + ], +) +def test_CoreSWHID_validation_error(ns, version, type, id): + with pytest.raises(ValidationError): + CoreSWHID( + namespace=ns, scheme_version=version, object_type=type, object_id=_x(id), + ) + + +def test_CoreSWHID_hash(): + object_id = _x("94a9ed024d3859793618152ea559a168bbcbb5e2") + + assert hash( + CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id) + ) == hash(CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id)) + + assert hash( + CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id,) + ) == hash(CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id,)) + + # Different order of the dictionary, so the underlying order of the tuple in + # ImmutableDict is different. + assert hash( + CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id,) + ) == hash(CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id,)) + + +def test_CoreSWHID_eq(): + object_id = _x("94a9ed024d3859793618152ea559a168bbcbb5e2") + + assert CoreSWHID( + object_type=ObjectType.DIRECTORY, object_id=object_id + ) == CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id) + + assert CoreSWHID( + object_type=ObjectType.DIRECTORY, object_id=object_id, + ) == CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id,) + + assert CoreSWHID( + object_type=ObjectType.DIRECTORY, object_id=object_id, + ) == CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id,)