diff --git a/swh/deposit/tests/data/atom/entry-data-with-metadata-provenance.xml b/swh/deposit/tests/data/atom/entry-data-with-metadata-provenance.xml index 91cd6039..86ccd1d7 100644 --- a/swh/deposit/tests/data/atom/entry-data-with-metadata-provenance.xml +++ b/swh/deposit/tests/data/atom/entry-data-with-metadata-provenance.xml @@ -1,13 +1,13 @@ Awesome stuff urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a ssedud {url} diff --git a/swh/deposit/tests/test_utils.py b/swh/deposit/tests/test_utils.py index d72fefad..e59820f3 100644 --- a/swh/deposit/tests/test_utils.py +++ b/swh/deposit/tests/test_utils.py @@ -1,217 +1,217 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.deposit import utils from swh.deposit.parsers import parse_xml from swh.model.exceptions import ValidationError from swh.model.swhids import CoreSWHID, QualifiedSWHID @pytest.fixture def xml_with_origin_reference(): xml_data = """ """ return xml_data.strip() def test_normalize_date_0(): """When date is a list, choose the first date and normalize it """ actual_date = utils.normalize_date(["2017-10-12", "date1"]) assert actual_date == { "timestamp": {"microseconds": 0, "seconds": 1507766400}, "offset": 0, } def test_normalize_date_1(): """Providing a date in a reasonable format, everything is fine """ actual_date = utils.normalize_date("2018-06-11 17:02:02") assert actual_date == { "timestamp": {"microseconds": 0, "seconds": 1528736522}, "offset": 0, } def test_normalize_date_doing_irrelevant_stuff(): """Providing a date with only the year results in a reasonable date """ actual_date = utils.normalize_date("2017") assert actual_date == { "timestamp": {"seconds": 1483228800, "microseconds": 0}, "offset": 0, } @pytest.mark.parametrize( "swhid,expected_metadata_context", [ ("swh:1:cnt:51b5c8cc985d190b5a7ef4878128ebfdc2358f49", {"origin": None},), ( "swh:1:snp:51b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=http://blah", {"origin": "http://blah", "path": None}, ), ( "swh:1:dir:51b5c8cc985d190b5a7ef4878128ebfdc2358f49;path=/path", {"origin": None, "path": b"/path"}, ), ( "swh:1:rev:51b5c8cc985d190b5a7ef4878128ebfdc2358f49;visit=swh:1:snp:41b5c8cc985d190b5a7ef4878128ebfdc2358f49", # noqa { "origin": None, "path": None, "snapshot": CoreSWHID.from_string( "swh:1:snp:41b5c8cc985d190b5a7ef4878128ebfdc2358f49" ), }, ), ( "swh:1:rel:51b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:dir:41b5c8cc985d190b5a7ef4878128ebfdc2358f49", # noqa { "origin": None, "path": None, "directory": CoreSWHID.from_string( "swh:1:dir:41b5c8cc985d190b5a7ef4878128ebfdc2358f49" ), }, ), ], ) def test_compute_metadata_context(swhid: str, expected_metadata_context): assert expected_metadata_context == utils.compute_metadata_context( QualifiedSWHID.from_string(swhid) ) def test_parse_swh_reference_origin(xml_with_origin_reference): url = "https://url" xml_data = xml_with_origin_reference.format(url=url) metadata = parse_xml(xml_data) actual_origin = utils.parse_swh_reference(metadata) assert actual_origin == url @pytest.fixture def xml_swh_deposit_template(): xml_data = """ + xmlns:schema="http://schema.org/"> {swh_deposit} """ return xml_data.strip() @pytest.mark.parametrize( "xml_ref", [ "", "", "", """""", ], ) def test_parse_swh_reference_empty(xml_swh_deposit_template, xml_ref): xml_body = xml_swh_deposit_template.format(swh_deposit=xml_ref) metadata = utils.parse_xml(xml_body) assert utils.parse_swh_reference(metadata) is None @pytest.fixture def xml_with_swhid(atom_dataset): return atom_dataset["entry-data-with-swhid"] @pytest.mark.parametrize( "swhid", [ "swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=https://hal.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:4fc1e36fca86b2070204bedd51106014a614f321;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba;path=/moranegg-AffectationRO-df7f68b/", # noqa "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:dir:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa "swh:1:rev:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa "swh:1:rel:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rel:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa "swh:1:snp:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:snp:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49", ], ) def test_parse_swh_reference_swhid(swhid, xml_with_swhid): xml_data = xml_with_swhid.format(swhid=swhid) metadata = utils.parse_xml(xml_data) actual_swhid = utils.parse_swh_reference(metadata) assert actual_swhid is not None expected_swhid = QualifiedSWHID.from_string(swhid) assert actual_swhid == expected_swhid @pytest.mark.parametrize( "invalid_swhid", [ # incorrect length "swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc235" # noqa # visit qualifier should be a core SWHID with type, "swh:1:dir:c4993c872593e960dc84e4430dbbfbc34fd706d0;visit=swh:1:rev:0175049fc45055a3824a1675ac06e3711619a55a", # noqa # anchor qualifier should be a core SWHID with type one of "swh:1:rev:c4993c872593e960dc84e4430dbbfbc34fd706d0;anchor=swh:1:cnt:b5f505b005435fa5c4fa4c279792bd7b17167c04;path=/", # noqa "swh:1:rev:c4993c872593e960dc84e4430dbbfbc34fd706d0;visit=swh:1:snp:0175049fc45055a3824a1675ac06e3711619a55a;anchor=swh:1:snp:b5f505b005435fa5c4fa4c279792bd7b17167c04", # noqa ], ) def test_parse_swh_reference_invalid_swhid(invalid_swhid, xml_with_swhid): """Unparsable swhid should raise """ xml_invalid_swhid = xml_with_swhid.format(swhid=invalid_swhid) metadata = utils.parse_xml(xml_invalid_swhid) with pytest.raises(ValidationError): utils.parse_swh_reference(metadata) @pytest.mark.parametrize( "xml_ref", [ "", "", "", ], ) def test_parse_swh_metatada_provenance_empty(xml_swh_deposit_template, xml_ref): xml_body = xml_swh_deposit_template.format(swh_deposit=xml_ref) metadata = utils.parse_xml(xml_body) assert utils.parse_swh_metadata_provenance(metadata) is None @pytest.fixture def xml_with_metadata_provenance(atom_dataset): return atom_dataset["entry-data-with-metadata-provenance"] def test_parse_swh_metadata_provenance2(xml_with_metadata_provenance): xml_data = xml_with_metadata_provenance.format(url="https://url.org/metadata/url") metadata = utils.parse_xml(xml_data) actual_url = utils.parse_swh_metadata_provenance(metadata) assert actual_url == "https://url.org/metadata/url" diff --git a/swh/deposit/utils.py b/swh/deposit/utils.py index 1adb7258..c8e2ce26 100644 --- a/swh/deposit/utils.py +++ b/swh/deposit/utils.py @@ -1,241 +1,241 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from typing import Any, Dict, Optional, Union import iso8601 import xmltodict from swh.model.exceptions import ValidationError from swh.model.model import TimestampWithTimezone from swh.model.swhids import ExtendedSWHID, ObjectType, QualifiedSWHID logger = logging.getLogger(__name__) def parse_xml(stream, encoding="utf-8"): namespaces = { "http://www.w3.org/2005/Atom": "atom", "http://www.w3.org/2007/app": "app", "http://purl.org/dc/terms/": "dc", "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta", "http://purl.org/net/sword/terms/": "sword", "https://www.softwareheritage.org/schema/2018/deposit": "swh", - "https://schema.org/": "schema", + "http://schema.org/": "schema", } data = xmltodict.parse( stream, encoding=encoding, namespaces=namespaces, process_namespaces=True, dict_constructor=dict, ) if "atom:entry" in data: data = data["atom:entry"] return data def normalize_date(date): """Normalize date fields as expected by swh workers. If date is a list, elect arbitrarily the first element of that list If date is (then) a string, parse it through dateutil.parser.parse to extract a datetime. Then normalize it through :class:`swh.model.model.TimestampWithTimezone` Returns The swh date object """ if isinstance(date, list): date = date[0] if isinstance(date, str): date = iso8601.parse_date(date) tstz = TimestampWithTimezone.from_dict(date) return { "timestamp": tstz.timestamp.to_dict(), "offset": tstz.offset_minutes(), } def compute_metadata_context(swhid_reference: QualifiedSWHID) -> Dict[str, Any]: """Given a SWHID object, determine the context as a dict. """ metadata_context: Dict[str, Any] = {"origin": None} if swhid_reference.qualifiers(): metadata_context = { "origin": swhid_reference.origin, "path": swhid_reference.path, } snapshot = swhid_reference.visit if snapshot: metadata_context["snapshot"] = snapshot anchor = swhid_reference.anchor if anchor: metadata_context[anchor.object_type.name.lower()] = anchor return metadata_context ALLOWED_QUALIFIERS_NODE_TYPE = ( ObjectType.SNAPSHOT, ObjectType.REVISION, ObjectType.RELEASE, ObjectType.DIRECTORY, ) def parse_swh_metadata_provenance( metadata: Dict, ) -> Optional[Union[QualifiedSWHID, str]]: """Parse swh metadata-provenance within the metadata dict reference if found, None otherwise. .. code-block:: xml https://url.org/metadata/url Args: metadata: result of parsing an Atom document with :func:`parse_xml` Raises: ValidationError in case of invalid xml Returns: Either the metadata provenance url if any or None otherwise """ swh_deposit = metadata.get("swh:deposit") if not swh_deposit: return None swh_metadata_provenance = swh_deposit.get("swh:metadata-provenance") if not swh_metadata_provenance: return None return swh_metadata_provenance.get("schema:url") def parse_swh_reference(metadata: Dict,) -> Optional[Union[QualifiedSWHID, str]]: """Parse swh reference within the metadata dict (or origin) reference if found, None otherwise. .. code-block:: xml or: .. code-block:: xml Args: metadata: result of parsing an Atom document with :func:`parse_xml` Raises: ValidationError in case the swhid referenced (if any) is invalid Returns: Either swhid or origin reference if any. None otherwise. """ # noqa swh_deposit = metadata.get("swh:deposit") if not swh_deposit: return None swh_reference = swh_deposit.get("swh:reference") if not swh_reference: return None swh_origin = swh_reference.get("swh:origin") if swh_origin: url = swh_origin.get("@url") if url: return url swh_object = swh_reference.get("swh:object") if not swh_object: return None swhid = swh_object.get("@swhid") if not swhid: return None swhid_reference = QualifiedSWHID.from_string(swhid) if swhid_reference.qualifiers(): anchor = swhid_reference.anchor if anchor: if anchor.object_type not in ALLOWED_QUALIFIERS_NODE_TYPE: error_msg = ( "anchor qualifier should be a core SWHID with type one of " f"{', '.join(t.name.lower() for t in ALLOWED_QUALIFIERS_NODE_TYPE)}" ) raise ValidationError(error_msg) visit = swhid_reference.visit if visit: if visit.object_type != ObjectType.SNAPSHOT: raise ValidationError( f"visit qualifier should be a core SWHID with type snp, " f"not {visit.object_type.value}" ) if ( visit and anchor and visit.object_type == ObjectType.SNAPSHOT and anchor.object_type == ObjectType.SNAPSHOT ): logger.warn( "SWHID use of both anchor and visit targeting " f"a snapshot: {swhid_reference}" ) raise ValidationError( "'anchor=swh:1:snp:' is not supported when 'visit' is also provided." ) return swhid_reference def extended_swhid_from_qualified(swhid: QualifiedSWHID) -> ExtendedSWHID: """Used to get the target of a metadata object from a , as the latter uses a QualifiedSWHID.""" return ExtendedSWHID.from_string(str(swhid).split(";")[0]) def to_header_link(link: str, link_name: str) -> str: """Build a single header link. >>> link_next = to_header_link("next-url", "next") >>> link_next '; rel="next"' >>> ','.join([link_next, to_header_link("prev-url", "prev")]) '; rel="next",; rel="prev"' """ return f'<{link}>; rel="{link_name}"'