diff --git a/swh/deposit/parsers.py b/swh/deposit/parsers.py
--- a/swh/deposit/parsers.py
+++ b/swh/deposit/parsers.py
@@ -16,7 +16,15 @@
 import xmltodict
 
 from swh.deposit.errors import ParserError
-from swh.model.identifiers import SWHID, parse_swhid
+from swh.model.exceptions import ValidationError
+from swh.model.identifiers import (
+    DIRECTORY,
+    RELEASE,
+    REVISION,
+    SNAPSHOT,
+    SWHID,
+    parse_swhid,
+)
 
 
 class SWHFileUploadZipParser(FileUploadParser):
@@ -106,6 +114,9 @@
         raise ParserError(str(e))
 
 
+ALLOWED_QUALIFIERS_NODE_TYPE = (SNAPSHOT, REVISION, RELEASE, DIRECTORY)
+
+
 def parse_swh_reference(metadata: Dict) -> Optional[Union[str, SWHID]]:
     """Parse swh reference within the metadata dict (or origin) reference if
     found, None otherwise.
@@ -152,4 +163,27 @@
     swhid = swh_object.get("@swhid")
     if not swhid:
         return None
-    return parse_swhid(swhid)
+    swhid_reference = parse_swhid(swhid)
+
+    if swhid_reference.metadata:
+        # anchor must designate one of the ALLOWED_QUALIFIERS_NODE_TYPE objects
+        anchor = swhid_reference.metadata.get("anchor")
+        if anchor:
+            anchor_swhid = parse_swhid(anchor)
+            if anchor_swhid.object_type not in ALLOWED_QUALIFIERS_NODE_TYPE:
+                error_msg = (
+                    "anchor qualifier should be a core SWHID with type one of "
+                    f"{', '.join(ALLOWED_QUALIFIERS_NODE_TYPE)}"
+                )
+                raise ValidationError(error_msg)
+
+        # visit must designate a snapshot
+        visit = swhid_reference.metadata.get("visit")
+        if visit:
+            visit_swhid = parse_swhid(visit)
+            if visit_swhid.object_type != SNAPSHOT:
+                raise ValidationError(
+                    f"visit qualifier should be a core SWHID with type {SNAPSHOT}"
+                )
+
+    return swhid_reference
diff --git a/swh/deposit/tests/api/test_parsers.py b/swh/deposit/tests/api/test_parsers.py
--- a/swh/deposit/tests/api/test_parsers.py
+++ b/swh/deposit/tests/api/test_parsers.py
@@ -205,7 +205,11 @@
 @pytest.mark.parametrize(
     "swhid",
     [
-        "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=https://hal.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:4fc1e36fca86b2070204bedd51106014a614f321;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba;path=/moranegg-AffectationRO-df7f68b/",  # noqa
+        "swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=https://hal.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:4fc1e36fca86b2070204bedd51106014a614f321;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba;path=/moranegg-AffectationRO-df7f68b/",  # noqa
+        "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:dir:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
+        "swh:1:rev:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
+        "swh:1:rel:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rel:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
+        "swh:1:snp:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:snp:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
         "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49",
     ],
 )
@@ -220,13 +224,26 @@
     assert actual_swhid == expected_swhid
 
 
-def test_parse_swh_reference_invalid_swhid(xml_with_swhid):
+@pytest.mark.parametrize(
+    "invalid_swhid,error_msg",
+    [
+        ("swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc235", "Unexpected length"),
+        (
+            "swh:1:dir:c4993c872593e960dc84e4430dbbfbc34fd706d0;visit=swh:1:rev:0175049fc45055a3824a1675ac06e3711619a55a",  # noqa
+            "visit qualifier should be a core SWHID with type",
+        ),
+        (
+            "swh:1:rev:c4993c872593e960dc84e4430dbbfbc34fd706d0;anchor=swh:1:cnt:b5f505b005435fa5c4fa4c279792bd7b17167c04;path=/",  # noqa
+            "anchor qualifier should be a core SWHID with type one of",
+        ),  # noqa
+    ],
+)
+def test_parse_swh_reference_invalid_swhid(invalid_swhid, error_msg, xml_with_swhid):
     """Unparsable swhid should raise
 
     """
-    invalid_swhid = "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc235"
     xml_invalid_swhid = xml_with_swhid.format(swhid=invalid_swhid)
     metadata = parse_xml(xml_invalid_swhid)
 
-    with pytest.raises(ValidationError, match="Unexpected length"):
+    with pytest.raises(ValidationError, match=error_msg):
         parse_swh_reference(metadata)