diff --git a/swh/model/identifiers.py b/swh/model/identifiers.py
--- a/swh/model/identifiers.py
+++ b/swh/model/identifiers.py
@@ -23,6 +23,7 @@
     TypeVar,
     Union,
 )
+import urllib.parse
 import warnings
 
 import attr
@@ -816,7 +817,9 @@
         try:
             return cls(**parts)
         except ValueError as e:
-            raise ValidationError(*e.args) from None
+            raise ValidationError(
+                "ValueError: %(args)s", params={"args": e.args}
+            ) from None
 
 
 @attr.s(frozen=True, kw_only=True)
@@ -856,9 +859,6 @@
 
 
 def _parse_core_swhid(swhid: Union[str, CoreSWHID, None]) -> Optional[CoreSWHID]:
-    """Alias of CoreSWHID.from_string to make mypy happy......
-
-    https://github.com/python/mypy/issues/6172"""
     if swhid is None or isinstance(swhid, CoreSWHID):
         return swhid
     else:
@@ -868,12 +868,25 @@
 def _parse_lines_qualifier(
     lines: Union[str, Tuple[int, Optional[int]], None]
 ) -> Optional[Tuple[int, Optional[int]]]:
-    if lines is None or isinstance(lines, tuple):
-        return lines
-    elif "-" in lines:
-        return tuple(map(int, lines.split("-", 2)))  # type: ignore
+    try:
+        if lines is None or isinstance(lines, tuple):
+            return lines
+        elif "-" in lines:
+            (from_, to) = lines.split("-", 2)
+            return (int(from_), int(to))
+        else:
+            return (int(lines), None)
+    except ValueError:
+        raise ValidationError(
+            "Invalid format for the lines qualifier: %(lines)s", params={"lines": lines}
+        )
+
+
+def _parse_path_qualifier(path: Union[str, bytes, None]) -> Optional[bytes]:
+    if path is None or isinstance(path, bytes):
+        return path
     else:
-        return (int(lines), None)
+        return urllib.parse.unquote_to_bytes(path)
 
 
 @attr.s(frozen=True, kw_only=True)
@@ -929,7 +942,12 @@
     is specified, as the core identifier of a directory, a revision, a release,
     or a snapshot"""
 
-    path = attr.ib(type=Optional[bytes], default=None, validator=type_validator())
+    path = attr.ib(
+        type=Optional[bytes],
+        default=None,
+        validator=type_validator(),
+        converter=_parse_path_qualifier,
+    )
     """the absolute file path, from the root directory associated to the anchor node,
     to the object; when the anchor denotes a directory or a revision, and almost
     always when it’s a release, the root directory is uniquely determined;
@@ -948,8 +966,8 @@
     def check_visit(self, attribute, value):
         if value and value.object_type != ObjectType.SNAPSHOT:
             raise ValidationError(
-                f"The 'visit' qualifier must be a 'snp' SWHID, "
-                f"not '{value.object_type.value}'"
+                "The 'visit' qualifier must be a 'snp' SWHID, not '%(type)s'",
+                params={"type": value.object_type.value},
             )
 
     @anchor.validator
@@ -961,17 +979,36 @@
             ObjectType.SNAPSHOT,
         ):
             raise ValidationError(
-                f"The 'visit' qualifier must be a 'dir', 'rev', 'rel', or 'snp' SWHID, "
-                f"not '{value.object_type.value}'"
+                "The 'anchor' qualifier must be a 'dir', 'rev', 'rel', or 'snp' SWHID, "
+                "not '%(type)s'",
+                params={"type": value.object_type.value},
             )
 
     def qualifiers(self) -> Dict[str, str]:
