diff --git a/swh/deposit/parsers.py b/swh/deposit/parsers.py
index e8d8a0a0..2369238d 100644
--- a/swh/deposit/parsers.py
+++ b/swh/deposit/parsers.py
@@ -1,155 +1,187 @@
 # Copyright (C) 2017-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Module in charge of defining parsers with SWORD 2.0 supported mediatypes.

 """

 from typing import Dict, Optional, Union
 from xml.parsers.expat import ExpatError

 from django.conf import settings
 from rest_framework.parsers import BaseParser, FileUploadParser, MultiPartParser
 import xmltodict

 from swh.deposit.errors import ParserError
-from swh.model.identifiers import SWHID, parse_swhid
+from swh.model.exceptions import ValidationError
+from swh.model.identifiers import (
+    DIRECTORY,
+    RELEASE,
+    REVISION,
+    SNAPSHOT,
+    SWHID,
+    parse_swhid,
+)


 class SWHFileUploadZipParser(FileUploadParser):
     """File upload parser limited to zip archive.

     """

     media_type = "application/zip"


 class SWHFileUploadTarParser(FileUploadParser):
     """File upload parser limited to tarball (tar, tar.gz, tar.*) archives.

     """

     media_type = "application/x-tar"


 class SWHXMLParser(BaseParser):
     """
     XML parser.
     """

     media_type = "application/xml"

     def parse(self, stream, media_type=None, parser_context=None):
         """
         Parses the incoming bytestream as XML and returns the resulting data.
         """
         parser_context = parser_context or {}
         encoding = parser_context.get("encoding", settings.DEFAULT_CHARSET)
         namespaces = {
             "http://www.w3.org/2005/Atom": None,
             "http://purl.org/dc/terms/": None,
             "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta",
             "http://purl.org/net/sword/": "sword",
             "https://www.softwareheritage.org/schema/2018/deposit": "swh",
         }

         data = xmltodict.parse(
             stream, encoding=encoding, namespaces=namespaces, process_namespaces=True
         )
         if "entry" in data:
             data = data["entry"]
         return data


 class SWHAtomEntryParser(SWHXMLParser):
     """Atom entry parser limited to specific mediatype

     """

     media_type = "application/atom+xml;type=entry"

     def parse(self, stream, media_type=None, parser_context=None):
         # We do not actually want to parse the stream yet
         # because we want to keep the raw data as well
         # this is done later in the atom entry call
         # (cf. swh.deposit.api.common.APIBase._atom_entry)
         return stream


 class SWHMultiPartParser(MultiPartParser):
     """Multipart parser limited to a subset of mediatypes.

     """

     media_type = "multipart/*; *"


 def parse_xml(raw_content):
     """Parse xml body.

     Args:
         raw_content (bytes): The content to parse

     Raises:
         ParserError in case of a malformed xml

     Returns:
         content parsed as dict.

     """
     try:
         return SWHXMLParser().parse(raw_content)
     except ExpatError as e:
         raise ParserError(str(e))


+ALLOWED_QUALIFIERS_NODE_TYPE = (SNAPSHOT, REVISION, RELEASE, DIRECTORY)
+
+
 def parse_swh_reference(metadata: Dict) -> Optional[Union[str, SWHID]]:
     """Parse swh reference within the metadata dict (or origin) reference if found,
     None otherwise.

     <swh:deposit>
       <swh:reference>
         <swh:object swhid="swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=https://hal.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:4fc1e36fca86b2070204bedd51106014a614f321;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba;path=/moranegg-AffectationRO-df7f68b/" />
       </swh:reference>
     </swh:deposit>

     or:

     <swh:deposit>
       <swh:reference>
         <swh:origin url="https://hal.archives-ouvertes.fr/hal-01243573" />
       </swh:reference>
     </swh:deposit>

     Raises:
         ValidationError in case the swhid referenced (if any) is invalid

     Returns:
         Either swhid or origin reference if any. None otherwise.

""" # noqa swh_deposit = metadata.get("swh:deposit") if not swh_deposit: return None swh_reference = swh_deposit.get("swh:reference") if not swh_reference: return None swh_origin = swh_reference.get("swh:origin") if swh_origin: url = swh_origin.get("@url") if url: return url swh_object = swh_reference.get("swh:object") if not swh_object: return None swhid = swh_object.get("@swhid") if not swhid: return None - return parse_swhid(swhid) + swhid_reference = parse_swhid(swhid) + + if swhid_reference.metadata: + anchor = swhid_reference.metadata.get("anchor") + if anchor: + anchor_swhid = parse_swhid(anchor) + if anchor_swhid.object_type not in ALLOWED_QUALIFIERS_NODE_TYPE: + error_msg = ( + "anchor qualifier should be a core SWHID with type one of " + f" {', '.join(ALLOWED_QUALIFIERS_NODE_TYPE)}" + ) + raise ValidationError(error_msg) + + visit = swhid_reference.metadata.get("visit") + if visit: + visit_swhid = parse_swhid(visit) + if visit_swhid.object_type != SNAPSHOT: + raise ValidationError( + f"visit qualifier should be a core SWHID with type {SNAPSHOT}" + ) + + return swhid_reference diff --git a/swh/deposit/tests/api/test_parsers.py b/swh/deposit/tests/api/test_parsers.py index 1806139c..374b2c5f 100644 --- a/swh/deposit/tests/api/test_parsers.py +++ b/swh/deposit/tests/api/test_parsers.py @@ -1,232 +1,249 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import OrderedDict import io import pytest from swh.deposit.parsers import SWHXMLParser, parse_swh_reference, parse_xml from swh.model.exceptions import ValidationError from swh.model.identifiers import parse_swhid def test_parsing_without_duplicates(): xml_no_duplicate = io.BytesIO( b""" Awesome Compiler GPL3.0 https://opensource.org/licenses/GPL-3.0 Python3 author1 Inria ocaml http://issuetracker.com """ ) actual_result = SWHXMLParser().parse(xml_no_duplicate) expected_dict = OrderedDict( [ ("title", "Awesome Compiler"), ( "codemeta:license", OrderedDict( [ ("codemeta:name", "GPL3.0"), ("codemeta:url", "https://opensource.org/licenses/GPL-3.0"), ] ), ), ("codemeta:runtimePlatform", "Python3"), ( "codemeta:author", OrderedDict( [("codemeta:name", "author1"), ("codemeta:affiliation", "Inria")] ), ), ("codemeta:programmingLanguage", "ocaml"), ("codemeta:issueTracker", "http://issuetracker.com"), ] ) assert expected_dict == actual_result def test_parsing_with_duplicates(): xml_with_duplicates = io.BytesIO( b""" Another Compiler GNU/Linux GPL3.0 https://opensource.org/licenses/GPL-3.0 Un*x author1 Inria author2 Inria ocaml haskell spdx http://spdx.org python3 """ ) actual_result = SWHXMLParser().parse(xml_with_duplicates) expected_dict = OrderedDict( [ ("title", "Another Compiler"), ("codemeta:runtimePlatform", ["GNU/Linux", "Un*x"]), ( "codemeta:license", [ OrderedDict( [ ("codemeta:name", "GPL3.0"), ("codemeta:url", "https://opensource.org/licenses/GPL-3.0"), ] ), OrderedDict( [("codemeta:name", "spdx"), ("codemeta:url", "http://spdx.org")] ), ], ), ( "codemeta:author", [ OrderedDict( [ ("codemeta:name", "author1"), ("codemeta:affiliation", "Inria"), ] ), OrderedDict( [ ("codemeta:name", "author2"), ("codemeta:affiliation", "Inria"), ] ), ], ), ("codemeta:programmingLanguage", ["ocaml", "haskell", "python3"]), ] ) assert expected_dict == actual_result @pytest.fixture def xml_with_origin_reference(): xml_data = """ """ 
     return xml_data.strip()


 def test_parse_swh_reference_origin(xml_with_origin_reference):
     url = "https://url"
     xml_data = xml_with_origin_reference.format(url=url)
     metadata = parse_xml(xml_data)
     actual_origin = parse_swh_reference(metadata)
     assert actual_origin == url


 @pytest.fixture
 def xml_with_empty_reference():
     xml_data = """<?xml version="1.0"?>
         <entry xmlns="http://www.w3.org/2005/Atom"
                xmlns:swh="https://www.softwareheritage.org/schema/2018/deposit">
             <swh:deposit>
                 {swh_reference}
             </swh:deposit>
         </entry>"""
     return xml_data.strip()


 @pytest.mark.parametrize(
     "xml_ref",
     [
         "",
         "<swh:reference></swh:reference>",
         "<swh:reference><swh:object/></swh:reference>",
         """<swh:reference><swh:object swhid=""/></swh:reference>""",
     ],
 )
 def test_parse_swh_reference_empty(xml_with_empty_reference, xml_ref):
     xml_body = xml_with_empty_reference.format(swh_reference=xml_ref)
     metadata = parse_xml(xml_body)

     assert parse_swh_reference(metadata) is None


 @pytest.fixture
 def xml_with_swhid():
     xml_data = """<?xml version="1.0"?>
         <entry xmlns="http://www.w3.org/2005/Atom"
                xmlns:swh="https://www.softwareheritage.org/schema/2018/deposit">
             <swh:deposit>
                 <swh:reference>
                     <swh:object swhid="{swhid}"/>
                 </swh:reference>
             </swh:deposit>
         </entry>"""
     return xml_data.strip()


 @pytest.mark.parametrize(
     "swhid",
     [
-        "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=https://hal.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:4fc1e36fca86b2070204bedd51106014a614f321;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba;path=/moranegg-AffectationRO-df7f68b/",  # noqa
+        "swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=https://hal.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:4fc1e36fca86b2070204bedd51106014a614f321;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba;path=/moranegg-AffectationRO-df7f68b/",  # noqa
+        "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:dir:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
+        "swh:1:rev:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
+        "swh:1:rel:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rel:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
+        "swh:1:snp:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:snp:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
         "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49",
     ],
 )
 def test_parse_swh_reference_swhid(swhid, xml_with_swhid):
     xml_data = xml_with_swhid.format(swhid=swhid)
     metadata = parse_xml(xml_data)
     actual_swhid = parse_swh_reference(metadata)
     assert actual_swhid is not None

     expected_swhid = parse_swhid(swhid)
     assert actual_swhid == expected_swhid


-def test_parse_swh_reference_invalid_swhid(xml_with_swhid):
+@pytest.mark.parametrize(
+    "invalid_swhid,error_msg",
+    [
+        ("swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc235", "Unexpected length"),
+        (
+            "swh:1:dir:c4993c872593e960dc84e4430dbbfbc34fd706d0;visit=swh:1:rev:0175049fc45055a3824a1675ac06e3711619a55a",  # noqa
+            "visit qualifier should be a core SWHID with type",
+        ),
+        (
+            "swh:1:rev:c4993c872593e960dc84e4430dbbfbc34fd706d0;anchor=swh:1:cnt:b5f505b005435fa5c4fa4c279792bd7b17167c04;path=/",  # noqa
+            "anchor qualifier should be a core SWHID with type one of",
+        ),  # noqa
+    ],
+)
+def test_parse_swh_reference_invalid_swhid(invalid_swhid, error_msg, xml_with_swhid):
     """Unparsable swhid should raise

     """
-    invalid_swhid = "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc235"
     xml_invalid_swhid = xml_with_swhid.format(swhid=invalid_swhid)
     metadata = parse_xml(xml_invalid_swhid)

-    with pytest.raises(ValidationError, match="Unexpected length"):
+    with pytest.raises(ValidationError, match=error_msg):
         parse_swh_reference(metadata)
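
The patch above makes parse_swh_reference validate the "anchor" and "visit" qualifiers of the referenced SWHID in addition to parsing it. Below is a minimal sketch, not part of the patch, of how that behaviour surfaces to a caller. It assumes the same swh:deposit/swh:reference Atom layout as the test fixtures, reuses SWHID values from the tests, and needs Django settings configured (for example the deposit test settings), since parse_xml reads settings.DEFAULT_CHARSET.

from swh.deposit.parsers import parse_swh_reference, parse_xml
from swh.model.exceptions import ValidationError

# Hypothetical Atom entry template mirroring the xml_with_swhid test fixture.
ATOM_TEMPLATE = """<?xml version="1.0"?>
<entry xmlns="http://www.w3.org/2005/Atom"
       xmlns:swh="https://www.softwareheritage.org/schema/2018/deposit">
  <swh:deposit>
    <swh:reference>
      <swh:object swhid="{swhid}"/>
    </swh:reference>
  </swh:deposit>
</entry>"""

# Accepted: the anchor qualifier is a directory SWHID, one of the allowed
# core types (snapshot, revision, release, directory).
ok = ATOM_TEMPLATE.format(
    swhid="swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49"
    ";anchor=swh:1:dir:9c5de20cfb54682370a398fcc733e829903c8cba"
)
print(parse_swh_reference(parse_xml(ok)))  # prints the parsed SWHID

# Rejected: the visit qualifier must be a snapshot SWHID, not a revision.
bad = ATOM_TEMPLATE.format(
    swhid="swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49"
    ";visit=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba"
)
try:
    parse_swh_reference(parse_xml(bad))
except ValidationError as err:
    print("rejected:", err)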