diff --git a/swh/deposit/tests/data/atom/entry-data-with-metadata-provenance.xml b/swh/deposit/tests/data/atom/entry-data-with-metadata-provenance.xml
new file mode 100644
index 00000000..91cd6039
--- /dev/null
+++ b/swh/deposit/tests/data/atom/entry-data-with-metadata-provenance.xml
@@ -0,0 +1,13 @@
+
+
+ Awesome stuff
+ urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
+ ssedud
+
+
+ {url}
+
+
+
diff --git a/swh/deposit/tests/test_utils.py b/swh/deposit/tests/test_utils.py
index 5402a5b7..ee1beb8d 100644
--- a/swh/deposit/tests/test_utils.py
+++ b/swh/deposit/tests/test_utils.py
@@ -1,273 +1,303 @@
-# Copyright (C) 2018-2020 The Software Heritage developers
+# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
from swh.deposit import utils
from swh.deposit.parsers import parse_xml
from swh.model.exceptions import ValidationError
from swh.model.swhids import CoreSWHID, QualifiedSWHID
@pytest.fixture
def xml_with_origin_reference():
xml_data = """
"""
return xml_data.strip()
def test_merge():
"""Calling utils.merge on dicts should merge without losing information
"""
d0 = {"author": "someone", "license": [["gpl2"]], "a": 1}
d1 = {
"author": ["author0", {"name": "author1"}],
"license": [["gpl3"]],
"b": {"1": "2"},
}
d2 = {"author": map(lambda x: x, ["else"]), "license": "mit", "b": {"2": "3",}}
d3 = {
"author": (v for v in ["no one"]),
}
actual_merge = utils.merge(d0, d1, d2, d3)
expected_merge = {
"a": 1,
"license": [["gpl2"], ["gpl3"], "mit"],
"author": ["someone", "author0", {"name": "author1"}, "else", "no one"],
"b": {"1": "2", "2": "3",},
}
assert actual_merge == expected_merge
def test_merge_2():
d0 = {"license": "gpl2", "runtime": {"os": "unix derivative"}}
d1 = {"license": "gpl3", "runtime": "GNU/Linux"}
expected = {
"license": ["gpl2", "gpl3"],
"runtime": [{"os": "unix derivative"}, "GNU/Linux"],
}
actual = utils.merge(d0, d1)
assert actual == expected
def test_merge_edge_cases():
input_dict = {
"license": ["gpl2", "gpl3"],
"runtime": [{"os": "unix derivative"}, "GNU/Linux"],
}
# against empty dict
actual = utils.merge(input_dict, {})
assert actual == input_dict
# against oneself
actual = utils.merge(input_dict, input_dict, input_dict)
assert actual == input_dict
def test_merge_one_dict():
"""Merge one dict should result in the same dict value
"""
input_and_expected = {"anything": "really"}
actual = utils.merge(input_and_expected)
assert actual == input_and_expected
def test_merge_raise():
"""Calling utils.merge with any no dict argument should raise
"""
d0 = {"author": "someone", "a": 1}
d1 = ["not a dict"]
with pytest.raises(ValueError):
utils.merge(d0, d1)
with pytest.raises(ValueError):
utils.merge(d1, d0)
with pytest.raises(ValueError):
utils.merge(d1)
assert utils.merge(d0) == d0
def test_normalize_date_0():
"""When date is a list, choose the first date and normalize it
"""
actual_date = utils.normalize_date(["2017-10-12", "date1"])
assert actual_date == {
"timestamp": {"microseconds": 0, "seconds": 1507766400},
"offset": 0,
}
def test_normalize_date_1():
"""Providing a date in a reasonable format, everything is fine
"""
actual_date = utils.normalize_date("2018-06-11 17:02:02")
assert actual_date == {
"timestamp": {"microseconds": 0, "seconds": 1528736522},
"offset": 0,
}
def test_normalize_date_doing_irrelevant_stuff():
"""Providing a date with only the year results in a reasonable date
"""
actual_date = utils.normalize_date("2017")
assert actual_date == {
"timestamp": {"seconds": 1483228800, "microseconds": 0},
"offset": 0,
}
@pytest.mark.parametrize(
"swhid,expected_metadata_context",
[
("swh:1:cnt:51b5c8cc985d190b5a7ef4878128ebfdc2358f49", {"origin": None},),
(
"swh:1:snp:51b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=http://blah",
{"origin": "http://blah", "path": None},
),
(
"swh:1:dir:51b5c8cc985d190b5a7ef4878128ebfdc2358f49;path=/path",
{"origin": None, "path": b"/path"},
),
(
"swh:1:rev:51b5c8cc985d190b5a7ef4878128ebfdc2358f49;visit=swh:1:snp:41b5c8cc985d190b5a7ef4878128ebfdc2358f49", # noqa
{
"origin": None,
"path": None,
"snapshot": CoreSWHID.from_string(
"swh:1:snp:41b5c8cc985d190b5a7ef4878128ebfdc2358f49"
),
},
),
(
"swh:1:rel:51b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:dir:41b5c8cc985d190b5a7ef4878128ebfdc2358f49", # noqa
{
"origin": None,
"path": None,
"directory": CoreSWHID.from_string(
"swh:1:dir:41b5c8cc985d190b5a7ef4878128ebfdc2358f49"
),
},
),
],
)
def test_compute_metadata_context(swhid: str, expected_metadata_context):
assert expected_metadata_context == utils.compute_metadata_context(
QualifiedSWHID.from_string(swhid)
)
def test_parse_swh_reference_origin(xml_with_origin_reference):
url = "https://url"
xml_data = xml_with_origin_reference.format(url=url)
metadata = parse_xml(xml_data)
actual_origin = utils.parse_swh_reference(metadata)
assert actual_origin == url
@pytest.fixture
-def xml_with_empty_reference():
+def xml_swh_deposit_template():
xml_data = """
-
+
- {swh_reference}
+ {swh_deposit}
"""
return xml_data.strip()
@pytest.mark.parametrize(
"xml_ref",
[
"",
"",
"",
"""""",
],
)
-def test_parse_swh_reference_empty(xml_with_empty_reference, xml_ref):
- xml_body = xml_with_empty_reference.format(swh_reference=xml_ref)
+def test_parse_swh_reference_empty(xml_swh_deposit_template, xml_ref):
+ xml_body = xml_swh_deposit_template.format(swh_deposit=xml_ref)
metadata = utils.parse_xml(xml_body)
assert utils.parse_swh_reference(metadata) is None
@pytest.fixture
def xml_with_swhid(atom_dataset):
return atom_dataset["entry-data-with-swhid"]
@pytest.mark.parametrize(
"swhid",
[
"swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=https://hal.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:4fc1e36fca86b2070204bedd51106014a614f321;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba;path=/moranegg-AffectationRO-df7f68b/", # noqa
"swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:dir:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa
"swh:1:rev:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rev:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa
"swh:1:rel:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:rel:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa
"swh:1:snp:31b5c8cc985d190b5a7ef4878128ebfdc2358f49;anchor=swh:1:snp:9c5de20cfb54682370a398fcc733e829903c8cba", # noqa
"swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49",
],
)
def test_parse_swh_reference_swhid(swhid, xml_with_swhid):
xml_data = xml_with_swhid.format(swhid=swhid)
metadata = utils.parse_xml(xml_data)
actual_swhid = utils.parse_swh_reference(metadata)
assert actual_swhid is not None
expected_swhid = QualifiedSWHID.from_string(swhid)
assert actual_swhid == expected_swhid
@pytest.mark.parametrize(
"invalid_swhid",
[
# incorrect length
"swh:1:cnt:31b5c8cc985d190b5a7ef4878128ebfdc235" # noqa
# visit qualifier should be a core SWHID with type,
"swh:1:dir:c4993c872593e960dc84e4430dbbfbc34fd706d0;visit=swh:1:rev:0175049fc45055a3824a1675ac06e3711619a55a", # noqa
# anchor qualifier should be a core SWHID with type one of
"swh:1:rev:c4993c872593e960dc84e4430dbbfbc34fd706d0;anchor=swh:1:cnt:b5f505b005435fa5c4fa4c279792bd7b17167c04;path=/", # noqa
"swh:1:rev:c4993c872593e960dc84e4430dbbfbc34fd706d0;visit=swh:1:snp:0175049fc45055a3824a1675ac06e3711619a55a;anchor=swh:1:snp:b5f505b005435fa5c4fa4c279792bd7b17167c04", # noqa
],
)
def test_parse_swh_reference_invalid_swhid(invalid_swhid, xml_with_swhid):
"""Unparsable swhid should raise
"""
xml_invalid_swhid = xml_with_swhid.format(swhid=invalid_swhid)
metadata = utils.parse_xml(xml_invalid_swhid)
with pytest.raises(ValidationError):
utils.parse_swh_reference(metadata)
+
+
+@pytest.mark.parametrize(
+ "xml_ref",
+ [
+ "",
+ "",
+ "",
+ ],
+)
+def test_parse_swh_metatada_provenance_empty(xml_swh_deposit_template, xml_ref):
+ xml_body = xml_swh_deposit_template.format(swh_deposit=xml_ref)
+ metadata = utils.parse_xml(xml_body)
+
+ assert utils.parse_swh_metadata_provenance(metadata) is None
+
+
+@pytest.fixture
+def xml_with_metadata_provenance(atom_dataset):
+ return atom_dataset["entry-data-with-metadata-provenance"]
+
+
+def test_parse_swh_metadata_provenance2(xml_with_metadata_provenance):
+ xml_data = xml_with_metadata_provenance.format(url="https://url.org/metadata/url")
+ metadata = utils.parse_xml(xml_data)
+
+ actual_url = utils.parse_swh_metadata_provenance(metadata)
+
+ assert actual_url == "https://url.org/metadata/url"
diff --git a/swh/deposit/utils.py b/swh/deposit/utils.py
index 2c1c2eb5..0ad2c1af 100644
--- a/swh/deposit/utils.py
+++ b/swh/deposit/utils.py
@@ -1,254 +1,291 @@
-# Copyright (C) 2018-2020 The Software Heritage developers
+# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from types import GeneratorType
from typing import Any, Dict, Optional, Union
import iso8601
import xmltodict
from swh.model.exceptions import ValidationError
from swh.model.model import TimestampWithTimezone
from swh.model.swhids import ExtendedSWHID, ObjectType, QualifiedSWHID
logger = logging.getLogger(__name__)
def parse_xml(stream, encoding="utf-8"):
namespaces = {
"http://www.w3.org/2005/Atom": "atom",
"http://www.w3.org/2007/app": "app",
"http://purl.org/dc/terms/": "dc",
"https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta",
"http://purl.org/net/sword/terms/": "sword",
"https://www.softwareheritage.org/schema/2018/deposit": "swh",
+ "https://schema.org/": "schema",
}
data = xmltodict.parse(
stream,
encoding=encoding,
namespaces=namespaces,
process_namespaces=True,
dict_constructor=dict,
)
if "atom:entry" in data:
data = data["atom:entry"]
return data
def merge(*dicts):
"""Given an iterator of dicts, merge them losing no information.
Args:
*dicts: arguments are all supposed to be dict to merge into one
Returns:
dict merged without losing information
"""
def _extend(existing_val, value):
"""Given an existing value and a value (as potential lists), merge
them together without repetition.
"""
if isinstance(value, (list, map, GeneratorType)):
vals = value
else:
vals = [value]
for v in vals:
if v in existing_val:
continue
existing_val.append(v)
return existing_val
d = {}
for data in dicts:
if not isinstance(data, dict):
raise ValueError("dicts is supposed to be a variable arguments of dict")
for key, value in data.items():
existing_val = d.get(key)
if not existing_val:
d[key] = value
continue
if isinstance(existing_val, (list, map, GeneratorType)):
new_val = _extend(existing_val, value)
elif isinstance(existing_val, dict):
if isinstance(value, dict):
new_val = merge(existing_val, value)
else:
new_val = _extend([existing_val], value)
else:
new_val = _extend([existing_val], value)
d[key] = new_val
return d
def normalize_date(date):
"""Normalize date fields as expected by swh workers.
If date is a list, elect arbitrarily the first element of that
list
If date is (then) a string, parse it through
dateutil.parser.parse to extract a datetime.
Then normalize it through
:class:`swh.model.model.TimestampWithTimezone`
Returns
The swh date object
"""
if isinstance(date, list):
date = date[0]
if isinstance(date, str):
date = iso8601.parse_date(date)
tstz = TimestampWithTimezone.from_dict(date)
return {
"timestamp": tstz.timestamp.to_dict(),
"offset": tstz.offset_minutes(),
}
def compute_metadata_context(swhid_reference: QualifiedSWHID) -> Dict[str, Any]:
"""Given a SWHID object, determine the context as a dict.
"""
metadata_context: Dict[str, Any] = {"origin": None}
if swhid_reference.qualifiers():
metadata_context = {
"origin": swhid_reference.origin,
"path": swhid_reference.path,
}
snapshot = swhid_reference.visit
if snapshot:
metadata_context["snapshot"] = snapshot
anchor = swhid_reference.anchor
if anchor:
metadata_context[anchor.object_type.name.lower()] = anchor
return metadata_context
ALLOWED_QUALIFIERS_NODE_TYPE = (
ObjectType.SNAPSHOT,
ObjectType.REVISION,
ObjectType.RELEASE,
ObjectType.DIRECTORY,
)
+def parse_swh_metadata_provenance(
+ metadata: Dict,
+) -> Optional[Union[QualifiedSWHID, str]]:
+ """Parse swh metadata-provenance within the metadata dict reference if found, None
+ otherwise.
+
+ .. code-block:: xml
+
+
+
+ https://url.org/metadata/url
+
+
+
+ Args:
+ metadata: result of parsing an Atom document with :func:`parse_xml`
+
+ Raises:
+ ValidationError in case of invalid xml
+
+ Returns:
+ Either the metadata provenance url if any or None otherwise
+
+ """
+
+ swh_deposit = metadata.get("swh:deposit")
+ if not swh_deposit:
+ return None
+
+ swh_metadata_provenance = swh_deposit.get("swh:metadata-provenance")
+ if not swh_metadata_provenance:
+ return None
+
+ return swh_metadata_provenance.get("schema:url")
+
+
def parse_swh_reference(metadata: Dict,) -> Optional[Union[QualifiedSWHID, str]]:
"""Parse swh reference within the metadata dict (or origin) reference if found,
None otherwise.
.. code-block:: xml
or:
.. code-block:: xml
Args:
metadata: result of parsing an Atom document with :func:`parse_xml`
Raises:
ValidationError in case the swhid referenced (if any) is invalid
Returns:
Either swhid or origin reference if any. None otherwise.
""" # noqa
swh_deposit = metadata.get("swh:deposit")
if not swh_deposit:
return None
swh_reference = swh_deposit.get("swh:reference")
if not swh_reference:
return None
swh_origin = swh_reference.get("swh:origin")
if swh_origin:
url = swh_origin.get("@url")
if url:
return url
swh_object = swh_reference.get("swh:object")
if not swh_object:
return None
swhid = swh_object.get("@swhid")
if not swhid:
return None
swhid_reference = QualifiedSWHID.from_string(swhid)
if swhid_reference.qualifiers():
anchor = swhid_reference.anchor
if anchor:
if anchor.object_type not in ALLOWED_QUALIFIERS_NODE_TYPE:
error_msg = (
"anchor qualifier should be a core SWHID with type one of "
f"{', '.join(t.name.lower() for t in ALLOWED_QUALIFIERS_NODE_TYPE)}"
)
raise ValidationError(error_msg)
visit = swhid_reference.visit
if visit:
if visit.object_type != ObjectType.SNAPSHOT:
raise ValidationError(
f"visit qualifier should be a core SWHID with type snp, "
f"not {visit.object_type.value}"
)
if (
visit
and anchor
and visit.object_type == ObjectType.SNAPSHOT
and anchor.object_type == ObjectType.SNAPSHOT
):
logger.warn(
"SWHID use of both anchor and visit targeting "
f"a snapshot: {swhid_reference}"
)
raise ValidationError(
"'anchor=swh:1:snp:' is not supported when 'visit' is also provided."
)
return swhid_reference
def extended_swhid_from_qualified(swhid: QualifiedSWHID) -> ExtendedSWHID:
"""Used to get the target of a metadata object from a ,
as the latter uses a QualifiedSWHID."""
return ExtendedSWHID.from_string(str(swhid).split(";")[0])
def to_header_link(link: str, link_name: str) -> str:
"""Build a single header link.
>>> link_next = to_header_link("next-url", "next")
>>> link_next
'; rel="next"'
>>> ','.join([link_next, to_header_link("prev-url", "prev")])
'; rel="next",; rel="prev"'
"""
return f'<{link}>; rel="{link_name}"'