diff --git a/swh/model/identifiers.py b/swh/model/identifiers.py --- a/swh/model/identifiers.py +++ b/swh/model/identifiers.py @@ -29,6 +29,16 @@ SWHID_TYPES = ["ori", "snp", "rel", "rev", "dir", "cnt"] SWHID_SEP = ":" SWHID_CTXT_SEP = ";" +SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"} + +SWHID_RE_RAW = ( + f"(?P<scheme>{SWHID_NAMESPACE})" + f"{SWHID_SEP}(?P<version>{SWHID_VERSION})" + f"{SWHID_SEP}(?P<object_type>{'|'.join(SWHID_TYPES)})" + f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})" + f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?" +) +SWHID_RE = re.compile(SWHID_RE_RAW) @lru_cache() @@ -677,6 +687,15 @@ CONTENT: {"short_name": "cnt", "key_id": "sha1_git"}, } +_swhid_type_map = { + "ori": ORIGIN, + "snp": SNAPSHOT, + "rel": RELEASE, + "rev": REVISION, + "dir": DIRECTORY, + "cnt": CONTENT, +} + @attr.s(frozen=True) class SWHID: @@ -717,8 +736,8 @@ # 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0' """ - namespace = attr.ib(type=str, default="swh") - scheme_version = attr.ib(type=int, default=1) + namespace = attr.ib(type=str, default=SWHID_NAMESPACE) + scheme_version = attr.ib(type=int, default=SWHID_VERSION) object_type = attr.ib(type=str, default="") object_id = attr.ib(type=str, converter=hash_to_hex, default="") # type: ignore metadata = attr.ib( @@ -729,28 +748,43 @@ def check_namespace(self, attribute, value): if value != SWHID_NAMESPACE: raise ValidationError( - f"Invalid SWHID: namespace is '{value}' but must be '{SWHID_NAMESPACE}'" + "Invalid SWHID: invalid namespace: %(namespace)s", + params={"namespace": value}, ) @scheme_version.validator def check_scheme_version(self, attribute, value): if value != SWHID_VERSION: raise ValidationError( - f"Invalid SWHID: version is {value} but must be {SWHID_VERSION}" + "Invalid SWHID: invalid version: %(version)s", params={"version": value} ) @object_type.validator def check_object_type(self, attribute, value): if value not in _object_type_map: - supported_types = ", ".join(_object_type_map.keys()) raise ValidationError( - f"Invalid SWHID: object 
type is {value} but must be " - f"one of {supported_types}" + "Invalid SWHID: invalid type: %(object_type)s", + params={"object_type": value}, ) @object_id.validator def check_object_id(self, attribute, value): - validate_sha1(value) # can raise if invalid hash + try: + validate_sha1(value) # can raise if invalid hash + except ValidationError: + raise ValidationError( + "Invalid SWHID: invalid checksum: %(object_id)s", + params={"object_id": value}, + ) from None + + @metadata.validator + def check_qualifiers(self, attribute, value): + for k in value: + if k not in SWHID_QUALIFIERS: + raise ValidationError( + "Invalid SWHID: unknown qualifier: %(qualifier)s", + params={"qualifier": k}, + ) def to_dict(self) -> Dict[str, Any]: return attr.asdict(self) @@ -801,77 +835,44 @@ return str(swhid) -CONTEXT_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"} - - def parse_swhid(swhid: str) -> SWHID: - """Parse :ref:`persistent-identifiers`. + """Parse a Software Heritage identifier (SWHID) from string (see: + :ref:`persistent-identifiers`.) 
Args: swhid (str): A persistent identifier - Raises: - swh.model.exceptions.ValidationError: in case of: - - * missing mandatory values (4) - * invalid namespace supplied - * invalid version supplied - * invalid type supplied - * missing hash - * invalid hash identifier supplied - Returns: a named tuple holding the parsing result - """ - if re.search(r"[ \t\n\r\f\v]", swhid): - raise ValidationError("Invalid SwHID: SWHIDs cannot contain whitespaces") - - # ; - swhid_parts = swhid.split(SWHID_CTXT_SEP) - swhid_data = swhid_parts.pop(0).split(":") - - if len(swhid_data) != 4: - raise ValidationError( - "Invalid SWHID, format must be 'swh:1:OBJECT_TYPE:OBJECT_ID'" - ) - - # Checking for parsing errors - _ns, _version, _type, _id = swhid_data - - for otype, data in _object_type_map.items(): - if _type == data["short_name"]: - _type = otype - break + Raises: + swh.model.exceptions.ValidationError: if passed string is not a valid SWHID - if not _id: + """ + m = SWHID_RE.fullmatch(swhid) + if not m: raise ValidationError( - "Invalid SWHID: missing OBJECT_ID (as a 40 hex digit string)" - ) - - _metadata = {} - for part in swhid_parts: - try: - qualifier, val = part.split("=") - _metadata[qualifier] = val - except Exception: - raise ValidationError( - "Invalid SWHID: contextual data must be a ;-separated list of " - "key=value pairs" - ) - - wrong_qualifiers = set(_metadata) - set(CONTEXT_QUALIFIERS) - if wrong_qualifiers: - error_msg = ( - f"Invalid SWHID: Wrong qualifiers {', '.join(wrong_qualifiers)}. 
" - f"The qualifiers must be one of {', '.join(CONTEXT_QUALIFIERS)}" + "Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid} ) - raise ValidationError(error_msg) + parts = m.groupdict() + + _qualifiers = {} + qualifiers_raw = parts["qualifiers"] + if qualifiers_raw: + for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP): + try: + k, v = qualifier.split("=") + except ValueError: + raise ValidationError( + "Invalid SWHID: invalid qualifier: %(qualifier)s", + params={"qualifier": qualifier}, + ) + _qualifiers[k] = v return SWHID( - _ns, - int(_version), - _type, - _id, - _metadata, # type: ignore # mypy can't properly unify types + parts["scheme"], + int(parts["version"]), + _swhid_type_map[parts["object_type"]], + parts["object_id"], + _qualifiers, # type: ignore # mypy can't properly unify types ) diff --git a/swh/model/tests/test_identifiers.py b/swh/model/tests/test_identifiers.py --- a/swh/model/tests/test_identifiers.py +++ b/swh/model/tests/test_identifiers.py @@ -218,6 +218,8 @@ ], } +dummy_qualifiers = {"origin": "https://example.com", "lines": "42"} + class DirectoryIdentifier(unittest.TestCase): def setUp(self): @@ -843,7 +845,7 @@ for _type, _hash in [ (SNAPSHOT, _snapshot_id), (SNAPSHOT, _snapshot), - ("foo", ""), + ("lines", "42"), ]: with self.assertRaises(ValidationError): identifiers.swhid(_type, _hash) @@ -1117,17 +1119,9 @@ ) assert hash( - SWHID( - object_type="directory", - object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, - ) + SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,) ) == hash( - SWHID( - object_type="directory", - object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, - ) + SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,) ) # Different order of the dictionary, so the underlying order of the tuple in @@ -1136,13 +1130,13 @@ SWHID( object_type="directory", object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, + metadata={"origin": 
"https://example.com", "lines": "42"}, ) ) == hash( SWHID( object_type="directory", object_id=object_id, - metadata={"baz": "qux", "foo": "bar"}, + metadata={"lines": "42", "origin": "https://example.com"}, ) ) @@ -1155,21 +1149,9 @@ ) assert SWHID( - object_type="directory", - object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, - ) == SWHID( - object_type="directory", - object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, - ) + object_type="directory", object_id=object_id, metadata=dummy_qualifiers, + ) == SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,) assert SWHID( - object_type="directory", - object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, - ) == SWHID( - object_type="directory", - object_id=object_id, - metadata={"baz": "qux", "foo": "bar"}, - ) + object_type="directory", object_id=object_id, metadata=dummy_qualifiers, + ) == SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,) diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py --- a/swh/model/tests/test_model.py +++ b/swh/model/tests/test_model.py @@ -782,12 +782,13 @@ _metadata_fetcher = MetadataFetcher(name="test-fetcher", version="0.0.1",) _content_swhid = parse_swhid("swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2") _origin_url = "https://forge.softwareheritage.org/source/swh-model.git" +_dummy_qualifiers = {"origin": "https://example.com", "lines": "42"} _common_metadata_fields = dict( discovery_date=datetime.datetime.now(tz=datetime.timezone.utc), authority=_metadata_authority, fetcher=_metadata_fetcher, format="json", - metadata=b'{"foo": "bar"}', + metadata=b'{"origin": "https://example.com", "lines": "42"}', ) @@ -815,7 +816,7 @@ "fetcher": {"name": "test-fetcher", "version": "0.0.1",}, "discovery_date": _common_metadata_fields["discovery_date"], "format": "json", - "metadata": b'{"foo": "bar"}', + "metadata": b'{"origin": "https://example.com", "lines": "42"}', } m = 
RawExtrinsicMetadata( @@ -893,7 +894,7 @@ target=SWHID( object_type="content", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", - metadata={"foo": "bar"}, + metadata=_dummy_qualifiers, ), **_common_metadata_fields, ) @@ -1018,7 +1019,7 @@ snapshot=SWHID( object_type="snapshot", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", - metadata={"foo": "bar"}, + metadata=_dummy_qualifiers, ), **_common_metadata_fields, ) @@ -1073,7 +1074,7 @@ release=SWHID( object_type="release", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", - metadata={"foo": "bar"}, + metadata=_dummy_qualifiers, ), **_common_metadata_fields, ) @@ -1128,7 +1129,7 @@ revision=SWHID( object_type="revision", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", - metadata={"foo": "bar"}, + metadata=_dummy_qualifiers, ), **_common_metadata_fields, ) @@ -1205,7 +1206,7 @@ directory=SWHID( object_type="directory", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", - metadata={"foo": "bar"}, + metadata=_dummy_qualifiers, ), **_common_metadata_fields, )