diff --git a/swh/model/identifiers.py b/swh/model/identifiers.py
--- a/swh/model/identifiers.py
+++ b/swh/model/identifiers.py
@@ -1,22 +1,43 @@
-# Copyright (C) 2015-2020 The Software Heritage developers
+# Copyright (C) 2015-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from __future__ import annotations
+
 import binascii
 import datetime
+import enum
 from functools import lru_cache
 import hashlib
 import re
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
+import warnings
 
 import attr
+from attrs_strict import type_validator
 
 from .collections import ImmutableDict
 from .exceptions import ValidationError
 from .fields.hashes import validate_sha1
-from .hashutil import MultiHash, hash_git_data, hash_to_hex
+from .hashutil import MultiHash, hash_git_data, hash_to_bytes, hash_to_hex
+
+
+class ObjectType(enum.Enum):
+    """Possible object types of a QualifiedSWHID.
+
+    The value of each variant is what is used in the SWHID's string representation."""
+
+    ORIGIN = "ori"
+    SNAPSHOT = "snp"
+    REVISION = "rev"
+    RELEASE = "rel"
+    DIRECTORY = "dir"
+    CONTENT = "cnt"
+
 
+# The following are deprecated aliases of the variants defined in ObjectType
+# while transitioning from SWHID to QualifiedSWHID
 ORIGIN = "origin"
 SNAPSHOT = "snapshot"
 REVISION = "revision"
@@ -697,11 +718,124 @@
 }
 
 