+        origin = self.origin
+        if origin:
+            unescaped_origin = origin
+            origin = origin.replace(";", "%3B")
+            assert urllib.parse.unquote_to_bytes(
+                origin
+            ) == urllib.parse.unquote_to_bytes(
+                unescaped_origin
+            ), "Escaping ';' in the origin qualifier corrupted the origin URL."
+
         d: Dict[str, Optional[str]] = {
-            "origin": self.origin,
+            "origin": origin,
             "visit": str(self.visit) if self.visit else None,
             "anchor": str(self.anchor) if self.anchor else None,
-            "path": self.path.decode() if self.path is not None else None,
-            "lines": "-".join(map(str, self.lines)) if self.lines else None,
+            "path": (
+                urllib.parse.quote_from_bytes(self.path)
+                if self.path is not None
+                else None
+            ),
+            "lines": (
+                "-".join(str(line) for line in self.lines if line is not None)
+                if self.lines
+                else None
+            ),
         }
         return {k: v for (k, v) in d.items() if v is not None}
 
@@ -997,12 +1034,15 @@
         invalid_qualifiers = set(qualifiers) - SWHID_QUALIFIERS
         if invalid_qualifiers:
             raise ValidationError(
-                "Invalid qualifier(s): {', '.join(invalid_qualifiers)}"
+                "Invalid qualifier(s): %(qualifiers)s",
+                params={"qualifiers": ", ".join(invalid_qualifiers)},
             )
         try:
             return QualifiedSWHID(**parts, **qualifiers)
         except ValueError as e:
-            raise ValidationError(*e.args) from None
+            raise ValidationError(
+                "ValueError: %(args)s", params={"args": e.args}
+            ) from None
 
 
 @attr.s(frozen=True, kw_only=True)
diff --git a/swh/model/tests/test_identifiers.py b/swh/model/tests/test_identifiers.py
--- a/swh/model/tests/test_identifiers.py
+++ b/swh/model/tests/test_identifiers.py
@@ -9,6 +9,7 @@
 from typing import Dict
 import unittest
 
+import attr
 import pytest
 
 from swh.model import hashutil, identifiers
@@ -21,6 +22,7 @@
     REVISION,
     SNAPSHOT,
     SWHID,
+    SWHID_QUALIFIERS,
     CoreSWHID,
     ExtendedObjectType,
     ExtendedSWHID,
@@ -1298,6 +1300,17 @@
         ),
         None,  # Neither does ExtendedSWHID
     ),
+    (
+        f"swh:1:cnt:{HASH};origin=https://github.com/python/cpython;lines=18",
+        None,  # likewise
+        QualifiedSWHID(
+            object_type=ObjectType.CONTENT,
+            object_id=_x(HASH),
+            origin="https://github.com/python/cpython",
+            lines=(18, None),
+        ),
+        None,  # likewise
+    ),
     (
         f"swh:1:dir:{HASH};origin=deb://Debian/packages/linuxdoc-tools",
         None,  # likewise
@@ -1423,6 +1436,132 @@
     )
 
 
