diff --git a/swh/deposit/parsers.py b/swh/deposit/parsers.py
index e8d8a0a0..2369238d 100644
--- a/swh/deposit/parsers.py
+++ b/swh/deposit/parsers.py
@@ -1,155 +1,187 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module in charge of defining parsers with SWORD 2.0 supported mediatypes.
"""
from typing import Dict, Optional, Union
from xml.parsers.expat import ExpatError
from django.conf import settings
from rest_framework.parsers import BaseParser, FileUploadParser, MultiPartParser
import xmltodict
from swh.deposit.errors import ParserError
-from swh.model.identifiers import SWHID, parse_swhid
+from swh.model.exceptions import ValidationError
+from swh.model.identifiers import (
+ DIRECTORY,
+ RELEASE,
+ REVISION,
+ SNAPSHOT,
+ SWHID,
+ parse_swhid,
+)
class SWHFileUploadZipParser(FileUploadParser):
"""File upload parser limited to zip archive.
"""
media_type = "application/zip"
class SWHFileUploadTarParser(FileUploadParser):
"""File upload parser limited to tarball (tar, tar.gz, tar.*) archives.
"""
media_type = "application/x-tar"
class SWHXMLParser(BaseParser):
"""
XML parser.
"""
media_type = "application/xml"
def parse(self, stream, media_type=None, parser_context=None):
"""
Parses the incoming bytestream as XML and returns the resulting data.
"""
parser_context = parser_context or {}
encoding = parser_context.get("encoding", settings.DEFAULT_CHARSET)
namespaces = {
"http://www.w3.org/2005/Atom": None,
"http://purl.org/dc/terms/": None,
"https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta",
"http://purl.org/net/sword/": "sword",
"https://www.softwareheritage.org/schema/2018/deposit": "swh",
}
data = xmltodict.parse(
stream, encoding=encoding, namespaces=namespaces, process_namespaces=True
)
if "entry" in data:
data = data["entry"]
return data
class SWHAtomEntryParser(SWHXMLParser):
"""Atom entry parser limited to specific mediatype
"""
media_type = "application/atom+xml;type=entry"
def parse(self, stream, media_type=None, parser_context=None):
# We do not actually want to parse the stream yet
# because we want to keep the raw data as well
# this is done later in the atom entry call
# (cf. swh.deposit.api.common.APIBase._atom_entry)
return stream
class SWHMultiPartParser(MultiPartParser):
"""Multipart parser limited to a subset of mediatypes.
"""
media_type = "multipart/*; *"
def parse_xml(raw_content):
"""Parse xml body.
Args:
raw_content (bytes): The content to parse
Raises:
ParserError in case of a malformed xml
Returns:
content parsed as dict.
"""
try:
return SWHXMLParser().parse(raw_content)
except ExpatError as e:
raise ParserError(str(e))
+ALLOWED_QUALIFIERS_NODE_TYPE = (SNAPSHOT, REVISION, RELEASE, DIRECTORY)
+
+
def parse_swh_reference(metadata: Dict) -> Optional[Union[str, SWHID]]:
"""Parse swh reference within the metadata dict (or origin) reference if found, None
otherwise.
or:
Raises:
ValidationError in case the swhid referenced (if any) is invalid
Returns:
Either swhid or origin reference if any. None otherwise.
""" # noqa
swh_deposit = metadata.get("swh:deposit")
if not swh_deposit:
return None
swh_reference = swh_deposit.get("swh:reference")
if not swh_reference:
return None
swh_origin = swh_reference.get("swh:origin")
if swh_origin:
url = swh_origin.get("@url")
if url:
return url
swh_object = swh_reference.get("swh:object")
if not swh_object:
return None
swhid = swh_object.get("@swhid")
if not swhid:
return None
- return parse_swhid(swhid)
+ swhid_reference = parse_swhid(swhid)
+
+ if swhid_reference.metadata:
+ anchor = swhid_reference.metadata.get("anchor")
+ if anchor:
+ anchor_swhid = parse_swhid(anchor)
+ if anchor_swhid.object_type not in ALLOWED_QUALIFIERS_NODE_TYPE:
+                error_msg = (
+                    "anchor qualifier should be a core SWHID with type one of "
+                    f"{', '.join(ALLOWED_QUALIFIERS_NODE_TYPE)}"
+                )
+ raise ValidationError(error_msg)
+
+ visit = swhid_reference.metadata.get("visit")
+ if visit:
+ visit_swhid = parse_swhid(visit)
+ if visit_swhid.object_type != SNAPSHOT:
+ raise ValidationError(
+ f"visit qualifier should be a core SWHID with type {SNAPSHOT}"
+ )
+
+ return swhid_reference
diff --git a/swh/deposit/tests/api/test_parsers.py b/swh/deposit/tests/api/test_parsers.py
index 1806139c..374b2c5f 100644
--- a/swh/deposit/tests/api/test_parsers.py
+++ b/swh/deposit/tests/api/test_parsers.py
@@ -1,232 +1,249 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import OrderedDict
import io
import pytest
from swh.deposit.parsers import SWHXMLParser, parse_swh_reference, parse_xml
from swh.model.exceptions import ValidationError
from swh.model.identifiers import parse_swhid
def test_parsing_without_duplicates():
xml_no_duplicate = io.BytesIO(
b"""
Awesome Compiler
GPL3.0
https://opensource.org/licenses/GPL-3.0
Python3
author1
Inria
ocaml
http://issuetracker.com
"""
)
actual_result = SWHXMLParser().parse(xml_no_duplicate)
expected_dict = OrderedDict(
[
("title", "Awesome Compiler"),
(
"codemeta:license",
OrderedDict(
[
("codemeta:name", "GPL3.0"),
("codemeta:url", "https://opensource.org/licenses/GPL-3.0"),
]
),
),
("codemeta:runtimePlatform", "Python3"),
(
"codemeta:author",
OrderedDict(
[("codemeta:name", "author1"), ("codemeta:affiliation", "Inria")]
),
),
("codemeta:programmingLanguage", "ocaml"),
("codemeta:issueTracker", "http://issuetracker.com"),
]
)
assert expected_dict == actual_result
def test_parsing_with_duplicates():
xml_with_duplicates = io.BytesIO(
b"""
Another Compiler
GNU/Linux
GPL3.0
https://opensource.org/licenses/GPL-3.0
Un*x
author1
Inria
author2
Inria
ocaml
haskell
spdx
http://spdx.org
python3
"""
)
actual_result = SWHXMLParser().parse(xml_with_duplicates)
expected_dict = OrderedDict(
[
("title", "Another Compiler"),
("codemeta:runtimePlatform", ["GNU/Linux", "Un*x"]),
(
"codemeta:license",
[
OrderedDict(
[
("codemeta:name", "GPL3.0"),
("codemeta:url", "https://opensource.org/licenses/GPL-3.0"),
]
),
OrderedDict(
[("codemeta:name", "spdx"), ("codemeta:url", "http://spdx.org")]
),
],
),
(
"codemeta:author",
[
OrderedDict(
[
("codemeta:name", "author1"),
("codemeta:affiliation", "Inria"),
]
),
OrderedDict(
[
("codemeta:name", "author2"),
("codemeta:affiliation", "Inria"),
]
),
],
),
("codemeta:programmingLanguage", ["ocaml", "haskell", "python3"]),
]
)
assert expected_dict == actual_result
@pytest.fixture
def xml_with_origin_reference():
xml_data = """
"""
return xml_data.strip()
def test_parse_swh_reference_origin(xml_with_origin_reference):
url = "https://url"
xml_data = xml_with_origin_reference.format(url=url)
metadata = parse_xml(xml_data)
actual_origin = parse_swh_reference(metadata)
assert actual_origin == url
@pytest.fixture
def xml_with_empty_reference():
xml_data = """
{swh_reference}
"""
return xml_data.strip()
@pytest.mark.parametrize(
"xml_ref",
[
"",
"",
"",
"""""",
],
)
def test_parse_swh_reference_empty(xml_with_empty_reference, xml_ref):
xml_body = xml_with_empty_reference.format(swh_reference=xml_ref)
metadata = parse_xml(xml_body)
assert parse_swh_reference(metadata) is None
@pytest.fixture
def xml_with_swhid():
xml_data = """
"""
return xml_data.strip()
@pytest.mark.parametrize(
"swhid",
[
- "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=https://hal.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:4fc1e36fca86b2070204bedd51106014a614f321;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba;path=/moranegg-AffectationRO-df7f68b/", # noqa
+ "swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=https://hal.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:4fc1e36fca86b2070204bedd51106014a614f321;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba;path=/moranegg-AffectationRO-df7f68b/", # noqa
+ "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:dir:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa
+ "swh:1:rev:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa
+ "swh:1:rel:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rel:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa
+ "swh:1:snp:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:snp:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa
"swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49",
],
)
def test_parse_swh_reference_swhid(swhid, xml_with_swhid):
xml_data = xml_with_swhid.format(swhid=swhid)
metadata = parse_xml(xml_data)
actual_swhid = parse_swh_reference(metadata)
assert actual_swhid is not None
expected_swhid = parse_swhid(swhid)
assert actual_swhid == expected_swhid
-def test_parse_swh_reference_invalid_swhid(xml_with_swhid):
+@pytest.mark.parametrize(
+ "invalid_swhid,error_msg",
+ [
+ ("swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc235", "Unexpected length"),
+ (
+ "swh:1:dir:c4993c872593e960dc84e4430dbbfbc34fd706d0;visit=swh:1:rev:0175049fc45055a3824a1675ac06e3711619a55a", # noqa
+ "visit qualifier should be a core SWHID with type",
+ ),
+ (
+ "swh:1:rev:c4993c872593e960dc84e4430dbbfbc34fd706d0;anchor=swh:1:cnt:b5f505b005435fa5c4fa4c279792bd7b17167c04;path=/", # noqa
+ "anchor qualifier should be a core SWHID with type one of",
+        ),
+ ],
+)
+def test_parse_swh_reference_invalid_swhid(invalid_swhid, error_msg, xml_with_swhid):
"""Unparsable swhid should raise
"""
- invalid_swhid = "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc235"
xml_invalid_swhid = xml_with_swhid.format(swhid=invalid_swhid)
metadata = parse_xml(xml_invalid_swhid)
- with pytest.raises(ValidationError, match="Unexpected length"):
+ with pytest.raises(ValidationError, match=error_msg):
parse_swh_reference(metadata)