+@attr.s(frozen=True, kw_only=True)
+class QualifiedSWHID:
+    """
+    Dataclass holding the relevant info associated to a SoftWare Heritage
+    persistent IDentifier (SWHID)
+
+    Raises:
+        swh.model.exceptions.ValidationError: In case of invalid object type or id
+
+    To get the raw QualifiedSWHID string from an instance of this class,
+    use the :func:`str` function::
+
+        >>> swhid = QualifiedSWHID(
+        ...     object_type=ObjectType.CONTENT,
+        ...     object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
+        ...     qualifiers={"lines": "5-10"},
+        ... )
+        >>> str(swhid)
+        'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10'
+
+    And vice-versa with :meth:`QualifiedSWHID.from_string`:
+
+        >>> swhid == QualifiedSWHID.from_string(
+        ...     "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10"
+        ... )
+        True
+    """
+
+    namespace = attr.ib(type=str, default=SWHID_NAMESPACE)
+    """the namespace of the identifier, defaults to ``swh``"""
+
+    scheme_version = attr.ib(type=int, default=SWHID_VERSION)
+    """the scheme version of the identifier, defaults to 1"""
+
+    object_type = attr.ib(type=ObjectType, validator=type_validator())
+    """the type of object the identifier points to"""
+
+    object_id = attr.ib(type=bytes, validator=type_validator())
+    """object's identifier"""
+
+    qualifiers = attr.ib(
+        type=ImmutableDict[str, Any], converter=ImmutableDict, default=ImmutableDict()
+    )
+    """optional dict filled with metadata related to pointed object"""
+
+    @namespace.validator
+    def check_namespace(self, attribute, value):
+        if value != SWHID_NAMESPACE:
+            raise ValidationError(
+                "Invalid SWHID: invalid namespace: %(namespace)s",
+                params={"namespace": value},
+            )
+
+    @scheme_version.validator
+    def check_scheme_version(self, attribute, value):
+        if value != SWHID_VERSION:
+            raise ValidationError(
+                "Invalid SWHID: invalid version: %(version)s", params={"version": value}
+            )
+
+    @object_id.validator
+    def check_object_id(self, attribute, value):
+        if len(value) != 20:
+            raise ValidationError(
+                "Invalid SWHID: invalid checksum: %(object_id)s",
+                params={"object_id": hash_to_hex(value)},
+            )
+
+    @qualifiers.validator
+    def check_qualifiers(self, attribute, value):
+        for k in value:
+            if k not in SWHID_QUALIFIERS:
+                raise ValidationError(
+                    "Invalid SWHID: unknown qualifier: %(qualifier)s",
+                    params={"qualifier": k},
+                )
+
+    def __str__(self) -> str:
+        """Serialize to the textual SWHID form, qualifiers included."""
+        swhid = SWHID_SEP.join(
+            [
+                self.namespace,
+                str(self.scheme_version),
+                self.object_type.value,
+                hash_to_hex(self.object_id),
+            ]
+        )
+        if self.qualifiers:
+            for k, v in self.qualifiers.items():
+                swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v)
+        return swhid
+
+    @classmethod
+    def from_string(cls, s: str) -> QualifiedSWHID:
+        """Parse the textual form of a SWHID, qualifiers included.
+
+        Raises:
+            swh.model.exceptions.ValidationError: if ``s`` is not a valid SWHID
+        """
+        with warnings.catch_warnings():
+            # parse_swhid() builds a deprecated SWHID internally; silence only that
+            # DeprecationWarning so unrelated warnings still reach the caller.
+            warnings.simplefilter("ignore", DeprecationWarning)
+            old_swhid = parse_swhid(s)
+        object_type = ObjectType(_object_type_map[old_swhid.object_type]["short_name"])
+        return cls(
+            namespace=old_swhid.namespace,
+            scheme_version=old_swhid.scheme_version,
+            object_type=object_type,
+            object_id=hash_to_bytes(old_swhid.object_id),
+            qualifiers=old_swhid.metadata,
+        )
+
+
 @attr.s(frozen=True)
 class SWHID:
     """
-    Named tuple holding the relevant info associated to a SoftWare Heritage
-    persistent IDentifier (SWHID)
+    Deprecated alternative to QualifiedSWHID.
 
     Args:
         namespace (str): the namespace of the identifier, defaults to ``swh``
@@ -744,6 +878,15 @@
         type=ImmutableDict[str, Any], converter=ImmutableDict, default=ImmutableDict()
     )
 
+    def __attrs_post_init__(self):
+        warnings.warn(
+            "swh.model.identifiers.SWHID is deprecated; "
+            "use swh.model.identifiers.QualifiedSWHID instead.",
+            DeprecationWarning,
+            # 1 = this hook, 2 = the attrs-generated __init__, 3 = the caller
+            stacklevel=3,
+        )
+
     @namespace.validator
     def check_namespace(self, attribute, value):
         if value != SWHID_NAMESPACE:
diff --git a/swh/model/tests/test_identifiers.py b/swh/model/tests/test_identifiers.py
--- a/swh/model/tests/test_identifiers.py
+++ b/swh/model/tests/test_identifiers.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
+# Copyright (C) 2015-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -20,6 +20,8 @@
     REVISION,
     SNAPSHOT,
     SWHID,
+    ObjectType,
+    QualifiedSWHID,
     normalize_timestamp,
 )
 
@@ -883,6 +885,7 @@
 
 
 class TestSwhid(unittest.TestCase):
+    @pytest.mark.filterwarnings("ignore:.*SWHID.*:DeprecationWarning")
     def test_swhid(self):
         _snapshot_id = _x("c7c108084bc0bf3d81436bf980b46e98bd338453")
         _release_id = "22ece559cc7cc2364edc5e5593d63ae8bd229f9f"
@@ -995,6 +998,7 @@
             with self.assertRaises(ValidationError):
                 identifiers.swhid(_type, _hash)
 
+    @pytest.mark.filterwarnings("ignore:.*SWHID.*:DeprecationWarning")
     def test_parse_swhid(self):
         for swhid, _type, _version, _hash in [
             (
@@ -1028,15 +1032,17 @@
                 "c7c108084bc0bf3d81436bf980b46e98bd338453",
             ),
         ]:
-            expected_result = SWHID(
-                namespace="swh",
-                scheme_version=_version,
-                object_type=_type,
-                object_id=_hash,
-                metadata={},
-            )
-            actual_result = identifiers.parse_swhid(swhid)
+            with pytest.warns(DeprecationWarning):
+                expected_result = SWHID(
+                    namespace="swh",
+                    scheme_version=_version,
+                    object_type=_type,
+                    object_id=_hash,
+                    metadata={},
+                )
+                actual_result = identifiers.parse_swhid(swhid)
             self.assertEqual(actual_result, expected_result)
+            self.assertEqual(str(expected_result), swhid)
 
         for swhid, _type, _version, _hash, _metadata in [
             (
@@ -1054,14 +1060,15 @@
                 {"origin": "deb://Debian/packages/linuxdoc-tools"},
             ),
         ]:
-            expected_result = SWHID(
-                namespace="swh",
-                scheme_version=_version,
-                object_type=_type,
-                object_id=_hash,
-                metadata=_metadata,
-            )
-            actual_result = identifiers.parse_swhid(swhid)
+            with pytest.warns(DeprecationWarning):
+                expected_result = SWHID(
+                    namespace="swh",
+                    scheme_version=_version,
+                    object_type=_type,
+                    object_id=_hash,
+                    metadata=_metadata,
+                )
+                actual_result = identifiers.parse_swhid(swhid)
             self.assertEqual(actual_result, expected_result)
             self.assertEqual(
                 expected_result.to_dict(),
@@ -1073,6 +1080,7 @@
                     "metadata": _metadata,
                 },
             )
+            self.assertEqual(str(expected_result), swhid)
 
 
 @pytest.mark.parametrize(
@@ -1122,6 +1130,7 @@
         identifiers.parse_swhid(invalid_swhid)
 
 
+@pytest.mark.filterwarnings("ignore:.*SWHID.*:DeprecationWarning")
 @pytest.mark.parametrize(
     "ns,version,type,id",
     [
@@ -1138,7 +1147,8 @@
         )
 
 
-def test_swhid_hash():
+@pytest.mark.filterwarnings("ignore:.*SWHID.*:DeprecationWarning")
+def test_SWHID_hash():
     object_id = "94a9ed024d3859793618152ea559a168bbcbb5e2"
 
     assert hash(SWHID(object_type="directory", object_id=object_id)) == hash(
@@ -1168,7 +1178,8 @@
     )
 
 
-def test_swhid_eq():
+@pytest.mark.filterwarnings("ignore:.*SWHID.*:DeprecationWarning")
+def test_SWHID_eq():
     object_id = "94a9ed024d3859793618152ea559a168bbcbb5e2"
 
     assert SWHID(object_type="directory", object_id=object_id) == SWHID(
@@ -1182,3 +1193,204 @@
     assert SWHID(
         object_type="directory", object_id=object_id, metadata=dummy_qualifiers,
     ) == SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,)
+
+
+def test_parse_serialize_qualified_swhid():
+    for swhid, _type, _version, _hash in [
+        (
+            "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
+            ObjectType.CONTENT,
+            1,
+            _x("94a9ed024d3859793618152ea559a168bbcbb5e2"),
+        ),
+        (
+            "swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505",
+            ObjectType.DIRECTORY,
+            1,
+            _x("d198bc9d7a6bcf6db04f476d29314f157507d505"),
+        ),
+        (
+            "swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d",
+            ObjectType.REVISION,
+            1,
+            _x("309cf2674ee7a0749978cf8265ab91a60aea0f7d"),
+        ),
+        (
+            "swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f",
+            ObjectType.RELEASE,
+            1,
+            _x("22ece559cc7cc2364edc5e5593d63ae8bd229f9f"),
+        ),
+        (
+            "swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453",
+            ObjectType.SNAPSHOT,
+            1,
+            _x("c7c108084bc0bf3d81436bf980b46e98bd338453"),
+        ),
+    ]:
+        expected_result = QualifiedSWHID(
+            namespace="swh",
+            scheme_version=_version,
+            object_type=_type,
+            object_id=_hash,
+            qualifiers={},
+        )
+        actual_result = QualifiedSWHID.from_string(swhid)
+        assert actual_result == expected_result
+        assert str(expected_result) == str(actual_result) == swhid
+
+    for swhid, _type, _version, _hash, _qualifiers in [
+        (
+            "swh:1:cnt:9c95815d9e9d91b8dae8e05d8bbc696fe19f796b;lines=1-18;origin=https://github.com/python/cpython",  # noqa
+            ObjectType.CONTENT,
+            1,
+            _x("9c95815d9e9d91b8dae8e05d8bbc696fe19f796b"),
+            {"lines": "1-18", "origin": "https://github.com/python/cpython"},
+        ),
+        (
+            "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;origin=deb://Debian/packages/linuxdoc-tools",  # noqa
+            ObjectType.DIRECTORY,
+            1,
+            _x("0b6959356d30f1a4e9b7f6bca59b9a336464c03d"),
+            {"origin": "deb://Debian/packages/linuxdoc-tools"},
+        ),
+    ]:
+        expected_result = QualifiedSWHID(
+            namespace="swh",
+            scheme_version=_version,
+            object_type=_type,
+            object_id=_hash,
+            qualifiers=_qualifiers,
+        )
+        actual_result = QualifiedSWHID.from_string(swhid)
+        assert actual_result == expected_result
+        assert str(expected_result) == str(actual_result) == swhid
+
+
+@pytest.mark.parametrize(
+    "invalid_swhid",
+    [
+        "swh:1:cnt",
+        "swh:1:",
+        "swh:",
+        "swh:1:cnt:",
+        "foo:1:cnt:abc8bc9d7a6bcf6db04f476d29314f157507d505",
+        "swh:2:dir:def8bc9d7a6bcf6db04f476d29314f157507d505",
+        "swh:1:foo:fed8bc9d7a6bcf6db04f476d29314f157507d505",
+        "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;invalid;malformed",
+        "swh:1:snp:gh6959356d30f1a4e9b7f6bca59b9a336464c03d",
+        "swh:1:snp:foo",
+        # wrong qualifier: ori should be origin
+        "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;ori=something;anchor=1;visit=1;path=/",  # noqa
+        # wrong qualifier: anc should be anchor
+        "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;origin=something;anc=1;visit=1;path=/",  # noqa
+        # wrong qualifier: vis should be visit
+        "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;origin=something;anchor=1;vis=1;path=/",  # noqa
+        # wrong qualifier: pa should be path
+        "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;origin=something;anchor=1;visit=1;pa=/",  # noqa
+        # wrong qualifier: line should be lines
+        "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;line=10;origin=something;anchor=1;visit=1;path=/",  # noqa
+        # wrong qualifier value: it contains space before or after
+        "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;origin= https://some-url",  # noqa
+        "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;origin=something;anchor=some-anchor ",  # noqa
+        "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;origin=something;anchor=some-anchor ;visit=1",  # noqa
+        # invalid swhid: whitespaces
+        "swh :1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;ori=something;anchor=1;visit=1;path=/",  # noqa
+        "swh: 1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;ori=something;anchor=1;visit=1;path=/",  # noqa
+        "swh: 1: dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;ori=something;anchor=1;visit=1;path=/",  # noqa
+        "swh:1: dir: 0b6959356d30f1a4e9b7f6bca59b9a336464c03d",
+        "swh:1: dir: 0b6959356d30f1a4e9b7f6bca59b9a336464c03d; origin=blah",
+        "swh:1: dir: 0b6959356d30f1a4e9b7f6bca59b9a336464c03d;lines=12",
+        # other whitespaces
+        "swh\t:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;lines=12",
+        "swh:1\n:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;lines=12",
+        "swh:1:\rdir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;lines=12",
+        "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d\f;lines=12",
+        "swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;lines=12\v",
+    ],
+)
+def test_parse_qualified_swhid_parsing_error(invalid_swhid):
+    with pytest.raises(ValidationError):
+        QualifiedSWHID.from_string(invalid_swhid)
+
+
+@pytest.mark.parametrize(
+    "ns,version,type,id,qualifiers",
+    [
+        ("foo", 1, ObjectType.CONTENT, "abc8bc9d7a6bcf6db04f476d29314f157507d505", {}),
+        ("swh", 2, ObjectType.CONTENT, "def8bc9d7a6bcf6db04f476d29314f157507d505", {}),
+        ("swh", 1, ObjectType.DIRECTORY, "aaaa", {}),
+        (
+            "swh",
+            1,
+            ObjectType.CONTENT,
+            "abc8bc9d7a6bcf6db04f476d29314f157507d505",
+            {"foo": "bar"},
+        ),
+    ],
+)
+def test_QualifiedSWHID_validation_error(ns, version, type, id, qualifiers):
+    with pytest.raises(ValidationError):
+        QualifiedSWHID(
+            namespace=ns,
+            scheme_version=version,
+            object_type=type,
+            object_id=_x(id),
+            qualifiers=qualifiers,
+        )
+
+
+def test_QualifiedSWHID_hash():
+    object_id = _x("94a9ed024d3859793618152ea559a168bbcbb5e2")
+
+    assert hash(
+        QualifiedSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id)
+    ) == hash(QualifiedSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id))
+
+    assert hash(
+        QualifiedSWHID(
+            object_type=ObjectType.DIRECTORY,
+            object_id=object_id,
+            qualifiers=dummy_qualifiers,
+        )
+    ) == hash(
+        QualifiedSWHID(
+            object_type=ObjectType.DIRECTORY,
+            object_id=object_id,
+            qualifiers=dummy_qualifiers,
+        )
+    )
+
+    # Different order of the dictionary, so the underlying order of the tuple in
+    # ImmutableDict is different.
+    assert hash(
+        QualifiedSWHID(
+            object_type=ObjectType.DIRECTORY,
+            object_id=object_id,
+            qualifiers={"origin": "https://example.com", "lines": "42"},
+        )
+    ) == hash(
+        QualifiedSWHID(
+            object_type=ObjectType.DIRECTORY,
+            object_id=object_id,
+            qualifiers={"lines": "42", "origin": "https://example.com"},
+        )
+    )
+
+
+def test_QualifiedSWHID_eq():
+    object_id = _x("94a9ed024d3859793618152ea559a168bbcbb5e2")
+
+    assert QualifiedSWHID(
+        object_type=ObjectType.DIRECTORY, object_id=object_id
+    ) == QualifiedSWHID(object_type=ObjectType.DIRECTORY, object_id=object_id)
+
+    assert QualifiedSWHID(
+        object_type=ObjectType.DIRECTORY,
+        object_id=object_id,
+        qualifiers=dummy_qualifiers,
+    ) == QualifiedSWHID(
+        object_type=ObjectType.DIRECTORY,
+        object_id=object_id,
+        qualifiers=dummy_qualifiers,
+    )
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -8,6 +8,7 @@
   pytest-cov
 commands =
   pytest --cov={envsitepackagesdir}/swh/model \
+         --doctest-modules \
          {envsitepackagesdir}/swh/model \
          --cov-branch {posargs}
 