diff --git a/swh/deposit/parsers.py b/swh/deposit/parsers.py
index 2369238d..28c5b357 100644
--- a/swh/deposit/parsers.py
+++ b/swh/deposit/parsers.py
@@ -1,187 +1,207 @@
 # Copyright (C) 2017-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Module in charge of defining parsers with SWORD 2.0 supported mediatypes.

 """

+import logging
 from typing import Dict, Optional, Union
 from xml.parsers.expat import ExpatError

 from django.conf import settings
 from rest_framework.parsers import BaseParser, FileUploadParser, MultiPartParser
 import xmltodict

 from swh.deposit.errors import ParserError
 from swh.model.exceptions import ValidationError
 from swh.model.identifiers import (
     DIRECTORY,
     RELEASE,
     REVISION,
     SNAPSHOT,
     SWHID,
     parse_swhid,
 )

+logger = logging.getLogger(__name__)
+

 class SWHFileUploadZipParser(FileUploadParser):
     """File upload parser limited to zip archive.

     """

     media_type = "application/zip"


 class SWHFileUploadTarParser(FileUploadParser):
     """File upload parser limited to tarball (tar, tar.gz, tar.*) archives.

     """

     media_type = "application/x-tar"


 class SWHXMLParser(BaseParser):
     """
     XML parser.
     """

     media_type = "application/xml"

     def parse(self, stream, media_type=None, parser_context=None):
         """
         Parses the incoming bytestream as XML and returns the resulting data.
         """
         parser_context = parser_context or {}
         encoding = parser_context.get("encoding", settings.DEFAULT_CHARSET)
         namespaces = {
             "http://www.w3.org/2005/Atom": None,
             "http://purl.org/dc/terms/": None,
             "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta",
             "http://purl.org/net/sword/": "sword",
             "https://www.softwareheritage.org/schema/2018/deposit": "swh",
         }

         data = xmltodict.parse(
             stream, encoding=encoding, namespaces=namespaces, process_namespaces=True
         )
         if "entry" in data:
             data = data["entry"]
         return data


 class SWHAtomEntryParser(SWHXMLParser):
     """Atom entry parser limited to specific mediatype

     """

     media_type = "application/atom+xml;type=entry"

     def parse(self, stream, media_type=None, parser_context=None):
         # We do not actually want to parse the stream yet
         # because we want to keep the raw data as well
         # this is done later in the atom entry call
         # (cf. swh.deposit.api.common.APIBase._atom_entry)
         return stream


 class SWHMultiPartParser(MultiPartParser):
     """Multipart parser limited to a subset of mediatypes.

     """

     media_type = "multipart/*; *"


 def parse_xml(raw_content):
     """Parse xml body.

     Args:
         raw_content (bytes): The content to parse

     Raises:
         ParserError in case of a malformed xml

     Returns:
         content parsed as dict.

     """
     try:
         return SWHXMLParser().parse(raw_content)
     except ExpatError as e:
         raise ParserError(str(e))


 ALLOWED_QUALIFIERS_NODE_TYPE = (SNAPSHOT, REVISION, RELEASE, DIRECTORY)


 def parse_swh_reference(metadata: Dict) -> Optional[Union[str, SWHID]]:
     """Parse swh reference within the metadata dict (or origin) reference if found,
     None otherwise.

     <swh:deposit>
       <swh:reference>
         <swh:origin url='...' />
       </swh:reference>
     </swh:deposit>

     or:

     <swh:deposit>
       <swh:reference>
         <swh:object swhid='...' />
       </swh:reference>
     </swh:deposit>

     Raises:
         ValidationError in case the swhid referenced (if any) is invalid

     Returns:
         Either swhid or origin reference if any. None otherwise.

