diff --git a/swh/deposit/client.py b/swh/deposit/client.py
index f1dcc7ac..50255f0f 100644
--- a/swh/deposit/client.py
+++ b/swh/deposit/client.py
@@ -1,753 +1,846 @@
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module in charge of defining an swh-deposit client
"""
import hashlib
import logging
import os
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urljoin
import warnings
import requests
from requests import Response
+from requests.utils import parse_header_links
from swh.core.config import load_from_envvar
from swh.deposit import __version__ as swh_deposit_version
from swh.deposit.utils import parse_xml
logger = logging.getLogger(__name__)
def compute_unified_information(
    collection: str,
    in_progress: bool,
    slug: str,
    *,
    filepath: Optional[str] = None,
    swhid: Optional[str] = None,
    **kwargs,
) -> Dict[str, Any]:
    """Given a filepath, compute necessary information on that file.

    Args:
        collection: Deposit collection
        in_progress: do we finalize the deposit?
        slug: external id to use
        filepath: Path to the file to compute the necessary information out of
        swhid: Deposit swhid if any

    Returns:
        dict with keys:

            'slug': external id to use
            'in_progress': do we finalize the deposit?
            'content-type': content type associated
            'md5sum': md5 sum
            'filename': filename
            'filepath': filepath
            'swhid': deposit swhid

    """
    result: Dict[str, Any] = {
        "slug": slug,
        "in_progress": in_progress,
        "swhid": swhid,
    }
    if filepath:
        filename = os.path.basename(filepath)
        # Hash by chunks within a context manager: the previous
        # open(...).read() form leaked the file handle and loaded the whole
        # (potentially large) archive in memory at once.
        digest = hashlib.md5()
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                digest.update(chunk)
        # Crude content-type detection, based on the file extension only
        extension = filename.split(".")[-1]
        if "zip" in extension:
            content_type = "application/zip"
        else:
            content_type = "application/x-tar"
        result.update(
            {
                "content-type": content_type,
                "md5sum": digest.hexdigest(),
                "filename": filename,
                "filepath": filepath,
            }
        )
    return result
class MaintenanceError(ValueError):
    """Informational exception raised when the server reports an ongoing
    maintenance.

    """
def handle_deprecated_config(config: Dict) -> Tuple[str, Optional[Tuple[str, str]]]:
    """Extract (url, auth) from a legacy ``config`` dict, emitting a
    DeprecationWarning.

    Returns:
        The deposit server url and, when credentials are present in the
        config, a (username, password) pair.

    """
    warnings.warn(
        '"config" argument is deprecated, please '
        'use "url" and "auth" arguments instead; note that "auth" '
        "expects now a couple (username, password) and not a dict.",
        DeprecationWarning,
    )
    url: str = config["url"]
    auth_config = config.get("auth")
    if auth_config:
        return (url, (auth_config["username"], auth_config["password"]))
    return (url, None)
class BaseApiDepositClient:
    """Deposit client base class, wrapping a requests session with optional
    basic http authentication.

    """

    def __init__(
        self,
        config: Optional[Dict] = None,
        url: Optional[str] = None,
        auth: Optional[Tuple[str, str]] = None,
    ):
        # Fall back on the environment-provided configuration when neither
        # an explicit url nor a legacy config dict is given.
        if not (url or config):
            config = load_from_envvar()
        if config:
            url, auth = handle_deprecated_config(config)
        # needed to help mypy not be fooled by the Optional nature of url
        assert url is not None
        self.base_url = url.strip("/") + "/"
        self.auth = auth
        self.session = requests.Session()
        if auth:
            self.session.auth = auth
        self.session.headers.update(
            {"user-agent": f"swh-deposit/{swh_deposit_version}"}
        )

    def do(self, method, url, *args, **kwargs):
        """Execute an http request against the deposit server, possibly with
        basic http authentication.

        Args:
            method (str): http method to use (e.g. 'get', 'post', ...)
            url (str): endpoint path, joined to the configured base url

        Returns:
            The request's execution

        """
        endpoint = urljoin(self.base_url, url.lstrip("/"))
        return self.session.request(method, endpoint, *args, **kwargs)
class PrivateApiDepositClient(BaseApiDepositClient):
    """Private API deposit client to:

    - read a given deposit's archive(s)
    - read a given deposit's metadata
    - update a given deposit's status

    """

    def archive_get(self, archive_update_url: str, archive: str) -> Optional[str]:
        """Retrieve the archive from the deposit to a local directory.

        Args:
            archive_update_url (str): The full deposit archive(s)'s raw content
                to retrieve locally

            archive (str): the local archive's path where to store
                the raw content

        Returns:
            The archive path to the local archive to load.

        Raises:
            ValueError when the archive could not be retrieved.

        """
        response = self.do("get", archive_update_url, stream=True)
        if response.ok:
            with open(archive, "wb") as f:
                # iter_content() without a chunk_size streams one byte at a
                # time; use a sizeable chunk to avoid per-byte writes.
                for chunk in response.iter_content(chunk_size=65536):
                    f.write(chunk)
            return archive
        msg = "Problem when retrieving deposit archive at %s" % (archive_update_url,)
        logger.error(msg)
        raise ValueError(msg)

    def metadata_get(self, metadata_url):
        """Retrieve the metadata information on a given deposit.

        Args:
            metadata_url (str): The full deposit metadata url to retrieve
                locally

        Returns:
            The dictionary of metadata for that deposit.

        Raises:
            ValueError when the metadata could not be retrieved.

        """
        r = self.do("get", metadata_url)
        if r.ok:
            return r.json()
        msg = "Problem when retrieving metadata at %s" % metadata_url
        logger.error(msg)
        raise ValueError(msg)

    def status_update(
        self,
        update_status_url,
        status,
        revision_id=None,
        directory_id=None,
        origin_url=None,
    ):
        """Update the deposit's status.

        Args:
            update_status_url (str): the full deposit's archive
            status (str): The status to update the deposit with
            revision_id (str/None): the revision's identifier to update to
            directory_id (str/None): the directory's identifier to update to
            origin_url (str/None): deposit's associated origin url

        """
        payload = {"status": status}
        # Only transmit the optional identifiers that were provided
        if revision_id:
            payload["revision_id"] = revision_id
        if directory_id:
            payload["directory_id"] = directory_id
        if origin_url:
            payload["origin_url"] = origin_url
        self.do("put", update_status_url, json=payload)

    def check(self, check_url):
        """Check the deposit's associated data (metadata, archive(s))

        Args:
            check_url (str): the full deposit's check url

        Returns:
            The deposit status as reported by the server.

        Raises:
            ValueError when the check query failed.

        """
        r = self.do("get", check_url)
        if r.ok:
            data = r.json()
            return data["status"]
        msg = "Problem when checking deposit %s" % check_url
        logger.error(msg)
        raise ValueError(msg)
class BaseDepositClient(BaseApiDepositClient):
"""Base Deposit client to access the public api.
"""
def __init__(
self, config=None, url=None, auth=None, error_msg=None, empty_result={}
):
super().__init__(url=url, auth=auth, config=config)
self.error_msg = error_msg
self.empty_result = empty_result
def compute_url(self, *args, **kwargs):
"""Compute api url endpoint to query."""
raise NotImplementedError
def compute_method(self, *args, **kwargs):
"""Http method to use on the url"""
raise NotImplementedError
- def parse_result_ok(self, xml_content: str) -> Dict[str, Any]:
+ def parse_result_ok(
+ self, xml_content: str, headers: Optional[Dict] = None
+ ) -> Dict[str, Any]:
"""Given an xml result from the api endpoint, parse it and returns a
dict.
"""
raise NotImplementedError
def compute_information(self, *args, **kwargs) -> Dict[str, Any]:
"""Compute some more information given the inputs (e.g http headers,
...)
"""
return {}
def parse_result_error(self, xml_content: str) -> Dict[str, Any]:
"""Given an error response in xml, parse it into a dict.
Returns:
dict with following keys:
'error': The error message
'detail': Some more detail about the error if any
"""
data = parse_xml(xml_content)
sword_error = data["sword:error"]
return {
"summary": sword_error["atom:summary"],
"detail": sword_error.get("detail", ""),
"sword:verboseDescription": sword_error.get("sword:verboseDescription", ""),
}
- def do_execute(self, method: str, url: str, info: Dict) -> Response:
+ def do_execute(self, method: str, url: str, info: Dict, **kwargs) -> Response:
"""Execute the http query to url using method and info information.
- By default, execute a simple query to url with the http
- method. Override this in daughter class to improve the
- default behavior if needed.
+ By default, execute a simple query to url with the http method. Override this in
+ subclass to improve the default behavior if needed.
"""
- return self.do(method, url)
+ return self.do(method, url, **kwargs)
+
+ def compute_params(self, **kwargs) -> Dict[str, Any]:
+ """Determine the params out of the kwargs"""
+ return {}
def execute(self, *args, **kwargs) -> Dict[str, Any]:
"""Main endpoint to prepare and execute the http query to the api.
Raises:
MaintenanceError if some api maintenance is happening.
Returns:
Dict of computed api data
"""
url = self.compute_url(*args, **kwargs)
method = self.compute_method(*args, **kwargs)
info = self.compute_information(*args, **kwargs)
+ params = self.compute_params(**kwargs)
try:
- response = self.do_execute(method, url, info)
+ response = self.do_execute(method, url, info, params=params)
except Exception as e:
msg = self.error_msg % (url, e)
result = self.empty_result
result.update(
{"error": msg,}
)
return result
else:
if response.ok:
if int(response.status_code) == 204: # 204 returns no body
return {"status": response.status_code}
else:
- return self.parse_result_ok(response.text)
+ headers = dict(response.headers) if response.headers else None
+ return self.parse_result_ok(response.text, headers)
else:
error = self.parse_result_error(response.text)
empty = self.empty_result
error.update(empty)
if response.status_code == 503:
summary = error.get("summary")
detail = error.get("sword:verboseDescription")
# Maintenance error
if summary and detail:
raise MaintenanceError(f"{summary}: {detail}")
error.update(
{"status": response.status_code,}
)
return error
class ServiceDocumentDepositClient(BaseDepositClient):
"""Service Document information retrieval.
"""
def __init__(self, config=None, url=None, auth=None):
super().__init__(
url=url,
auth=auth,
config=config,
error_msg="Service document failure at %s: %s",
empty_result={"collection": None},
)
def compute_url(self, *args, **kwargs):
return "/servicedocument/"
def compute_method(self, *args, **kwargs):
return "get"
- def parse_result_ok(self, xml_content: str) -> Dict[str, Any]:
+ def parse_result_ok(
+ self, xml_content: str, headers: Optional[Dict] = None
+ ) -> Dict[str, Any]:
"""Parse service document's success response.
"""
return parse_xml(xml_content)
def parse_result_error(self, xml_content: str) -> Dict[str, Any]:
result = super().parse_result_error(xml_content)
return {"error": result["summary"]}
class StatusDepositClient(BaseDepositClient):
"""Status information on a deposit.
"""
def __init__(self, config=None, url=None, auth=None):
super().__init__(
url=url,
auth=auth,
config=config,
error_msg="Status check failure at %s: %s",
empty_result={
"deposit_status": None,
"deposit_status_detail": None,
"deposit_swh_id": None,
},
)
def compute_url(self, collection, deposit_id):
return "/%s/%s/status/" % (collection, deposit_id)
def compute_method(self, *args, **kwargs):
return "get"
- def parse_result_ok(self, xml_content: str) -> Dict[str, Any]:
+ def parse_result_ok(
+ self, xml_content: str, headers: Optional[Dict] = None
+ ) -> Dict[str, Any]:
"""Given an xml content as string, returns a deposit dict.
"""
data = parse_xml(xml_content)
keys = [
"deposit_id",
"deposit_status",
"deposit_status_detail",
"deposit_swh_id",
"deposit_swh_id_context",
"deposit_external_id",
]
return {key: data.get("swh:" + key) for key in keys}
+class CollectionListDepositClient(BaseDepositClient):
+ """List a collection of deposits (owned by a user)
+
+ """
+
+ def __init__(self, config=None, url=None, auth=None):
+ super().__init__(
+ url=url,
+ auth=auth,
+ config=config,
+ error_msg="List deposits failure at %s: %s",
+ empty_result={},
+ )
+
+ def compute_url(self, collection, **kwargs):
+ return f"/{collection}/"
+
+ def compute_method(self, *args, **kwargs):
+ return "get"
+
+ def compute_params(self, **kwargs) -> Dict[str, Any]:
+ """Transmit pagination params if values provided are not None
+ (e.g. page, page_size)
+
+ """
+ return {k: v for k, v in kwargs.items() if v is not None}
+
+ def parse_result_ok(
+ self, xml_content: str, headers: Optional[Dict] = None
+ ) -> Dict[str, Any]:
+ """Given an xml content as string, returns a deposit dict.
+
+ """
+ link_header = headers.get("Link", "") if headers else ""
+ links = parse_header_links(link_header)
+ data = parse_xml(xml_content)["atom:feed"]
+ total_result = data.get("swh:count", 0)
+ keys = [
+ "id",
+ "reception_date",
+ "complete_date",
+ "external_id",
+ "swhid",
+ "status",
+ "status_detail",
+ "swhid_context",
+ "origin_url",
+ ]
+ entries_ = data.get("atom:entry", [])
+ entries = [entries_] if isinstance(entries_, dict) else entries_
+ deposits_d = [
+ {
+ key: deposit.get(f"swh:{key}")
+ for key in keys
+ if deposit.get(f"swh:{key}") is not None
+ }
+ for deposit in entries
+ ]
+
+ return {
+ "count": total_result,
+ "deposits": deposits_d,
+ **{entry["rel"]: entry["url"] for entry in links},
+ }
+
+
class BaseCreateDepositClient(BaseDepositClient):
"""Deposit client base class to post new deposit.
"""
def __init__(self, config=None, url=None, auth=None):
super().__init__(
url=url,
auth=auth,
config=config,
error_msg="Post Deposit failure at %s: %s",
empty_result={"swh:deposit_id": None, "swh:deposit_status": None,},
)
def compute_url(self, collection, *args, **kwargs):
return "/%s/" % collection
def compute_method(self, *args, **kwargs):
return "post"
- def parse_result_ok(self, xml_content: str) -> Dict[str, Any]:
+ def parse_result_ok(
+ self, xml_content: str, headers: Optional[Dict] = None
+ ) -> Dict[str, Any]:
"""Given an xml content as string, returns a deposit dict.
"""
data = parse_xml(xml_content)
keys = [
"deposit_id",
"deposit_status",
"deposit_status_detail",
"deposit_date",
]
return {key: data.get("swh:" + key) for key in keys}
def compute_headers(self, info: Dict[str, Any]) -> Dict[str, Any]:
return info
- def do_execute(self, method, url, info):
+ def do_execute(self, method, url, info, **kwargs):
with open(info["filepath"], "rb") as f:
return self.do(method, url, data=f, headers=info["headers"])
class CreateArchiveDepositClient(BaseCreateDepositClient):
    """Post an archive (binary) deposit client."""

    def compute_headers(self, info):
        """Build the sword http headers for a binary deposit out of ``info``."""
        headers = {
            "CONTENT_MD5": info["md5sum"],
            "IN-PROGRESS": str(info["in_progress"]),
            "CONTENT-TYPE": info["content-type"],
            "CONTENT-DISPOSITION": "attachment; filename=%s" % (info["filename"],),
        }
        # The slug header is optional
        if "slug" in info:
            headers["SLUG"] = info["slug"]
        return headers

    def compute_information(self, *args, **kwargs) -> Dict[str, Any]:
        """Gather archive file information, then attach the computed headers."""
        information = compute_unified_information(
            *args, filepath=kwargs["archive_path"], **kwargs
        )
        information["headers"] = self.compute_headers(information)
        return information
class UpdateArchiveDepositClient(CreateArchiveDepositClient):
    """Update (add/replace) an archive (binary) deposit client."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        """Media endpoint of the deposit to update."""
        return "/%s/%s/media/" % (collection, deposit_id)

    def compute_method(self, *args, replace=False, **kwargs):
        """'put' replaces an existing archive, 'post' adds a new one."""
        return "post" if not replace else "put"
class CreateMetadataDepositClient(BaseCreateDepositClient):
    """Post a metadata deposit client."""

    def compute_headers(self, info):
        """Build the sword http headers for an atom-entry metadata deposit."""
        headers = {
            "IN-PROGRESS": str(info["in_progress"]),
            "CONTENT-TYPE": "application/atom+xml;type=entry",
        }
        # The slug header is optional
        if "slug" in info:
            headers["SLUG"] = info["slug"]
        return headers

    def compute_information(self, *args, **kwargs) -> Dict[str, Any]:
        """Gather metadata file information, then attach the computed headers."""
        information = compute_unified_information(
            *args, filepath=kwargs["metadata_path"], **kwargs
        )
        information["headers"] = self.compute_headers(information)
        return information
class UpdateMetadataOnPartialDepositClient(CreateMetadataDepositClient):
    """Update (add/replace) metadata on partial deposit scenario."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        """Metadata endpoint of the (partial) deposit to update."""
        return f"/{collection}/{deposit_id}/metadata/"

    def compute_method(self, *args, replace: bool = False, **kwargs) -> str:
        """'put' replaces the metadata, 'post' adds to it."""
        return "post" if not replace else "put"
class UpdateMetadataOnDoneDepositClient(CreateMetadataDepositClient):
    """Update metadata on "done" deposit. This requires the deposit swhid."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        """Atom endpoint of the (completed) deposit to update."""
        return f"/{collection}/{deposit_id}/atom/"

    def compute_method(self, *args, **kwargs) -> str:
        """Updating metadata on a done deposit is always a full 'put'."""
        return "put"

    def compute_headers(self, info: Dict[str, Any]) -> Dict[str, Any]:
        """The headers carry the swhid the server checks against the deposit."""
        return {
            "CONTENT-TYPE": "application/atom+xml;type=entry",
            "X_CHECK_SWHID": info["swhid"],
        }
class CreateMetadataOnlyDepositClient(BaseCreateDepositClient):
"""Create metadata-only deposit."""
def compute_information(self, *args, **kwargs) -> Dict[str, Any]:
return {
"headers": {"CONTENT-TYPE": "application/atom+xml;type=entry",},
"filepath": kwargs["metadata_path"],
}
- def parse_result_ok(self, xml_content: str) -> Dict[str, Any]:
+ def parse_result_ok(
+ self, xml_content: str, headers: Optional[Dict] = None
+ ) -> Dict[str, Any]:
"""Given an xml content as string, returns a deposit dict.
"""
data = parse_xml(xml_content)
keys = [
"deposit_id",
"deposit_status",
"deposit_date",
]
return {key: data.get("swh:" + key) for key in keys}
class CreateMultipartDepositClient(BaseCreateDepositClient):
"""Create a multipart deposit client."""
def _multipart_info(self, info, info_meta):
files = [
(
"file",
(info["filename"], open(info["filepath"], "rb"), info["content-type"]),
),
(
"atom",
(
info_meta["filename"],
open(info_meta["filepath"], "rb"),
"application/atom+xml",
),
),
]
headers = {
"CONTENT_MD5": info["md5sum"],
"IN-PROGRESS": str(info["in_progress"]),
}
if "slug" in info:
headers["SLUG"] = info["slug"]
return files, headers
def compute_information(self, *args, **kwargs) -> Dict[str, Any]:
info = compute_unified_information(*args, filepath=kwargs["archive_path"],)
info_meta = compute_unified_information(
*args, filepath=kwargs["metadata_path"],
)
files, headers = self._multipart_info(info, info_meta)
return {"files": files, "headers": headers}
- def do_execute(self, method, url, info):
+ def do_execute(self, method, url, info, **kwargs):
return self.do(method, url, files=info["files"], headers=info["headers"])
class UpdateMultipartDepositClient(CreateMultipartDepositClient):
    """Update a multipart deposit client."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        """Metadata endpoint of the deposit to update."""
        return "/%s/%s/metadata/" % (collection, deposit_id)

    def compute_method(self, *args, replace=False, **kwargs):
        """'put' for replacement, 'post' for addition."""
        return "post" if not replace else "put"
class PublicApiDepositClient(BaseApiDepositClient):
"""Public api deposit client."""
def service_document(self):
    """Retrieve service document endpoint's information."""
    client = ServiceDocumentDepositClient(url=self.base_url, auth=self.auth)
    return client.execute()
