diff --git a/swh/deposit/parsers.py b/swh/deposit/parsers.py
index 28c5b357..e86d65cd 100644
--- a/swh/deposit/parsers.py
+++ b/swh/deposit/parsers.py
@@ -1,207 +1,194 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module in charge of defining parsers with SWORD 2.0 supported mediatypes.
"""
import logging
from typing import Dict, Optional, Union
from xml.parsers.expat import ExpatError
from django.conf import settings
from rest_framework.parsers import BaseParser, FileUploadParser, MultiPartParser
-import xmltodict
from swh.deposit.errors import ParserError
+from swh.deposit.utils import parse_xml as _parse_xml
from swh.model.exceptions import ValidationError
from swh.model.identifiers import (
DIRECTORY,
RELEASE,
REVISION,
SNAPSHOT,
SWHID,
parse_swhid,
)
logger = logging.getLogger(__name__)
class SWHFileUploadZipParser(FileUploadParser):
"""File upload parser limited to zip archive.
"""
media_type = "application/zip"
class SWHFileUploadTarParser(FileUploadParser):
"""File upload parser limited to tarball (tar, tar.gz, tar.*) archives.
"""
media_type = "application/x-tar"
class SWHXMLParser(BaseParser):
"""
XML parser.
"""
media_type = "application/xml"
def parse(self, stream, media_type=None, parser_context=None):
"""
Parses the incoming bytestream as XML and returns the resulting data.
"""
parser_context = parser_context or {}
encoding = parser_context.get("encoding", settings.DEFAULT_CHARSET)
- namespaces = {
- "http://www.w3.org/2005/Atom": None,
- "http://purl.org/dc/terms/": None,
- "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta",
- "http://purl.org/net/sword/": "sword",
- "https://www.softwareheritage.org/schema/2018/deposit": "swh",
- }
-
- data = xmltodict.parse(
- stream, encoding=encoding, namespaces=namespaces, process_namespaces=True
- )
- if "entry" in data:
- data = data["entry"]
- return data
+ return _parse_xml(stream, encoding=encoding)
class SWHAtomEntryParser(SWHXMLParser):
"""Atom entry parser limited to specific mediatype
"""
media_type = "application/atom+xml;type=entry"
def parse(self, stream, media_type=None, parser_context=None):
# We do not actually want to parse the stream yet
# because we want to keep the raw data as well
# this is done later in the atom entry call
# (cf. swh.deposit.api.common.APIBase._atom_entry)
return stream
class SWHMultiPartParser(MultiPartParser):
"""Multipart parser limited to a subset of mediatypes.
"""
media_type = "multipart/*; *"
def parse_xml(raw_content):
"""Parse xml body.
Args:
raw_content (bytes): The content to parse
Raises:
ParserError in case of a malformed xml
Returns:
content parsed as dict.
"""
try:
return SWHXMLParser().parse(raw_content)
except ExpatError as e:
raise ParserError(str(e))
ALLOWED_QUALIFIERS_NODE_TYPE = (SNAPSHOT, REVISION, RELEASE, DIRECTORY)
def parse_swh_reference(metadata: Dict) -> Optional[Union[str, SWHID]]:
"""Parse swh reference within the metadata dict (or origin) reference if found, None
otherwise.
or:
Raises:
ValidationError in case the swhid referenced (if any) is invalid
Returns:
Either swhid or origin reference if any. None otherwise.
""" # noqa
visit_swhid = None
anchor_swhid = None
swh_deposit = metadata.get("swh:deposit")
if not swh_deposit:
return None
swh_reference = swh_deposit.get("swh:reference")
if not swh_reference:
return None
swh_origin = swh_reference.get("swh:origin")
if swh_origin:
url = swh_origin.get("@url")
if url:
return url
swh_object = swh_reference.get("swh:object")
if not swh_object:
return None
swhid = swh_object.get("@swhid")
if not swhid:
return None
swhid_reference = parse_swhid(swhid)
if swhid_reference.metadata:
anchor = swhid_reference.metadata.get("anchor")
if anchor:
anchor_swhid = parse_swhid(anchor)
if anchor_swhid.object_type not in ALLOWED_QUALIFIERS_NODE_TYPE:
error_msg = (
"anchor qualifier should be a core SWHID with type one of "
f" {', '.join(ALLOWED_QUALIFIERS_NODE_TYPE)}"
)
raise ValidationError(error_msg)
visit = swhid_reference.metadata.get("visit")
if visit:
visit_swhid = parse_swhid(visit)
if visit_swhid.object_type != SNAPSHOT:
raise ValidationError(
f"visit qualifier should be a core SWHID with type {SNAPSHOT}"
)
if (
visit_swhid
and anchor_swhid
and visit_swhid.object_type == SNAPSHOT
and anchor_swhid.object_type == SNAPSHOT
):
logger.warn(
"SWHID use of both anchor and visit targeting "
f"a snapshot: {swhid_reference}"
)
raise ValidationError(
"'anchor=swh:1:snp:' is not supported when 'visit' is also provided."
)
return swhid_reference
diff --git a/swh/deposit/utils.py b/swh/deposit/utils.py
index e306902a..04229583 100644
--- a/swh/deposit/utils.py
+++ b/swh/deposit/utils.py
@@ -1,119 +1,137 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from types import GeneratorType
from typing import Any, Dict, Tuple, Union
import iso8601
+import xmltodict
from swh.model.identifiers import SWHID, normalize_timestamp, parse_swhid
from swh.model.model import MetadataTargetType
+def parse_xml(stream, encoding="utf-8"):
+ namespaces = {
+ "http://www.w3.org/2005/Atom": None,
+ "http://purl.org/dc/terms/": None,
+ "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta",
+ "http://purl.org/net/sword/": "sword",
+ "https://www.softwareheritage.org/schema/2018/deposit": "swh",
+ }
+
+ data = xmltodict.parse(
+ stream, encoding=encoding, namespaces=namespaces, process_namespaces=True
+ )
+ if "entry" in data:
+ data = data["entry"]
+ return data
+
+
def merge(*dicts):
"""Given an iterator of dicts, merge them losing no information.
Args:
*dicts: arguments are all supposed to be dict to merge into one
Returns:
dict merged without losing information
"""
def _extend(existing_val, value):
"""Given an existing value and a value (as potential lists), merge
them together without repetition.
"""
if isinstance(value, (list, map, GeneratorType)):
vals = value
else:
vals = [value]
for v in vals:
if v in existing_val:
continue
existing_val.append(v)
return existing_val
d = {}
for data in dicts:
if not isinstance(data, dict):
raise ValueError("dicts is supposed to be a variable arguments of dict")
for key, value in data.items():
existing_val = d.get(key)
if not existing_val:
d[key] = value
continue
if isinstance(existing_val, (list, map, GeneratorType)):
new_val = _extend(existing_val, value)
elif isinstance(existing_val, dict):
if isinstance(value, dict):
new_val = merge(existing_val, value)
else:
new_val = _extend([existing_val], value)
else:
new_val = _extend([existing_val], value)
d[key] = new_val
return d
def normalize_date(date):
"""Normalize date fields as expected by swh workers.
If date is a list, elect arbitrarily the first element of that
list
If date is (then) a string, parse it through
dateutil.parser.parse to extract a datetime.
Then normalize it through
swh.model.identifiers.normalize_timestamp.
Returns
The swh date object
"""
if isinstance(date, list):
date = date[0]
if isinstance(date, str):
date = iso8601.parse_date(date)
return normalize_timestamp(date)
def compute_metadata_context(
swhid_reference: Union[SWHID, str]
) -> Tuple[MetadataTargetType, Dict[str, Any]]:
"""Given a SWHID object, determine the context as a dict.
The parse_swhid calls within are not expected to raise (because they should have
been caught early on).
"""
metadata_context: Dict[str, Any] = {"origin": None}
if isinstance(swhid_reference, SWHID):
object_type = MetadataTargetType(swhid_reference.object_type)
assert object_type != MetadataTargetType.ORIGIN
if swhid_reference.metadata:
path = swhid_reference.metadata.get("path")
metadata_context = {
"origin": swhid_reference.metadata.get("origin"),
"path": path.encode() if path else None,
}
snapshot = swhid_reference.metadata.get("visit")
if snapshot:
metadata_context["snapshot"] = parse_swhid(snapshot)
anchor = swhid_reference.metadata.get("anchor")
if anchor:
anchor_swhid = parse_swhid(anchor)
metadata_context[anchor_swhid.object_type] = anchor_swhid
else:
object_type = MetadataTargetType.ORIGIN
return object_type, metadata_context