+QUALIFIED_SWHIDS = [
+    # origin:
+    (
+        f"swh:1:cnt:{HASH};origin=https://github.com/python/cpython",
+        QualifiedSWHID(
+            object_type=ObjectType.CONTENT,
+            object_id=_x(HASH),
+            origin="https://github.com/python/cpython",
+        ),
+    ),
+    (
+        f"swh:1:cnt:{HASH};origin=https://example.org/foo%3Bbar%25baz",
+        QualifiedSWHID(
+            object_type=ObjectType.CONTENT,
+            object_id=_x(HASH),
+            origin="https://example.org/foo%3Bbar%25baz",
+        ),
+    ),
+    # visit:
+    (
+        f"swh:1:cnt:{HASH};visit=swh:1:snp:{HASH}",
+        QualifiedSWHID(
+            object_type=ObjectType.CONTENT,
+            object_id=_x(HASH),
+            visit=CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=_x(HASH)),
+        ),
+    ),
+    (f"swh:1:cnt:{HASH};visit=swh:1:rel:{HASH}", None,),
+    # anchor:
+    (
+        f"swh:1:cnt:{HASH};anchor=swh:1:dir:{HASH}",
+        QualifiedSWHID(
+            object_type=ObjectType.CONTENT,
+            object_id=_x(HASH),
+            anchor=CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=_x(HASH)),
+        ),
+    ),
+    (
+        f"swh:1:cnt:{HASH};anchor=swh:1:rev:{HASH}",
+        QualifiedSWHID(
+            object_type=ObjectType.CONTENT,
+            object_id=_x(HASH),
+            anchor=CoreSWHID(object_type=ObjectType.REVISION, object_id=_x(HASH)),
+        ),
+    ),
+    (
+        f"swh:1:cnt:{HASH};anchor=swh:1:cnt:{HASH}",
+        None,  # 'cnt' is not valid in anchor
+    ),
+    (
+        f"swh:1:cnt:{HASH};anchor=swh:1:ori:{HASH}",
+        None,  # 'ori' is not valid in a CoreSWHID
+    ),
+    # path:
+    (
+        f"swh:1:cnt:{HASH};path=/foo",
+        QualifiedSWHID(
+            object_type=ObjectType.CONTENT, object_id=_x(HASH), path=b"/foo"
+        ),
+    ),
+    (
+        f"swh:1:cnt:{HASH};path=/foo%3Bbar",
+        QualifiedSWHID(
+            object_type=ObjectType.CONTENT, object_id=_x(HASH), path=b"/foo;bar"
+        ),
+    ),
+    (
+        f"swh:1:cnt:{HASH};path=/foo%25bar",
+        QualifiedSWHID(
+            object_type=ObjectType.CONTENT, object_id=_x(HASH), path=b"/foo%bar"
+        ),
+    ),
+    # lines
+    (
+        f"swh:1:cnt:{HASH};lines=1-18",
+        QualifiedSWHID(
+            object_type=ObjectType.CONTENT, object_id=_x(HASH), lines=(1, 18),
+        ),
+    ),
+    (
+        f"swh:1:cnt:{HASH};lines=18",
+        QualifiedSWHID(
+            object_type=ObjectType.CONTENT, object_id=_x(HASH), lines=(18, None),
+        ),
+    ),
+    (f"swh:1:cnt:{HASH};lines=", None,),
+    (f"swh:1:cnt:{HASH};lines=aa", None,),
+    (f"swh:1:cnt:{HASH};lines=18-aa", None,),
+]
+
+
+@pytest.mark.parametrize("string,parsed", QUALIFIED_SWHIDS)
+def test_QualifiedSWHID_parse_serialize_qualifiers(string, parsed):
+    """Tests parsing and serializing QualifiedSWHID qualifiers, valid or rejected."""
+    if parsed is None:
+        with pytest.raises(ValidationError):
+            print(repr(QualifiedSWHID.from_string(string)))
+    else:
+        assert QualifiedSWHID.from_string(string) == parsed
+        assert str(parsed) == string
+
+
+def test_QualifiedSWHID_serialize_origin():
+    """Checks that semicolons in origins are escaped."""
+    string = f"swh:1:cnt:{HASH};origin=https://example.org/foo%3Bbar%25baz"
+    swhid = QualifiedSWHID(
+        object_type=ObjectType.CONTENT,
+        object_id=_x(HASH),
+        origin="https://example.org/foo;bar%25baz",
+    )
+    assert str(swhid) == string
+
+
+def test_QualifiedSWHID_attributes():
+    """Checks the set of QualifiedSWHID attributes match the SWHID_QUALIFIERS
+    constant."""
+
+    assert set(attr.fields_dict(QualifiedSWHID)) == {
+        "namespace",
+        "scheme_version",
+        "object_type",
+        "object_id",
+        *SWHID_QUALIFIERS,
+    }
+
+
 @pytest.mark.parametrize(
     "ns,version,type,id",
     [