def deposit_status(self, collection: str, deposit_id: int):
    """Retrieve status information on a deposit."""
    client = StatusDepositClient(url=self.base_url, auth=self.auth)
    return client.execute(collection, deposit_id)
+ def deposit_list(
+ self,
+ collection: str,
+ page: Optional[int] = None,
+ page_size: Optional[int] = None,
+ ):
+ """List deposits from the collection"""
+ return CollectionListDepositClient(url=self.base_url, auth=self.auth).execute(
+ collection, page=page, page_size=page_size
+ )
+
def deposit_create(
    self,
    collection: str,
    slug: Optional[str],
    archive: Optional[str] = None,
    metadata: Optional[str] = None,
    in_progress: bool = False,
):
    """Create a new deposit (archive, metadata, both as multipart)."""
    # Dispatch on what was provided: archive only, metadata only, or both
    if archive and not metadata:
        client = CreateArchiveDepositClient(url=self.base_url, auth=self.auth)
        return client.execute(collection, in_progress, slug, archive_path=archive)
    if metadata and not archive:
        client = CreateMetadataDepositClient(url=self.base_url, auth=self.auth)
        return client.execute(collection, in_progress, slug, metadata_path=metadata)
    client = CreateMultipartDepositClient(url=self.base_url, auth=self.auth)
    return client.execute(
        collection,
        in_progress,
        slug,
        archive_path=archive,
        metadata_path=metadata,
    )
