diff --git a/swh/deposit/parsers.py b/swh/deposit/parsers.py index 28c5b357..e86d65cd 100644 --- a/swh/deposit/parsers.py +++ b/swh/deposit/parsers.py @@ -1,207 +1,194 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module in charge of defining parsers with SWORD 2.0 supported mediatypes. """ import logging from typing import Dict, Optional, Union from xml.parsers.expat import ExpatError from django.conf import settings from rest_framework.parsers import BaseParser, FileUploadParser, MultiPartParser -import xmltodict from swh.deposit.errors import ParserError +from swh.deposit.utils import parse_xml as _parse_xml from swh.model.exceptions import ValidationError from swh.model.identifiers import ( DIRECTORY, RELEASE, REVISION, SNAPSHOT, SWHID, parse_swhid, ) logger = logging.getLogger(__name__) class SWHFileUploadZipParser(FileUploadParser): """File upload parser limited to zip archive. """ media_type = "application/zip" class SWHFileUploadTarParser(FileUploadParser): """File upload parser limited to tarball (tar, tar.gz, tar.*) archives. """ media_type = "application/x-tar" class SWHXMLParser(BaseParser): """ XML parser. """ media_type = "application/xml" def parse(self, stream, media_type=None, parser_context=None): """ Parses the incoming bytestream as XML and returns the resulting data. """ parser_context = parser_context or {} encoding = parser_context.get("encoding", settings.DEFAULT_CHARSET) - namespaces = { - "http://www.w3.org/2005/Atom": None, - "http://purl.org/dc/terms/": None, - "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta", - "http://purl.org/net/sword/": "sword", - "https://www.softwareheritage.org/schema/2018/deposit": "swh", - } - - data = xmltodict.parse( - stream, encoding=encoding, namespaces=namespaces, process_namespaces=True - ) - if "entry" in data: - data = data["entry"] - return data + return _parse_xml(stream, encoding=encoding) class SWHAtomEntryParser(SWHXMLParser): """Atom entry parser limited to specific mediatype """ media_type = "application/atom+xml;type=entry" def parse(self, stream, media_type=None, parser_context=None): # We do not actually want to parse the stream yet # because we want to keep the raw data as well # this is done later in the atom entry call # (cf. swh.deposit.api.common.APIBase._atom_entry) return stream class SWHMultiPartParser(MultiPartParser): """Multipart parser limited to a subset of mediatypes. """ media_type = "multipart/*; *" def parse_xml(raw_content): """Parse xml body. Args: raw_content (bytes): The content to parse Raises: ParserError in case of a malformed xml Returns: content parsed as dict. """ try: return SWHXMLParser().parse(raw_content) except ExpatError as e: raise ParserError(str(e)) ALLOWED_QUALIFIERS_NODE_TYPE = (SNAPSHOT, REVISION, RELEASE, DIRECTORY) def parse_swh_reference(metadata: Dict) -> Optional[Union[str, SWHID]]: """Parse swh reference within the metadata dict (or origin) reference if found, None otherwise. or: Raises: ValidationError in case the swhid referenced (if any) is invalid Returns: Either swhid or origin reference if any. None otherwise. """ # noqa visit_swhid = None anchor_swhid = None swh_deposit = metadata.get("swh:deposit") if not swh_deposit: return None swh_reference = swh_deposit.get("swh:reference") if not swh_reference: return None swh_origin = swh_reference.get("swh:origin") if swh_origin: url = swh_origin.get("@url") if url: return url swh_object = swh_reference.get("swh:object") if not swh_object: return None swhid = swh_object.get("@swhid") if not swhid: return None swhid_reference = parse_swhid(swhid) if swhid_reference.metadata: anchor = swhid_reference.metadata.get("anchor") if anchor: anchor_swhid = parse_swhid(anchor) if anchor_swhid.object_type not in ALLOWED_QUALIFIERS_NODE_TYPE: error_msg = ( "anchor qualifier should be a core SWHID with type one of " f" {', '.join(ALLOWED_QUALIFIERS_NODE_TYPE)}" ) raise ValidationError(error_msg) visit = swhid_reference.metadata.get("visit") if visit: visit_swhid = parse_swhid(visit) if visit_swhid.object_type != SNAPSHOT: raise ValidationError( f"visit qualifier should be a core SWHID with type {SNAPSHOT}" ) if ( visit_swhid and anchor_swhid and visit_swhid.object_type == SNAPSHOT and anchor_swhid.object_type == SNAPSHOT ): logger.warn( "SWHID use of both anchor and visit targeting " f"a snapshot: {swhid_reference}" ) raise ValidationError( "'anchor=swh:1:snp:' is not supported when 'visit' is also provided." ) return swhid_reference diff --git a/swh/deposit/utils.py b/swh/deposit/utils.py index e306902a..04229583 100644 --- a/swh/deposit/utils.py +++ b/swh/deposit/utils.py @@ -1,119 +1,137 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from types import GeneratorType from typing import Any, Dict, Tuple, Union import iso8601 +import xmltodict from swh.model.identifiers import SWHID, normalize_timestamp, parse_swhid from swh.model.model import MetadataTargetType +def parse_xml(stream, encoding="utf-8"): + namespaces = { + "http://www.w3.org/2005/Atom": None, + "http://purl.org/dc/terms/": None, + "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta", + "http://purl.org/net/sword/": "sword", + "https://www.softwareheritage.org/schema/2018/deposit": "swh", + } + + data = xmltodict.parse( + stream, encoding=encoding, namespaces=namespaces, process_namespaces=True + ) + if "entry" in data: + data = data["entry"] + return data + + def merge(*dicts): """Given an iterator of dicts, merge them losing no information. Args: *dicts: arguments are all supposed to be dict to merge into one Returns: dict merged without losing information """ def _extend(existing_val, value): """Given an existing value and a value (as potential lists), merge them together without repetition. """ if isinstance(value, (list, map, GeneratorType)): vals = value else: vals = [value] for v in vals: if v in existing_val: continue existing_val.append(v) return existing_val d = {} for data in dicts: if not isinstance(data, dict): raise ValueError("dicts is supposed to be a variable arguments of dict") for key, value in data.items(): existing_val = d.get(key) if not existing_val: d[key] = value continue if isinstance(existing_val, (list, map, GeneratorType)): new_val = _extend(existing_val, value) elif isinstance(existing_val, dict): if isinstance(value, dict): new_val = merge(existing_val, value) else: new_val = _extend([existing_val], value) else: new_val = _extend([existing_val], value) d[key] = new_val return d def normalize_date(date): """Normalize date fields as expected by swh workers. If date is a list, elect arbitrarily the first element of that list If date is (then) a string, parse it through dateutil.parser.parse to extract a datetime. Then normalize it through swh.model.identifiers.normalize_timestamp. Returns The swh date object """ if isinstance(date, list): date = date[0] if isinstance(date, str): date = iso8601.parse_date(date) return normalize_timestamp(date) def compute_metadata_context( swhid_reference: Union[SWHID, str] ) -> Tuple[MetadataTargetType, Dict[str, Any]]: """Given a SWHID object, determine the context as a dict. The parse_swhid calls within are not expected to raise (because they should have been caught early on). """ metadata_context: Dict[str, Any] = {"origin": None} if isinstance(swhid_reference, SWHID): object_type = MetadataTargetType(swhid_reference.object_type) assert object_type != MetadataTargetType.ORIGIN if swhid_reference.metadata: path = swhid_reference.metadata.get("path") metadata_context = { "origin": swhid_reference.metadata.get("origin"), "path": path.encode() if path else None, } snapshot = swhid_reference.metadata.get("visit") if snapshot: metadata_context["snapshot"] = parse_swhid(snapshot) anchor = swhid_reference.metadata.get("anchor") if anchor: anchor_swhid = parse_swhid(anchor) metadata_context[anchor_swhid.object_type] = anchor_swhid else: object_type = MetadataTargetType.ORIGIN return object_type, metadata_context