""" # noqa + visit_swhid = None + anchor_swhid = None + swh_deposit = metadata.get("swh:deposit") if not swh_deposit: return None swh_reference = swh_deposit.get("swh:reference") if not swh_reference: return None swh_origin = swh_reference.get("swh:origin") if swh_origin: url = swh_origin.get("@url") if url: return url swh_object = swh_reference.get("swh:object") if not swh_object: return None swhid = swh_object.get("@swhid") if not swhid: return None swhid_reference = parse_swhid(swhid) if swhid_reference.metadata: anchor = swhid_reference.metadata.get("anchor") if anchor: anchor_swhid = parse_swhid(anchor) if anchor_swhid.object_type not in ALLOWED_QUALIFIERS_NODE_TYPE: error_msg = ( "anchor qualifier should be a core SWHID with type one of " f" {', '.join(ALLOWED_QUALIFIERS_NODE_TYPE)}" ) raise ValidationError(error_msg) visit = swhid_reference.metadata.get("visit") if visit: visit_swhid = parse_swhid(visit) if visit_swhid.object_type != SNAPSHOT: raise ValidationError( f"visit qualifier should be a core SWHID with type {SNAPSHOT}" ) + if ( + visit_swhid + and anchor_swhid + and visit_swhid.object_type == SNAPSHOT + and anchor_swhid.object_type == SNAPSHOT + ): + logger.warn( + "SWHID use of both anchor and visit targeting " + f"a snapshot: {swhid_reference}" + ) + raise ValidationError( + "'anchor=swh:1:snp:' is not supported when 'visit' is also provided." + ) + return swhid_reference diff --git a/swh/deposit/tests/api/test_parsers.py b/swh/deposit/tests/api/test_parsers.py index 765584ff..a72d6923 100644 --- a/swh/deposit/tests/api/test_parsers.py +++ b/swh/deposit/tests/api/test_parsers.py @@ -1,238 +1,242 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import OrderedDict import io import pytest from swh.deposit.parsers import SWHXMLParser, parse_swh_reference, parse_xml from swh.model.exceptions import ValidationError from swh.model.identifiers import parse_swhid def test_parsing_without_duplicates(): xml_no_duplicate = io.BytesIO( b""" Awesome Compiler GPL3.0 https://opensource.org/licenses/GPL-3.0 Python3 author1 Inria ocaml http://issuetracker.com """ ) actual_result = SWHXMLParser().parse(xml_no_duplicate) expected_dict = OrderedDict( [ ("title", "Awesome Compiler"), ( "codemeta:license", OrderedDict( [ ("codemeta:name", "GPL3.0"), ("codemeta:url", "https://opensource.org/licenses/GPL-3.0"), ] ), ), ("codemeta:runtimePlatform", "Python3"), ( "codemeta:author", OrderedDict( [("codemeta:name", "author1"), ("codemeta:affiliation", "Inria")] ), ), ("codemeta:programmingLanguage", "ocaml"), ("codemeta:issueTracker", "http://issuetracker.com"), ] ) assert expected_dict == actual_result def test_parsing_with_duplicates(): xml_with_duplicates = io.BytesIO( b""" Another Compiler GNU/Linux GPL3.0 https://opensource.org/licenses/GPL-3.0 Un*x author1 Inria author2 Inria ocaml haskell spdx http://spdx.org python3 """ ) actual_result = SWHXMLParser().parse(xml_with_duplicates) expected_dict = OrderedDict( [ ("title", "Another Compiler"), ("codemeta:runtimePlatform", ["GNU/Linux", "Un*x"]), ( "codemeta:license", [ OrderedDict( [ ("codemeta:name", "GPL3.0"), ("codemeta:url", "https://opensource.org/licenses/GPL-3.0"), ] ), OrderedDict( [("codemeta:name", "spdx"), ("codemeta:url", "http://spdx.org")] ), ], ), ( "codemeta:author", [ OrderedDict( [ ("codemeta:name", "author1"), 
("codemeta:affiliation", "Inria"), ] ), OrderedDict( [ ("codemeta:name", "author2"), ("codemeta:affiliation", "Inria"), ] ), ], ), ("codemeta:programmingLanguage", ["ocaml", "haskell", "python3"]), ] ) assert expected_dict == actual_result @pytest.fixture def xml_with_origin_reference(): xml_data = """ """ return xml_data.strip() def test_parse_swh_reference_origin(xml_with_origin_reference): url = "https://url" xml_data = xml_with_origin_reference.format(url=url) metadata = parse_xml(xml_data) actual_origin = parse_swh_reference(metadata) assert actual_origin == url @pytest.fixture def xml_with_empty_reference(): xml_data = """ {swh_reference} """ return xml_data.strip() @pytest.mark.parametrize( "xml_ref", [ "", "", "", """""", ], ) def test_parse_swh_reference_empty(xml_with_empty_reference, xml_ref): xml_body = xml_with_empty_reference.format(swh_reference=xml_ref) metadata = parse_xml(xml_body) assert parse_swh_reference(metadata) is None @pytest.fixture def xml_with_swhid(atom_dataset): return atom_dataset["entry-data-with-swhid"] @pytest.mark.parametrize( "swhid", [ "swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=https://hal.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:4fc1e36fca86b2070204bedd51106014a614f321;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba;path=/moranegg-AffectationRO-df7f68b/", # noqa "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:dir:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa "swh:1:rev:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa "swh:1:rel:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rel:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa "swh:1:snp:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:snp:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49", ], ) def test_parse_swh_reference_swhid(swhid, xml_with_swhid): xml_data = xml_with_swhid.format(swhid=swhid) metadata = parse_xml(xml_data) actual_swhid = parse_swh_reference(metadata) assert actual_swhid is not None expected_swhid = parse_swhid(swhid) assert actual_swhid == expected_swhid @pytest.mark.parametrize( "invalid_swhid,error_msg", [ ("swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc235", "Unexpected length"), ( "swh:1:dir:c4993c872593e960dc84e4430dbbfbc34fd706d0;visit=swh:1:rev:0175049fc45055a3824a1675ac06e3711619a55a", # noqa "visit qualifier should be a core SWHID with type", ), ( "swh:1:rev:c4993c872593e960dc84e4430dbbfbc34fd706d0;anchor=swh:1:cnt:b5f505b005435fa5c4fa4c279792bd7b17167c04;path=/", # noqa "anchor qualifier should be a core SWHID with type one of", - ), # noqa + ), + ( + "swh:1:rev:c4993c872593e960dc84e4430dbbfbc34fd706d0;visit=swh:1:snp:0175049fc45055a3824a1675ac06e3711619a55a;anchor=swh:1:snp:b5f505b005435fa5c4fa4c279792bd7b17167c04", # noqa + "anchor=swh:1:snp", + ), ], ) def test_parse_swh_reference_invalid_swhid(invalid_swhid, error_msg, xml_with_swhid): """Unparsable swhid should raise """ xml_invalid_swhid = xml_with_swhid.format(swhid=invalid_swhid) metadata = parse_xml(xml_invalid_swhid) with pytest.raises(ValidationError, match=error_msg): parse_swh_reference(metadata)