def deposit_update(
    self,
    collection: str,
    deposit_id: int,
    slug: Optional[str],
    archive: Optional[str] = None,
    metadata: Optional[str] = None,
    in_progress: bool = False,
    replace: bool = False,
    swhid: Optional[str] = None,
):
    """Update (add/replace) existing deposit (archive, metadata, both).

    Args:
        collection: deposit collection name
        deposit_id: identifier of the deposit to update
        slug: external identifier, if any
        archive: path to an archive to add/replace, if any
        metadata: path to a metadata file to add/replace, if any
        in_progress: whether the deposit stays partial after the update
        replace: replace (put) instead of add to (post) the deposit
        swhid: when provided, update metadata on a "done" deposit instead
            of a "partial" one

    Returns:
        The refreshed deposit status dict on success, or a dict with an
        'error' key describing why the update was refused or failed.

    """
    # Fetch the current status first: updates are only permitted on
    # deposits in a suitable state.
    response = self.deposit_status(collection, deposit_id)
    if "error" in response:
        return response
    status = response["deposit_status"]
    # Without a swhid, only 'partial' deposits can be acted upon.
    if swhid is None and status != "partial":
        return {
            "error": "You can only act on deposit with status 'partial'",
            "detail": f"The deposit {deposit_id} has status '{status}'",
            "deposit_status": status,
            "deposit_id": deposit_id,
        }
    # With a swhid, only metadata of 'done' deposits can be updated.
    if swhid is not None and status != "done":
        return {
            "error": "You can only update metadata on deposit with status 'done'",
            "detail": f"The deposit {deposit_id} has status '{status}'",
            "deposit_status": status,
            "deposit_id": deposit_id,
        }
    # Dispatch to the dedicated update client depending on which parts
    # (archive, metadata, both) are being updated.
    if archive and not metadata:
        result = UpdateArchiveDepositClient(
            url=self.base_url, auth=self.auth
        ).execute(
            collection,
            in_progress,
            slug,
            deposit_id=deposit_id,
            archive_path=archive,
            replace=replace,
        )
    elif not archive and metadata and swhid is None:
        result = UpdateMetadataOnPartialDepositClient(
            url=self.base_url, auth=self.auth
        ).execute(
            collection,
            in_progress,
            slug,
            deposit_id=deposit_id,
            metadata_path=metadata,
            replace=replace,
        )
    elif not archive and metadata and swhid is not None:
        result = UpdateMetadataOnDoneDepositClient(
            url=self.base_url, auth=self.auth
        ).execute(
            collection,
            in_progress,
            slug,
            deposit_id=deposit_id,
            metadata_path=metadata,
            swhid=swhid,
        )
    else:
        result = UpdateMultipartDepositClient(
            url=self.base_url, auth=self.auth
        ).execute(
            collection,
            in_progress,
            slug,
            deposit_id=deposit_id,
            archive_path=archive,
            metadata_path=metadata,
            replace=replace,
        )
    if "error" in result:
        return result
    # Return the refreshed status after a successful update.
    return self.deposit_status(collection, deposit_id)
