diff --git a/swh/deposit/parsers.py b/swh/deposit/parsers.py --- a/swh/deposit/parsers.py +++ b/swh/deposit/parsers.py @@ -51,6 +51,7 @@ "http://purl.org/dc/terms/": None, "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta", "http://purl.org/net/sword/": "sword", + "https://www.softwareheritage.org/schema/2018/deposit": "swh", } data = xmltodict.parse( diff --git a/swh/deposit/tests/test_utils.py b/swh/deposit/tests/test_utils.py --- a/swh/deposit/tests/test_utils.py +++ b/swh/deposit/tests/test_utils.py @@ -1,4 +1,4 @@ -# Copyright (C) 2018-2019 The Software Heritage developers +# Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -8,6 +8,9 @@ import pytest from swh.deposit import utils +from swh.deposit.parsers import parse_xml +from swh.deposit.utils import read_swh_object_reference +from swh.model.identifiers import parse_swhid def test_merge(): @@ -139,3 +142,93 @@ expected_date = "2017-01-01 00:00:00+00:00" assert str(actual_date) == expected_date + + +@pytest.fixture +def xml_with_origin_reference(): + xml_data = """ + + + + + + + + """ + return xml_data.strip() + + +def test_read_swh_reference_origin(xml_with_origin_reference): + url = "https://url" + xml_data = xml_with_origin_reference.format(url=url) + metadata = parse_xml(xml_data) + + actual_origin = read_swh_object_reference(metadata) + assert actual_origin == url + + +@pytest.fixture +def xml_with_empty_reference(): + xml_data = """ + + + {swh_reference} + + + """ + return xml_data.strip() + + +@pytest.mark.parametrize( + "xml_ref", + [ + "", + "", + "", + """""", + ], +) +def test_read_swh_reference_empty(xml_with_empty_reference, xml_ref): + xml_body = xml_with_empty_reference.format(swh_reference=xml_ref) + metadata = parse_xml(xml_body) + + assert read_swh_object_reference(metadata) is None + + 
def read_swh_object_reference(metadata: Dict) -> "Optional[Union[str, SWHID]]":
    """Return the deposit's swh reference (origin url or swhid), None otherwise.

    ``metadata`` is the dict form of a deposit's metadata document (the tests
    feed it the output of ``parse_xml``). The reference is looked up under
    ``swh:deposit`` -> ``swh:reference`` and is expected in one of two shapes.

    An origin reference::

        {"swh:deposit": {"swh:reference": {"swh:origin": {"@url": "<url>"}}}}

    or an object reference::

        {"swh:deposit": {"swh:reference": {"swh:object": {"@swhid": "<swhid>"}}}}

    When both are present, a non-empty origin url takes precedence.

    Args:
        metadata: parsed deposit metadata.

    Returns:
        The origin url as a ``str``, or the result of ``parse_swhid`` on the
        ``@swhid`` value, or None when no usable reference is present. Missing
        or empty nodes, and nodes that are not mappings, all yield None.

    Raises:
        Whatever ``parse_swhid`` raises when the ``@swhid`` value is not a
        valid identifier (NOTE(review): exception type is defined by
        swh.model; confirm against that package).

    """
    # Walk down to the <swh:reference> node. Each level must be a mapping:
    # a text-only element is parsed as a plain string and a repeated element
    # as a list, and neither supports ``.get``.
    node = metadata
    for key in ("swh:deposit", "swh:reference"):
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    if not isinstance(node, dict):
        return None
    swh_reference = node

    # Origin reference: only a non-empty url counts; otherwise fall through
    # to the object reference.
    swh_origin = swh_reference.get("swh:origin")
    if isinstance(swh_origin, dict):
        url = swh_origin.get("@url")
        if url:
            return url

    # Object reference: an absent or empty swhid means "no reference".
    swh_object = swh_reference.get("swh:object")
    if not isinstance(swh_object, dict):
        return None

    swhid = swh_object.get("@swhid")
    if not swhid:
        return None
    return parse_swhid(swhid)