diff --git a/swh/deposit/parsers.py b/swh/deposit/parsers.py
index 2369238d..28c5b357 100644
--- a/swh/deposit/parsers.py
+++ b/swh/deposit/parsers.py
@@ -1,187 +1,207 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module in charge of defining parsers with SWORD 2.0 supported mediatypes.
"""
+import logging
from typing import Dict, Optional, Union
from xml.parsers.expat import ExpatError
from django.conf import settings
from rest_framework.parsers import BaseParser, FileUploadParser, MultiPartParser
import xmltodict
from swh.deposit.errors import ParserError
from swh.model.exceptions import ValidationError
from swh.model.identifiers import (
DIRECTORY,
RELEASE,
REVISION,
SNAPSHOT,
SWHID,
parse_swhid,
)
+logger = logging.getLogger(__name__)
+
class SWHFileUploadZipParser(FileUploadParser):
    """File-upload parser accepting only zip archives."""

    media_type = "application/zip"
class SWHFileUploadTarParser(FileUploadParser):
    """File-upload parser accepting only tarballs (tar, tar.gz, tar.*)."""

    media_type = "application/x-tar"
class SWHXMLParser(BaseParser):
    """Generic XML body parser."""

    media_type = "application/xml"

    def parse(self, stream, media_type=None, parser_context=None):
        """Parse the incoming bytestream as XML and return the resulting dict.

        Namespace URIs are rewritten according to a fixed prefix mapping
        (``None`` drops the prefix entirely), and a top-level Atom ``entry``
        wrapper, when present, is unwrapped.
        """
        context = parser_context or {}
        charset = context.get("encoding", settings.DEFAULT_CHARSET)
        # Full namespace URI -> short prefix used in the parsed dict keys.
        prefix_map = {
            "http://www.w3.org/2005/Atom": None,
            "http://purl.org/dc/terms/": None,
            "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta",
            "http://purl.org/net/sword/": "sword",
            "https://www.softwareheritage.org/schema/2018/deposit": "swh",
        }
        parsed = xmltodict.parse(
            stream,
            encoding=charset,
            namespaces=prefix_map,
            process_namespaces=True,
        )
        return parsed["entry"] if "entry" in parsed else parsed
class SWHAtomEntryParser(SWHXMLParser):
    """Atom entry parser limited to a specific mediatype."""

    media_type = "application/atom+xml;type=entry"

    def parse(self, stream, media_type=None, parser_context=None):
        # Deliberately return the raw stream unparsed: the raw payload must
        # be kept around, and parsing happens later during the atom entry
        # call (cf. swh.deposit.api.common.APIBase._atom_entry).
        return stream
class SWHMultiPartParser(MultiPartParser):
    """Multipart parser restricted to a subset of mediatypes."""

    media_type = "multipart/*; *"
def parse_xml(raw_content):
    """Parse an xml body.

    Args:
        raw_content (bytes): The content to parse

    Raises:
        ParserError: in case of a malformed xml

    Returns:
        content parsed as dict.

    """
    try:
        return SWHXMLParser().parse(raw_content)
    except ExpatError as e:
        # Chain the original expat error so the full parsing context is
        # preserved in tracebacks (PEP 3134).
        raise ParserError(str(e)) from e
# Core SWHID object types accepted as the target of an "anchor" qualifier
# in a swh:reference (enforced by parse_swh_reference below).
ALLOWED_QUALIFIERS_NODE_TYPE = (SNAPSHOT, REVISION, RELEASE, DIRECTORY)
def parse_swh_reference(metadata: Dict) -> Optional[Union[str, SWHID]]:
    """Parse the swh reference within the metadata dict, if any.

    Args:
        metadata: result of parsing the swh:deposit xml metadata

    Raises:
        ValidationError: in case the swhid referenced (if any) is invalid:
            malformed swhid, anchor qualifier whose type is not one of
            ALLOWED_QUALIFIERS_NODE_TYPE, visit qualifier that is not a
            snapshot, or both anchor and visit qualifiers targeting a
            snapshot.

    Returns:
        Either the swhid or the origin reference if any. None otherwise.

    """  # noqa
    visit_swhid = None
    anchor_swhid = None

    swh_deposit = metadata.get("swh:deposit")
    if not swh_deposit:
        return None

    swh_reference = swh_deposit.get("swh:reference")
    if not swh_reference:
        return None

    # An origin reference takes precedence: return its url directly.
    swh_origin = swh_reference.get("swh:origin")
    if swh_origin:
        url = swh_origin.get("@url")
        if url:
            return url

    swh_object = swh_reference.get("swh:object")
    if not swh_object:
        return None

    swhid = swh_object.get("@swhid")
    if not swhid:
        return None
    swhid_reference = parse_swhid(swhid)

    if swhid_reference.metadata:
        anchor = swhid_reference.metadata.get("anchor")
        if anchor:
            anchor_swhid = parse_swhid(anchor)
            if anchor_swhid.object_type not in ALLOWED_QUALIFIERS_NODE_TYPE:
                error_msg = (
                    "anchor qualifier should be a core SWHID with type one of "
                    f" {', '.join(ALLOWED_QUALIFIERS_NODE_TYPE)}"
                )
                raise ValidationError(error_msg)

        visit = swhid_reference.metadata.get("visit")
        if visit:
            visit_swhid = parse_swhid(visit)
            if visit_swhid.object_type != SNAPSHOT:
                raise ValidationError(
                    f"visit qualifier should be a core SWHID with type {SNAPSHOT}"
                )

    # Reject the ambiguous combination of a snapshot anchor together with a
    # (necessarily snapshot) visit qualifier.
    if (
        visit_swhid
        and anchor_swhid
        and visit_swhid.object_type == SNAPSHOT
        and anchor_swhid.object_type == SNAPSHOT
    ):
        # logger.warn is a deprecated alias; use logger.warning.
        logger.warning(
            "SWHID use of both anchor and visit targeting "
            f"a snapshot: {swhid_reference}"
        )
        raise ValidationError(
            "'anchor=swh:1:snp:' is not supported when 'visit' is also provided."
        )

    return swhid_reference
diff --git a/swh/deposit/tests/api/test_parsers.py b/swh/deposit/tests/api/test_parsers.py
index 765584ff..a72d6923 100644
--- a/swh/deposit/tests/api/test_parsers.py
+++ b/swh/deposit/tests/api/test_parsers.py
@@ -1,238 +1,242 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import OrderedDict
import io
import pytest
from swh.deposit.parsers import SWHXMLParser, parse_swh_reference, parse_xml
from swh.model.exceptions import ValidationError
from swh.model.identifiers import parse_swhid
# Checks that SWHXMLParser flattens an Atom/codemeta entry without repeated
# elements into a flat OrderedDict (Atom elements unprefixed, codemeta
# elements keyed with the "codemeta:" prefix).
# NOTE(review): the XML markup of this fixture appears stripped by
# extraction (opening/closing tags are missing) — confirm the original
# bytes against version control.
def test_parsing_without_duplicates():
xml_no_duplicate = io.BytesIO(
b"""
Awesome Compiler
GPL3.0
https://opensource.org/licenses/GPL-3.0
Python3
author1
Inria
ocaml
http://issuetracker.com
"""
)
actual_result = SWHXMLParser().parse(xml_no_duplicate)
expected_dict = OrderedDict(
[
("title", "Awesome Compiler"),
(
"codemeta:license",
OrderedDict(
[
("codemeta:name", "GPL3.0"),
("codemeta:url", "https://opensource.org/licenses/GPL-3.0"),
]
),
),
("codemeta:runtimePlatform", "Python3"),
(
"codemeta:author",
OrderedDict(
[("codemeta:name", "author1"), ("codemeta:affiliation", "Inria")]
),
),
("codemeta:programmingLanguage", "ocaml"),
("codemeta:issueTracker", "http://issuetracker.com"),
]
)
assert expected_dict == actual_result
# Checks that SWHXMLParser groups repeated elements into lists while
# preserving document order (e.g. two licenses, two authors, three
# programming languages).
# NOTE(review): the XML markup of this fixture appears stripped by
# extraction (opening/closing tags are missing) — confirm the original
# bytes against version control.
def test_parsing_with_duplicates():
xml_with_duplicates = io.BytesIO(
b"""
Another Compiler
GNU/Linux
GPL3.0
https://opensource.org/licenses/GPL-3.0
Un*x
author1
Inria
author2
Inria
ocaml
haskell
spdx
http://spdx.org
python3
"""
)
actual_result = SWHXMLParser().parse(xml_with_duplicates)
expected_dict = OrderedDict(
[
("title", "Another Compiler"),
("codemeta:runtimePlatform", ["GNU/Linux", "Un*x"]),
(
"codemeta:license",
[
OrderedDict(
[
("codemeta:name", "GPL3.0"),
("codemeta:url", "https://opensource.org/licenses/GPL-3.0"),
]
),
OrderedDict(
[("codemeta:name", "spdx"), ("codemeta:url", "http://spdx.org")]
),
],
),
(
"codemeta:author",
[
OrderedDict(
[
("codemeta:name", "author1"),
("codemeta:affiliation", "Inria"),
]
),
OrderedDict(
[
("codemeta:name", "author2"),
("codemeta:affiliation", "Inria"),
]
),
],
),
("codemeta:programmingLanguage", ["ocaml", "haskell", "python3"]),
]
)
assert expected_dict == actual_result
@pytest.fixture
# Atom entry template with a swh:origin reference; formatted with a
# ``url`` keyword by the tests using it.
# NOTE(review): the template's XML markup appears stripped by extraction
# (the body is empty) — confirm against version control.
def xml_with_origin_reference():
xml_data = """
"""
return xml_data.strip()
def test_parse_swh_reference_origin(xml_with_origin_reference):
    """An origin reference in the metadata is returned as its raw url."""
    origin_url = "https://url"
    body = xml_with_origin_reference.format(url=origin_url)
    parsed_metadata = parse_xml(body)
    assert parse_swh_reference(parsed_metadata) == origin_url
@pytest.fixture
# Atom entry template taking a ``swh_reference`` placeholder.
# NOTE(review): the surrounding XML markup appears stripped by extraction —
# confirm against version control.
def xml_with_empty_reference():
xml_data = """
{swh_reference}
"""
return xml_data.strip()
@pytest.mark.parametrize(
"xml_ref",
[
"",
"",
"",
"""""",
],
)
# Any empty or attribute-less swh:reference must yield None.
# NOTE(review): the parametrized XML snippets appear stripped by extraction
# (several are empty strings) — confirm the original markup against
# version control.
def test_parse_swh_reference_empty(xml_with_empty_reference, xml_ref):
xml_body = xml_with_empty_reference.format(swh_reference=xml_ref)
metadata = parse_xml(xml_body)
assert parse_swh_reference(metadata) is None
@pytest.fixture
def xml_with_swhid(atom_dataset):
    """Atom entry template formatted with a ``swhid`` keyword by its users."""
    return atom_dataset["entry-data-with-swhid"]
@pytest.mark.parametrize(
    "swhid",
    [
        "swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=https://hal.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:4fc1e36fca86b2070204bedd51106014a614f321;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba;path=/moranegg-AffectationRO-df7f68b/",  # noqa
        "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:dir:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
        "swh:1:rev:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
        "swh:1:rel:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rel:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
        "swh:1:snp:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:snp:9c5de20cfb54682370a398fcc733e829903c8cba",  # noqa
        "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49",
    ],
)
def test_parse_swh_reference_swhid(swhid, xml_with_swhid):
    """A valid swh:object reference parses into the corresponding SWHID."""
    body = xml_with_swhid.format(swhid=swhid)
    parsed_metadata = parse_xml(body)
    result = parse_swh_reference(parsed_metadata)
    assert result is not None
    assert result == parse_swhid(swhid)
@pytest.mark.parametrize(
    "invalid_swhid,error_msg",
    [
        ("swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc235", "Unexpected length"),
        (
            "swh:1:dir:c4993c872593e960dc84e4430dbbfbc34fd706d0;visit=swh:1:rev:0175049fc45055a3824a1675ac06e3711619a55a",  # noqa
            "visit qualifier should be a core SWHID with type",
        ),
        (
            "swh:1:rev:c4993c872593e960dc84e4430dbbfbc34fd706d0;anchor=swh:1:cnt:b5f505b005435fa5c4fa4c279792bd7b17167c04;path=/",  # noqa
            "anchor qualifier should be a core SWHID with type one of",
        ),
        (
            "swh:1:rev:c4993c872593e960dc84e4430dbbfbc34fd706d0;visit=swh:1:snp:0175049fc45055a3824a1675ac06e3711619a55a;anchor=swh:1:snp:b5f505b005435fa5c4fa4c279792bd7b17167c04",  # noqa
            "anchor=swh:1:snp",
        ),
    ],
)
def test_parse_swh_reference_invalid_swhid(invalid_swhid, error_msg, xml_with_swhid):
    """Unparsable or unsupported swhids must raise a ValidationError."""
    body = xml_with_swhid.format(swhid=invalid_swhid)
    parsed_metadata = parse_xml(body)
    with pytest.raises(ValidationError, match=error_msg):
        parse_swh_reference(parsed_metadata)