def deposit_metadata_only(
    self, collection: str, metadata: Optional[str] = None,
):
    """Create a metadata-only deposit in the collection."""
    assert metadata is not None
    client = CreateMetadataOnlyDepositClient(url=self.base_url, auth=self.auth)
    return client.execute(collection, metadata_path=metadata)
diff --git a/swh/deposit/tests/data/atom/entry-list-deposits-page1.xml b/swh/deposit/tests/data/atom/entry-list-deposits-page1.xml
new file mode 100644
index 00000000..d094aab2
--- /dev/null
+++ b/swh/deposit/tests/data/atom/entry-list-deposits-page1.xml
@@ -0,0 +1,18 @@
+
+ 3
+
+ 1031
+ rejected
+ Deposit without archive
+ check-deposit-2020-10-09T13:10:00.000000
+
+
+ 1032
+ rejected
+ Deposit without archive
+ check-deposit-2020-10-10T13:20:00.000000
+
+
diff --git a/swh/deposit/tests/data/atom/entry-list-deposits-page2.xml b/swh/deposit/tests/data/atom/entry-list-deposits-page2.xml
new file mode 100644
index 00000000..eba33ad8
--- /dev/null
+++ b/swh/deposit/tests/data/atom/entry-list-deposits-page2.xml
@@ -0,0 +1,16 @@
+
+ 3
+
+ 1033
+ 2020-10-08T13:50:30
+ 2020-10-08T13:52:34.509655
+ done
+ The deposit has been successfully loaded into the Software Heritage archive
+ swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea
+ swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/
+ check-deposit-2020-10-08T13:52:34.509655
+
+
diff --git a/swh/deposit/tests/data/atom/entry-list-deposits.xml b/swh/deposit/tests/data/atom/entry-list-deposits.xml
new file mode 100644
index 00000000..e830bd33
--- /dev/null
+++ b/swh/deposit/tests/data/atom/entry-list-deposits.xml
@@ -0,0 +1,28 @@
+
+ 3
+
+ 1031
+ rejected
+ Deposit without archive
+ check-deposit-2020-10-09T13:10:00.000000
+
+
+ 1032
+ rejected
+ Deposit without archive
+ check-deposit-2020-10-10T13:20:00.000000
+
+
+ 1033
+ 2020-10-08T13:50:30
+ 2020-10-08T13:52:34.509655
+ done
+ The deposit has been successfully loaded into the Software Heritage archive
+ swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea
+ swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/
+ check-deposit-2020-10-08T13:52:34.509655
+
+
diff --git a/swh/deposit/tests/test_client_module.py b/swh/deposit/tests/test_client_module.py
index 545641b1..03e64c1c 100644
--- a/swh/deposit/tests/test_client_module.py
+++ b/swh/deposit/tests/test_client_module.py
@@ -1,96 +1,215 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# Ensure the gist of the BaseDepositClient.execute works as expected in corner cases.
# The following tests use the ServiceDocumentDepositClient and StatusDepositClient
# because they are BaseDepositClient subclasses. We could have used other classes but
# those ones were chosen as they are fairly simple.
import pytest
from swh.deposit.client import (
+ CollectionListDepositClient,
MaintenanceError,
+ PublicApiDepositClient,
ServiceDocumentDepositClient,
StatusDepositClient,
)
+from swh.deposit.utils import to_header_link
def test_client_read_data_ok(requests_mock_datadir):
client = ServiceDocumentDepositClient(
url="https://deposit.swh.test/1", auth=("test", "test")
)
result = client.execute()
assert isinstance(result, dict)
collection = result["app:service"]["app:workspace"]["app:collection"]
assert collection["sword:name"] == "test"
def test_client_read_data_fails(mocker):
mock = mocker.patch("swh.deposit.client.BaseDepositClient.do_execute")
mock.side_effect = ValueError("here comes trouble")
client = ServiceDocumentDepositClient(
url="https://deposit.swh.test/1", auth=("test", "test")
)
result = client.execute()
assert isinstance(result, dict)
assert "error" in result
assert mock.called
def test_client_read_data_no_result(requests_mock):
url = "https://deposit.swh.test/1"
requests_mock.get(f"{url}/servicedocument/", status_code=204)
client = ServiceDocumentDepositClient(
url="https://deposit.swh.test/1", auth=("test", "test")
)
result = client.execute()
assert isinstance(result, dict)
assert result == {"status": 204}
def test_client_read_data_collection_error_503(requests_mock, atom_dataset):
error_content = atom_dataset["error-cli"].format(
summary="forbidden", verboseDescription="Access restricted",
)
url = "https://deposit.swh.test/1"
requests_mock.get(f"{url}/servicedocument/", status_code=503, text=error_content)
client = ServiceDocumentDepositClient(
url="https://deposit.swh.test/1", auth=("test", "test")
)
result = client.execute()
assert isinstance(result, dict)
assert result == {
"error": "forbidden",
"status": 503,
"collection": None,
}
def test_client_read_data_status_error_503(requests_mock, atom_dataset):
error_content = atom_dataset["error-cli"].format(
summary="forbidden", verboseDescription="Access restricted",
)
collection = "test"
deposit_id = 1
url = "https://deposit.swh.test/1"
requests_mock.get(
f"{url}/{collection}/{deposit_id}/status/", status_code=503, text=error_content
)
client = StatusDepositClient(
url="https://deposit.swh.test/1", auth=("test", "test")
)
with pytest.raises(MaintenanceError, match="forbidden"):
client.execute(collection, deposit_id)
+
+
+EXPECTED_DEPOSIT = {
+ "id": "1031",
+ "external_id": "check-deposit-2020-10-09T13:10:00.000000",
+ "status": "rejected",
+ "status_detail": "Deposit without archive",
+}
+
+EXPECTED_DEPOSIT2 = {
+ "id": "1032",
+ "external_id": "check-deposit-2020-10-10T13:20:00.000000",
+ "status": "rejected",
+ "status_detail": "Deposit without archive",
+}
+
+EXPECTED_DEPOSIT3 = {
+ "id": "1033",
+ "external_id": "check-deposit-2020-10-08T13:52:34.509655",
+ "status": "done",
+ "status_detail": (
+ "The deposit has been successfully loaded into the Software " "Heritage archive"
+ ),
+ "reception_date": "2020-10-08T13:50:30",
+ "complete_date": "2020-10-08T13:52:34.509655",
+ "swhid": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea",
+ "swhid_context": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/", # noqa
+}
+
+
+def test_client_collection_list(requests_mock, atom_dataset):
+ collection_list_xml = atom_dataset["entry-list-deposits"]
+ base_url = "https://deposit.test.list/1"
+ collection = "test"
+ url = f"{base_url}/{collection}/"
+ requests_mock.get(url, status_code=200, text=collection_list_xml)
+ expected_result = {
+ "count": "3",
+ "deposits": [EXPECTED_DEPOSIT, EXPECTED_DEPOSIT2, EXPECTED_DEPOSIT3],
+ }
+
+ # use dedicated client
+ client = CollectionListDepositClient(url=base_url, auth=("test", "test"))
+
+ # no pagination
+ result = client.execute(collection)
+
+ assert result == expected_result
+
+ # The main public client should work the same way
+ client2 = PublicApiDepositClient(url=base_url, auth=("test", "test"))
+ result2 = client2.deposit_list(collection)
+
+ assert result2 == expected_result
+
+ assert requests_mock.called
+ request_history = [m.url for m in requests_mock.request_history]
+ assert request_history == [url] * 2
+
+
+def test_client_collection_list_with_pagination_headers(requests_mock, atom_dataset):
+ collection_list_xml_page1 = atom_dataset["entry-list-deposits-page1"]
+ collection_list_xml_page2 = atom_dataset["entry-list-deposits-page2"]
+ base_url = "https://deposit.test.list/1"
+ collection = "test"
+ url = f"{base_url}/{collection}/"
+ page1 = 1
+ page2 = 2
+ page_size = 10
+ url_page1 = f"{url}?page={page1}"
+ url_page2 = f"{url}?page={page2}&page_size={page_size}"
+ requests_mock.get(
+ url_page1,
+ status_code=200,
+ text=collection_list_xml_page1,
+ headers={"Link": to_header_link(url_page2, "next"),},
+ )
+ requests_mock.get(
+ url_page2,
+ status_code=200,
+ text=collection_list_xml_page2,
+ headers={"Link": to_header_link(url_page1, "previous"),},
+ )
+
+ expected_result_page1 = {
+ "count": "3",
+ "deposits": [EXPECTED_DEPOSIT, EXPECTED_DEPOSIT2],
+ "next": url_page2,
+ }
+ expected_result_page2 = {
+ "count": "3",
+ "deposits": [EXPECTED_DEPOSIT3],
+ "previous": url_page1,
+ }
+
+ client = CollectionListDepositClient(
+ url="https://deposit.test.list/1", auth=("test", "test")
+ )
+ client2 = PublicApiDepositClient(url=base_url, auth=("test", "test"))
+
+ result = client.execute(collection, page=page1)
+ assert result == expected_result_page1
+
+ result2 = client.execute(collection, page=page2, page_size=page_size)
+ assert result2 == expected_result_page2
+
+ # The main public client should work the same way
+ result = client2.deposit_list(collection, page=page1)
+ assert result == expected_result_page1
+
+ result2 = client2.deposit_list(collection, page=page2, page_size=page_size)
+ assert result2 == expected_result_page2
+
+ assert requests_mock.called
+ request_history = [m.url for m in requests_mock.request_history]
+ assert request_history == [url_page1, url_page2] * 2
diff --git a/swh/deposit/utils.py b/swh/deposit/utils.py
index 0bb94c86..2e01de8c 100644
--- a/swh/deposit/utils.py
+++ b/swh/deposit/utils.py
@@ -1,240 +1,253 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from types import GeneratorType
from typing import Any, Dict, Optional, Union
import iso8601
import xmltodict
from swh.model.exceptions import ValidationError
from swh.model.identifiers import (
ExtendedSWHID,
ObjectType,
QualifiedSWHID,
normalize_timestamp,
)
logger = logging.getLogger(__name__)
def parse_xml(stream, encoding="utf-8"):
    """Parse deposit xml data into a dict, mapping the known namespaces to
    short prefixes (atom:, app:, dc:, codemeta:, sword:, swh:).

    An atom entry document is unwrapped to its content directly.

    """
    namespaces = {
        "http://www.w3.org/2005/Atom": "atom",
        "http://www.w3.org/2007/app": "app",
        "http://purl.org/dc/terms/": "dc",
        "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta",
        "http://purl.org/net/sword/terms/": "sword",
        "https://www.softwareheritage.org/schema/2018/deposit": "swh",
    }
    parsed = xmltodict.parse(
        stream,
        encoding=encoding,
        namespaces=namespaces,
        process_namespaces=True,
        dict_constructor=dict,
    )
    if "atom:entry" in parsed:
        return parsed["atom:entry"]
    return parsed
def merge(*dicts):
    """Given an iterator of dicts, merge them losing no information.

    Args:
        *dicts: arguments are all supposed to be dict to merge into one

    Returns:
        dict merged without losing information

    """

    def _extend(existing_val, value):
        """Merge ``value`` (a scalar or an iterable) into the list
        ``existing_val``, skipping entries already present.

        """
        values = value if isinstance(value, (list, map, GeneratorType)) else [value]
        for item in values:
            if item not in existing_val:
                existing_val.append(item)
        return existing_val

    merged = {}
    for current in dicts:
        if not isinstance(current, dict):
            raise ValueError("dicts is supposed to be a variable arguments of dict")
        for key, value in current.items():
            previous = merged.get(key)
            if not previous:
                # NOTE: a falsy previous value ('', 0, [], {}) is simply
                # replaced by the incoming one
                merged[key] = value
            elif isinstance(previous, dict):
                merged[key] = (
                    merge(previous, value)
                    if isinstance(value, dict)
                    else _extend([previous], value)
                )
            elif isinstance(previous, (list, map, GeneratorType)):
                merged[key] = _extend(previous, value)
            else:
                merged[key] = _extend([previous], value)
    return merged
def normalize_date(date):
    """Normalize a date field to the form expected by swh workers.

    A list is reduced to its (arbitrarily elected) first element; a string
    is then parsed as an ISO8601 datetime; finally the value is passed
    through swh.model.identifiers.normalize_timestamp.

    Returns:
        The swh date object

    """
    value = date[0] if isinstance(date, list) else date
    if isinstance(value, str):
        value = iso8601.parse_date(value)
    return normalize_timestamp(value)
def compute_metadata_context(swhid_reference: QualifiedSWHID) -> Dict[str, Any]:
    """Given a SWHID object, determine the context as a dict.

    Without qualifiers the context is just ``{"origin": None}``; otherwise
    origin and path are copied over, and snapshot/anchor qualifiers are
    recorded under their respective keys when present.
    """
    if not swhid_reference.qualifiers():
        return {"origin": None}

    context: Dict[str, Any] = {
        "origin": swhid_reference.origin,
        "path": swhid_reference.path,
    }
    visit = swhid_reference.visit
    if visit:
        context["snapshot"] = visit
    anchor = swhid_reference.anchor
    if anchor:
        # Key is the anchor's object type, e.g. "revision" or "directory"
        context[anchor.object_type.name.lower()] = anchor
    return context
# Core SWHID object types accepted as the target of the ``anchor``
# qualifier (checked by parse_swh_reference below).
ALLOWED_QUALIFIERS_NODE_TYPE = (
    ObjectType.SNAPSHOT,
    ObjectType.REVISION,
    ObjectType.RELEASE,
    ObjectType.DIRECTORY,
)
def parse_swh_reference(metadata: Dict,) -> Optional[Union["QualifiedSWHID", str]]:
    """Parse swh reference within the metadata dict (or origin) reference if found,
    None otherwise.

    An origin reference looks like:

    .. code-block:: xml

       <swh:deposit>
         <swh:reference>
           <swh:origin url="https://example.org/project" />
         </swh:reference>
       </swh:deposit>

    or, for a swhid reference:

    .. code-block:: xml

       <swh:deposit>
         <swh:reference>
           <swh:object swhid="swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49" />
         </swh:reference>
       </swh:deposit>

    Args:
        metadata: result of parsing an Atom document with :func:`parse_xml`

    Raises:
        ValidationError in case the swhid referenced (if any) is invalid

    Returns:
        Either swhid or origin reference if any. None otherwise.

    """  # noqa
    swh_deposit = metadata.get("swh:deposit")
    if not swh_deposit:
        return None

    swh_reference = swh_deposit.get("swh:reference")
    if not swh_reference:
        return None

    # An origin reference takes precedence over an object reference
    swh_origin = swh_reference.get("swh:origin")
    if swh_origin:
        url = swh_origin.get("@url")
        if url:
            return url

    swh_object = swh_reference.get("swh:object")
    if not swh_object:
        return None

    swhid = swh_object.get("@swhid")
    if not swhid:
        return None
    swhid_reference = QualifiedSWHID.from_string(swhid)

    if swhid_reference.qualifiers():
        anchor = swhid_reference.anchor
        if anchor:
            if anchor.object_type not in ALLOWED_QUALIFIERS_NODE_TYPE:
                error_msg = (
                    "anchor qualifier should be a core SWHID with type one of "
                    f"{', '.join(t.name.lower() for t in ALLOWED_QUALIFIERS_NODE_TYPE)}"
                )
                raise ValidationError(error_msg)

        visit = swhid_reference.visit
        if visit:
            if visit.object_type != ObjectType.SNAPSHOT:
                raise ValidationError(
                    f"visit qualifier should be a core SWHID with type snp, "
                    f"not {visit.object_type.value}"
                )

        # anchor targeting a snapshot is redundant/ambiguous when a visit
        # (which is always a snapshot) is also provided: reject it.
        if (
            visit
            and anchor
            and visit.object_type == ObjectType.SNAPSHOT
            and anchor.object_type == ObjectType.SNAPSHOT
        ):
            # logger.warn is deprecated since Python 3.3; use logger.warning
            logger.warning(
                "SWHID use of both anchor and visit targeting "
                f"a snapshot: {swhid_reference}"
            )
            raise ValidationError(
                "'anchor=swh:1:snp:' is not supported when 'visit' is also provided."
            )

    return swhid_reference
def extended_swhid_from_qualified(swhid: QualifiedSWHID) -> ExtendedSWHID:
    """Used to get the target of a metadata object from a ``<swh:reference>``,
    as the latter uses a QualifiedSWHID."""
    # Drop the ";"-separated qualifiers, keeping only the core SWHID part
    core_part, _, _ = str(swhid).partition(";")
    return ExtendedSWHID.from_string(core_part)
+
+
def to_header_link(link: str, link_name: str) -> str:
    """Build a single RFC 5988 style Link header value.

    >>> link_next = to_header_link("next-url", "next")
    >>> link_next
    '<next-url>; rel="next"'
    >>> ','.join([link_next, to_header_link("prev-url", "prev")])
    '<next-url>; rel="next",<prev-url>; rel="prev"'

    """
    return f'<{link}>; rel="{link_name}"'