# Copyright (C) 2017-2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Module in charge of defining an swh-deposit client"""

import hashlib
import logging
import os
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urljoin
import warnings

import requests
from requests import Response
from requests.utils import parse_header_links

from swh.core.config import load_from_envvar
from swh.deposit import __version__ as swh_deposit_version
from swh.deposit.utils import parse_xml

logger = logging.getLogger(__name__)


def compute_unified_information(
    collection: str,
    in_progress: bool,
    slug: str,
    *,
    filepath: Optional[str] = None,
    swhid: Optional[str] = None,
    **kwargs,
) -> Dict[str, Any]:
    """Given a filepath, compute necessary information on that file.

    Args:
        collection: Deposit collection
        in_progress: do we finalize the deposit?
        slug: external id to use
        filepath: Path to the file to compute the necessary information out of
        swhid: Deposit swhid if any

    Returns:
        dict with keys:

            'slug': external id to use
            'in_progress': do we finalize the deposit?
            'content-type': content type associated
            'md5sum': md5 sum
            'filename': filename
            'filepath': filepath
            'swhid': deposit swhid

    """
    result: Dict[str, Any] = {
        "slug": slug,
        "in_progress": in_progress,
        "swhid": swhid,
    }
    content_type: Optional[str] = None
    md5sum: Optional[str] = None

    if filepath:
        filename = os.path.basename(filepath)
        # Fix: hash the file incrementally through a context manager. The
        # previous code did hashlib.md5(open(filepath, "rb").read()), which
        # leaked the file handle and loaded the whole archive in memory.
        digest = hashlib.md5()
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                digest.update(chunk)
        md5sum = digest.hexdigest()
        extension = filename.split(".")[-1]
        # Heuristic on the file extension only: anything that is not
        # zip-like is assumed to be a tarball.
        if "zip" in extension:
            content_type = "application/zip"
        else:
            content_type = "application/x-tar"
        result.update(
            {
                "content-type": content_type,
                "md5sum": md5sum,
                "filename": filename,
                "filepath": filepath,
            }
        )

    return result


class MaintenanceError(ValueError):
    """Informational maintenance error exception"""

    pass


def handle_deprecated_config(config: Dict) -> Tuple[str, Optional[Tuple[str, str]]]:
    """Convert a deprecated ``config`` dict into an ``(url, auth)`` pair.

    Emits a DeprecationWarning: callers should pass ``url`` and ``auth``
    directly; ``auth`` is now a (username, password) couple, not a dict.
    """
    warnings.warn(
        '"config" argument is deprecated, please '
        'use "url" and "auth" arguments instead; note that "auth" '
        "expects now a couple (username, password) and not a dict.",
        DeprecationWarning,
    )
    url: str = config["url"]
    auth: Optional[Tuple[str, str]] = None

    if config.get("auth"):
        auth = (config["auth"]["username"], config["auth"]["password"])

    return (url, auth)


class BaseApiDepositClient:
    """Deposit client base class"""

    def __init__(
        self,
        config: Optional[Dict] = None,
        url: Optional[str] = None,
        auth: Optional[Tuple[str, str]] = None,
    ):
        # Fall back to the environment-provided configuration when neither a
        # url nor a (deprecated) config dict is given.
        if not url and not config:
            config = load_from_envvar()
        if config:
            url, auth = handle_deprecated_config(config)

        # needed to help mypy not be fooled by the Optional nature of url
        assert url is not None

        self.base_url = url.strip("/") + "/"
        self.auth = auth
        self.session = requests.Session()
        if auth:
            self.session.auth = auth
        self.session.headers.update(
            {"user-agent": f"swh-deposit/{swh_deposit_version}"}
        )

    def do(self, method, url, *args, **kwargs):
        """Internal method to deal with requests, possibly with basic http
        authentication.

        Args:
            method (str): supported http methods as in self._methods' keys

        Returns:
            The request's execution

        """
        full_url = urljoin(self.base_url, url.lstrip("/"))
        return self.session.request(method, full_url, *args, **kwargs)
class PrivateApiDepositClient(BaseApiDepositClient):
    """Private API deposit client to:

    - read a given deposit's archive(s)
    - read a given deposit's metadata
    - update a given deposit's status

    """

    def archive_get(self, archive_update_url: str, archive: str) -> Optional[str]:
        """Retrieve the archive from the deposit to a local directory.

        Args:
            archive_update_url (str): The full deposit archive(s)'s raw content
                to retrieve locally
            archive (str): the local archive's path where to store the raw
                content

        Returns:
            The archive path to the local archive to load.

        Raises:
            ValueError: if the archive could not be retrieved.

        """
        response = self.do("get", archive_update_url, stream=True)
        if response.ok:
            with open(archive, "wb") as f:
                # Fix: iter_content() without a chunk_size streams one byte at
                # a time; use a sizeable chunk for large archives.
                for chunk in response.iter_content(chunk_size=65536):
                    f.write(chunk)

            return archive

        msg = "Problem when retrieving deposit archive at %s" % (archive_update_url,)
        logger.error(msg)

        raise ValueError(msg)

    def metadata_get(self, metadata_url):
        """Retrieve the metadata information on a given deposit.

        Args:
            metadata_url (str): The full deposit metadata url to retrieve
                locally

        Returns:
            The dictionary of metadata for that deposit.

        Raises:
            ValueError: if the metadata could not be retrieved.

        """
        r = self.do("get", metadata_url)
        if r.ok:
            return r.json()

        msg = "Problem when retrieving metadata at %s" % metadata_url
        logger.error(msg)

        raise ValueError(msg)

    def status_update(
        self,
        update_status_url,
        status,
        revision_id=None,
        directory_id=None,
        origin_url=None,
    ):
        """Update the deposit's status.

        Args:
            update_status_url (str): the full deposit's archive
            status (str): The status to update the deposit with
            revision_id (str/None): the revision's identifier to update to
            directory_id (str/None): the directory's identifier to update to
            origin_url (str/None): deposit's associated origin url

        """
        payload = {"status": status}
        if revision_id:
            payload["revision_id"] = revision_id
        if directory_id:
            payload["directory_id"] = directory_id
        if origin_url:
            payload["origin_url"] = origin_url

        self.do("put", update_status_url, json=payload)

    def check(self, check_url):
        """Check the deposit's associated data (metadata, archive(s))

        Args:
            check_url (str): the full deposit's check url

        Raises:
            ValueError: if the check query failed.

        """
        r = self.do("get", check_url)
        if r.ok:
            data = r.json()
            return data["status"]

        msg = "Problem when checking deposit %s" % check_url
        logger.error(msg)

        raise ValueError(msg)
class BaseDepositClient(BaseApiDepositClient):
    """Base Deposit client to access the public api."""

    def __init__(
        self, config=None, url=None, auth=None, error_msg=None, empty_result={}
    ):
        # NOTE: empty_result keeps its historical mutable default for
        # signature compatibility; execute() copies it before mutating so the
        # shared template can no longer be polluted across calls.
        super().__init__(url=url, auth=auth, config=config)
        self.error_msg = error_msg
        self.empty_result = empty_result

    def compute_url(self, *args, **kwargs):
        """Compute api url endpoint to query."""
        raise NotImplementedError

    def compute_method(self, *args, **kwargs):
        """Http method to use on the url"""
        raise NotImplementedError

    def parse_result_ok(
        self, xml_content: str, headers: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Given an xml result from the api endpoint, parse it and returns a
        dict.

        """
        raise NotImplementedError

    def compute_information(self, *args, **kwargs) -> Dict[str, Any]:
        """Compute some more information given the inputs (e.g http headers,
        ...)

        """
        return {}

    def parse_result_error(self, xml_content: str) -> Dict[str, Any]:
        """Given an error response in xml, parse it into a dict.

        Returns:
            dict with following keys:

               'error': The error message
               'detail': Some more detail about the error if any

        """
        data = parse_xml(xml_content)
        sword_error = data["sword:error"]
        return {
            "summary": sword_error["atom:summary"],
            "detail": sword_error.get("detail", ""),
            "sword:verboseDescription": sword_error.get(
                "sword:verboseDescription", ""
            ),
        }

    def do_execute(self, method: str, url: str, info: Dict, **kwargs) -> Response:
        """Execute the http query to url using method and info information.

        By default, execute a simple query to url with the http method.
        Override this in subclass to improve the default behavior if needed.

        """
        return self.do(method, url, **kwargs)

    def compute_params(self, **kwargs) -> Dict[str, Any]:
        """Determine the params out of the kwargs"""
        return {}

    def execute(self, *args, **kwargs) -> Dict[str, Any]:
        """Main endpoint to prepare and execute the http query to the api.

        Raises:
            MaintenanceError if some api maintenance is happening.

        Returns:
            Dict of computed api data

        """
        url = self.compute_url(*args, **kwargs)
        method = self.compute_method(*args, **kwargs)
        info = self.compute_information(*args, **kwargs)
        params = self.compute_params(**kwargs)

        try:
            response = self.do_execute(method, url, info, params=params)
        except Exception as e:
            msg = self.error_msg % (url, e)
            # Fix: copy the empty-result template instead of mutating it in
            # place; the previous code updated self.empty_result itself, so a
            # failed call left a stale "error" key in every later result.
            result = dict(self.empty_result)
            result.update({"error": msg})
            return result
        else:
            if response.ok:
                if int(response.status_code) == 204:  # 204 returns no body
                    return {"status": response.status_code}
                else:
                    headers = dict(response.headers) if response.headers else None
                    return self.parse_result_ok(response.text, headers)
            else:
                error = self.parse_result_error(response.text)
                empty = self.empty_result
                error.update(empty)
                if response.status_code == 503:
                    summary = error.get("summary")
                    detail = error.get("sword:verboseDescription")
                    # Maintenance error
                    if summary and detail:
                        raise MaintenanceError(f"{summary}: {detail}")
                error.update({"status": response.status_code})
                return error
class ServiceDocumentDepositClient(BaseDepositClient):
    """Service Document information retrieval."""

    def __init__(self, config=None, url=None, auth=None):
        super().__init__(
            url=url,
            auth=auth,
            config=config,
            error_msg="Service document failure at %s: %s",
            empty_result={"collection": None},
        )

    def compute_url(self, *args, **kwargs):
        # The service document lives at a fixed endpoint.
        return "/servicedocument/"

    def compute_method(self, *args, **kwargs):
        return "get"

    def parse_result_ok(
        self, xml_content: str, headers: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Parse service document's success response."""
        return parse_xml(xml_content)

    def parse_result_error(self, xml_content: str) -> Dict[str, Any]:
        # Collapse the generic sword error dict to the summary only.
        result = super().parse_result_error(xml_content)
        return {"error": result["summary"]}
class StatusDepositClient(BaseDepositClient):
    """Status information on a deposit."""

    def __init__(self, config=None, url=None, auth=None):
        super().__init__(
            url=url,
            auth=auth,
            config=config,
            error_msg="Status check failure at %s: %s",
            empty_result={
                "deposit_status": None,
                "deposit_status_detail": None,
                "deposit_swh_id": None,
            },
        )

    def compute_url(self, collection, deposit_id):
        return "/%s/%s/status/" % (collection, deposit_id)

    def compute_method(self, *args, **kwargs):
        return "get"

    def parse_result_ok(
        self, xml_content: str, headers: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Given an xml content as string, returns a deposit dict."""
        data = parse_xml(xml_content)
        keys = [
            "deposit_id",
            "deposit_status",
            "deposit_status_detail",
            "deposit_swh_id",
            "deposit_swh_id_context",
            "deposit_external_id",
        ]
        # Server emits swh-namespaced tags; strip the prefix for callers.
        return {key: data.get("swh:" + key) for key in keys}


class CollectionListDepositClient(BaseDepositClient):
    """List a collection of deposits (owned by a user)"""

    def __init__(self, config=None, url=None, auth=None):
        super().__init__(
            url=url,
            auth=auth,
            config=config,
            error_msg="List deposits failure at %s: %s",
            empty_result={},
        )

    def compute_url(self, collection, **kwargs):
        return f"/{collection}/"

    def compute_method(self, *args, **kwargs):
        return "get"

    def compute_params(self, **kwargs) -> Dict[str, Any]:
        """Transmit pagination params if values provided are not None
        (e.g. page, page_size)

        """
        return {k: v for k, v in kwargs.items() if v is not None}

    def parse_result_ok(
        self, xml_content: str, headers: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Given an xml content as string, returns a deposit dict.

        The response's ``Link`` header (next/previous pagination urls) is
        folded into the result under its ``rel`` names.
        """
        link_header = headers.get("Link", "") if headers else ""
        links = parse_header_links(link_header)
        data = parse_xml(xml_content)["atom:feed"]
        total_result = data.get("swh:count", 0)
        keys = [
            "id",
            "reception_date",
            "complete_date",
            "external_id",
            "swhid",
            "status",
            "status_detail",
            "swhid_context",
            "origin_url",
        ]
        # xmltodict yields a dict for a single entry, a list otherwise;
        # normalize to a list before extracting the swh-namespaced fields.
        entries_ = data.get("atom:entry", [])
        entries = [entries_] if isinstance(entries_, dict) else entries_
        deposits_d = [
            {
                key: deposit.get(f"swh:{key}")
                for key in keys
                if deposit.get(f"swh:{key}") is not None
            }
            for deposit in entries
        ]

        return {
            "count": total_result,
            "deposits": deposits_d,
            **{entry["rel"]: entry["url"] for entry in links},
        }
class BaseCreateDepositClient(BaseDepositClient):
    """Deposit client base class to post new deposit."""

    def __init__(self, config=None, url=None, auth=None):
        super().__init__(
            url=url,
            auth=auth,
            config=config,
            error_msg="Post Deposit failure at %s: %s",
            empty_result={"swh:deposit_id": None, "swh:deposit_status": None,},
        )

    def compute_url(self, collection, *args, **kwargs):
        return "/%s/" % collection

    def compute_method(self, *args, **kwargs):
        return "post"

    def parse_result_ok(
        self, xml_content: str, headers: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Given an xml content as string, returns a deposit dict."""
        data = parse_xml(xml_content)
        keys = [
            "deposit_id",
            "deposit_status",
            "deposit_status_detail",
            "deposit_date",
        ]
        return {key: data.get("swh:" + key) for key in keys}

    def compute_headers(self, info: Dict[str, Any]) -> Dict[str, Any]:
        # Default: the computed information dict already is the headers.
        return info

    def do_execute(self, method, url, info, **kwargs):
        # Stream the file body; headers were prepared by compute_information.
        with open(info["filepath"], "rb") as f:
            return self.do(method, url, data=f, headers=info["headers"])
class CreateArchiveDepositClient(BaseCreateDepositClient):
    """Post an archive (binary) deposit client."""

    def compute_headers(self, info):
        # SWORD binary-deposit headers; SLUG is optional.
        headers = {
            "CONTENT_MD5": info["md5sum"],
            "IN-PROGRESS": str(info["in_progress"]),
            "CONTENT-TYPE": info["content-type"],
            "CONTENT-DISPOSITION": "attachment; filename=%s" % (info["filename"],),
        }
        if "slug" in info:
            headers["SLUG"] = info["slug"]
        return headers

    def compute_information(self, *args, **kwargs) -> Dict[str, Any]:
        info = compute_unified_information(
            *args, filepath=kwargs["archive_path"], **kwargs
        )
        info["headers"] = self.compute_headers(info)
        return info


class UpdateArchiveDepositClient(CreateArchiveDepositClient):
    """Update (add/replace) an archive (binary) deposit client."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        return "/%s/%s/media/" % (collection, deposit_id)

    def compute_method(self, *args, replace=False, **kwargs):
        # SWORD: PUT replaces the archive, POST adds to it.
        return "put" if replace else "post"


class CreateMetadataDepositClient(BaseCreateDepositClient):
    """Post a metadata deposit client."""

    def compute_headers(self, info):
        headers = {
            "IN-PROGRESS": str(info["in_progress"]),
            "CONTENT-TYPE": "application/atom+xml;type=entry",
        }
        if "slug" in info:
            headers["SLUG"] = info["slug"]
        return headers

    def compute_information(self, *args, **kwargs) -> Dict[str, Any]:
        info = compute_unified_information(
            *args, filepath=kwargs["metadata_path"], **kwargs
        )
        info["headers"] = self.compute_headers(info)
        return info


class UpdateMetadataOnPartialDepositClient(CreateMetadataDepositClient):
    """Update (add/replace) metadata on partial deposit scenario."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        return f"/{collection}/{deposit_id}/metadata/"

    def compute_method(self, *args, replace: bool = False, **kwargs) -> str:
        return "put" if replace else "post"
class UpdateMetadataOnDoneDepositClient(CreateMetadataDepositClient):
    """Update metadata on "done" deposit. This requires the deposit swhid."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        return f"/{collection}/{deposit_id}/atom/"

    def compute_headers(self, info: Dict[str, Any]) -> Dict[str, Any]:
        # X_CHECK_SWHID lets the server verify the caller targets the right
        # deposit.
        return {
            "CONTENT-TYPE": "application/atom+xml;type=entry",
            "X_CHECK_SWHID": info["swhid"],
        }

    def compute_method(self, *args, **kwargs) -> str:
        # Metadata on a completed deposit can only be replaced, never added.
        return "put"


class CreateMetadataOnlyDepositClient(BaseCreateDepositClient):
    """Create metadata-only deposit."""

    def compute_information(self, *args, **kwargs) -> Dict[str, Any]:
        return {
            "headers": {"CONTENT-TYPE": "application/atom+xml;type=entry",},
            "filepath": kwargs["metadata_path"],
        }

    def parse_result_ok(
        self, xml_content: str, headers: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Given an xml content as string, returns a deposit dict."""
        data = parse_xml(xml_content)
        keys = [
            "deposit_id",
            "deposit_status",
            "deposit_date",
        ]
        return {key: data.get("swh:" + key) for key in keys}
class CreateMultipartDepositClient(BaseCreateDepositClient):
    """Create a multipart deposit client."""

    def _multipart_info(self, info, info_meta):
        # Build the (files, headers) pair for a multipart POST: the binary
        # archive part plus the atom metadata part.
        # NOTE(review): the open() handles below are handed to requests and
        # never explicitly closed — TODO confirm whether this leak matters for
        # the cli's short-lived processes.
        files = [
            (
                "file",
                (info["filename"], open(info["filepath"], "rb"), info["content-type"]),
            ),
            (
                "atom",
                (
                    info_meta["filename"],
                    open(info_meta["filepath"], "rb"),
                    "application/atom+xml",
                ),
            ),
        ]

        headers = {
            "CONTENT_MD5": info["md5sum"],
            "IN-PROGRESS": str(info["in_progress"]),
        }
        if "slug" in info:
            headers["SLUG"] = info["slug"]

        return files, headers

    def compute_information(self, *args, **kwargs) -> Dict[str, Any]:
        info = compute_unified_information(*args, filepath=kwargs["archive_path"],)
        info_meta = compute_unified_information(
            *args, filepath=kwargs["metadata_path"],
        )
        files, headers = self._multipart_info(info, info_meta)
        return {"files": files, "headers": headers}

    def do_execute(self, method, url, info, **kwargs):
        return self.do(method, url, files=info["files"], headers=info["headers"])


class UpdateMultipartDepositClient(CreateMultipartDepositClient):
    """Update a multipart deposit client."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        return "/%s/%s/metadata/" % (collection, deposit_id)

    def compute_method(self, *args, replace=False, **kwargs):
        return "put" if replace else "post"


class PublicApiDepositClient(BaseApiDepositClient):
    """Public api deposit client."""

    def service_document(self):
        """Retrieve service document endpoint's information."""
        return ServiceDocumentDepositClient(url=self.base_url, auth=self.auth).execute()

    def deposit_status(self, collection: str, deposit_id: int):
        """Retrieve status information on a deposit."""
        return StatusDepositClient(url=self.base_url, auth=self.auth).execute(
            collection, deposit_id
        )

    def deposit_list(
        self,
        collection: str,
        page: Optional[int] = None,
        page_size: Optional[int] = None,
    ):
        """List deposits from the collection"""
        return CollectionListDepositClient(url=self.base_url, auth=self.auth).execute(
            collection, page=page, page_size=page_size
        )

    def deposit_create(
        self,
        collection: str,
        slug: Optional[str],
        archive: Optional[str] = None,
        metadata: Optional[str] = None,
        in_progress: bool = False,
    ):
        """Create a new deposit (archive, metadata, both as multipart)."""
        # Dispatch on which payload(s) are provided.
        if archive and not metadata:
            return CreateArchiveDepositClient(
                url=self.base_url, auth=self.auth
            ).execute(collection, in_progress, slug, archive_path=archive)
        elif not archive and metadata:
            return CreateMetadataDepositClient(
                url=self.base_url, auth=self.auth
            ).execute(collection, in_progress, slug, metadata_path=metadata)
        else:
            return CreateMultipartDepositClient(
                url=self.base_url, auth=self.auth
            ).execute(
                collection,
                in_progress,
                slug,
                archive_path=archive,
                metadata_path=metadata,
            )

    def deposit_update(
        self,
        collection: str,
        deposit_id: int,
        slug: Optional[str],
        archive: Optional[str] = None,
        metadata: Optional[str] = None,
        in_progress: bool = False,
        replace: bool = False,
        swhid: Optional[str] = None,
    ):
        """Update (add/replace) existing deposit (archive, metadata, both)."""
        # Guard: the kind of update allowed depends on the deposit's status.
        response = self.deposit_status(collection, deposit_id)
        if "error" in response:
            return response
        status = response["deposit_status"]
        if swhid is None and status != "partial":
            return {
                "error": "You can only act on deposit with status 'partial'",
                "detail": f"The deposit {deposit_id} has status '{status}'",
                "deposit_status": status,
                "deposit_id": deposit_id,
            }
        if swhid is not None and status != "done":
            return {
                "error": "You can only update metadata on deposit with status 'done'",
                "detail": f"The deposit {deposit_id} has status '{status}'",
                "deposit_status": status,
                "deposit_id": deposit_id,
            }
        # Dispatch to the matching update client.
        if archive and not metadata:
            result = UpdateArchiveDepositClient(
                url=self.base_url, auth=self.auth
            ).execute(
                collection,
                in_progress,
                slug,
                deposit_id=deposit_id,
                archive_path=archive,
                replace=replace,
            )
        elif not archive and metadata and swhid is None:
            result = UpdateMetadataOnPartialDepositClient(
                url=self.base_url, auth=self.auth
            ).execute(
                collection,
                in_progress,
                slug,
                deposit_id=deposit_id,
                metadata_path=metadata,
                replace=replace,
            )
        elif not archive and metadata and swhid is not None:
            result = UpdateMetadataOnDoneDepositClient(
                url=self.base_url, auth=self.auth
            ).execute(
                collection,
                in_progress,
                slug,
                deposit_id=deposit_id,
                metadata_path=metadata,
                swhid=swhid,
            )
        else:
            result = UpdateMultipartDepositClient(
                url=self.base_url, auth=self.auth
            ).execute(
                collection,
                in_progress,
                slug,
                deposit_id=deposit_id,
                archive_path=archive,
                metadata_path=metadata,
                replace=replace,
            )

        if "error" in result:
            return result
        # Return the refreshed status after a successful update.
        return self.deposit_status(collection, deposit_id)

    def deposit_metadata_only(
        self, collection: str, metadata: Optional[str] = None,
    ):
        # assert-based validation kept as in the original cli usage.
        assert metadata is not None
        return CreateMetadataOnlyDepositClient(
            url=self.base_url, auth=self.auth
        ).execute(collection, metadata_path=metadata)
archive_path=archive, replace=replace, ) elif not archive and metadata and swhid is None: result = UpdateMetadataOnPartialDepositClient( url=self.base_url, auth=self.auth ).execute( collection, in_progress, slug, deposit_id=deposit_id, metadata_path=metadata, replace=replace, ) elif not archive and metadata and swhid is not None: result = UpdateMetadataOnDoneDepositClient( url=self.base_url, auth=self.auth ).execute( collection, in_progress, slug, deposit_id=deposit_id, metadata_path=metadata, swhid=swhid, ) else: result = UpdateMultipartDepositClient( url=self.base_url, auth=self.auth ).execute( collection, in_progress, slug, deposit_id=deposit_id, archive_path=archive, metadata_path=metadata, replace=replace, ) if "error" in result: return result return self.deposit_status(collection, deposit_id) def deposit_metadata_only( self, collection: str, metadata: Optional[str] = None, ): assert metadata is not None return CreateMetadataOnlyDepositClient( url=self.base_url, auth=self.auth ).execute(collection, metadata_path=metadata) diff --git a/swh/deposit/tests/data/atom/entry-list-deposits-page1.xml b/swh/deposit/tests/data/atom/entry-list-deposits-page1.xml new file mode 100644 index 00000000..d094aab2 --- /dev/null +++ b/swh/deposit/tests/data/atom/entry-list-deposits-page1.xml @@ -0,0 +1,18 @@ + + 3 + + 1031 + rejected + Deposit without archive + check-deposit-2020-10-09T13:10:00.000000 + + + 1032 + rejected + Deposit without archive + check-deposit-2020-10-10T13:20:00.000000 + + diff --git a/swh/deposit/tests/data/atom/entry-list-deposits-page2.xml b/swh/deposit/tests/data/atom/entry-list-deposits-page2.xml new file mode 100644 index 00000000..eba33ad8 --- /dev/null +++ b/swh/deposit/tests/data/atom/entry-list-deposits-page2.xml @@ -0,0 +1,16 @@ + + 3 + + 1033 + 2020-10-08T13:50:30 + 2020-10-08T13:52:34.509655 + done + The deposit has been successfully loaded into the Software Heritage archive + swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea + 
swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/ + check-deposit-2020-10-08T13:52:34.509655 + + diff --git a/swh/deposit/tests/data/atom/entry-list-deposits.xml b/swh/deposit/tests/data/atom/entry-list-deposits.xml new file mode 100644 index 00000000..e830bd33 --- /dev/null +++ b/swh/deposit/tests/data/atom/entry-list-deposits.xml @@ -0,0 +1,28 @@ + + 3 + + 1031 + rejected + Deposit without archive + check-deposit-2020-10-09T13:10:00.000000 + + + 1032 + rejected + Deposit without archive + check-deposit-2020-10-10T13:20:00.000000 + + + 1033 + 2020-10-08T13:50:30 + 2020-10-08T13:52:34.509655 + done + The deposit has been successfully loaded into the Software Heritage archive + swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea + swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/ + check-deposit-2020-10-08T13:52:34.509655 + + diff --git a/swh/deposit/tests/test_client_module.py b/swh/deposit/tests/test_client_module.py index 545641b1..03e64c1c 100644 --- a/swh/deposit/tests/test_client_module.py +++ b/swh/deposit/tests/test_client_module.py @@ -1,96 +1,215 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Ensure the gist of the BaseDepositClient.execute works as expected in corner cases The # following tests uses the ServiceDocumentDepositClient and StatusDepositClient because # they are BaseDepositClient subclasses. 
# Ensure the gist of the BaseDepositClient.execute works as expected in corner
# cases. The following tests use ServiceDocumentDepositClient and
# StatusDepositClient because they are BaseDepositClient subclasses. We could
# have used other classes but those ones got elected as they are fairly simple
# ones.

import pytest

from swh.deposit.client import (
    CollectionListDepositClient,
    MaintenanceError,
    PublicApiDepositClient,
    ServiceDocumentDepositClient,
    StatusDepositClient,
)
from swh.deposit.utils import to_header_link


def test_client_read_data_ok(requests_mock_datadir):
    # Nominal case: the service document parses into the expected dict.
    client = ServiceDocumentDepositClient(
        url="https://deposit.swh.test/1", auth=("test", "test")
    )

    result = client.execute()

    assert isinstance(result, dict)
    collection = result["app:service"]["app:workspace"]["app:collection"]
    assert collection["sword:name"] == "test"


def test_client_read_data_fails(mocker):
    # Any exception during the http call is folded into an "error" result.
    mock = mocker.patch("swh.deposit.client.BaseDepositClient.do_execute")
    mock.side_effect = ValueError("here comes trouble")

    client = ServiceDocumentDepositClient(
        url="https://deposit.swh.test/1", auth=("test", "test")
    )

    result = client.execute()
    assert isinstance(result, dict)
    assert "error" in result
    assert mock.called


def test_client_read_data_no_result(requests_mock):
    # 204 No Content: only the status code is returned.
    url = "https://deposit.swh.test/1"
    requests_mock.get(f"{url}/servicedocument/", status_code=204)

    client = ServiceDocumentDepositClient(
        url="https://deposit.swh.test/1", auth=("test", "test")
    )

    result = client.execute()
    assert isinstance(result, dict)
    assert result == {"status": 204}


def test_client_read_data_collection_error_503(requests_mock, atom_dataset):
    # 503 on the service document is reported as a plain error result.
    error_content = atom_dataset["error-cli"].format(
        summary="forbidden", verboseDescription="Access restricted",
    )
    url = "https://deposit.swh.test/1"
    requests_mock.get(f"{url}/servicedocument/", status_code=503, text=error_content)

    client = ServiceDocumentDepositClient(
        url="https://deposit.swh.test/1", auth=("test", "test")
    )

    result = client.execute()
    assert isinstance(result, dict)
    assert result == {
        "error": "forbidden",
        "status": 503,
        "collection": None,
    }


def test_client_read_data_status_error_503(requests_mock, atom_dataset):
    # 503 with summary+detail on a status check raises MaintenanceError.
    error_content = atom_dataset["error-cli"].format(
        summary="forbidden", verboseDescription="Access restricted",
    )
    collection = "test"
    deposit_id = 1
    url = "https://deposit.swh.test/1"
    requests_mock.get(
        f"{url}/{collection}/{deposit_id}/status/", status_code=503, text=error_content
    )

    client = StatusDepositClient(
        url="https://deposit.swh.test/1", auth=("test", "test")
    )

    with pytest.raises(MaintenanceError, match="forbidden"):
        client.execute(collection, deposit_id)


# Expected parsed deposits from the entry-list-deposits*.xml fixtures.
EXPECTED_DEPOSIT = {
    "id": "1031",
    "external_id": "check-deposit-2020-10-09T13:10:00.000000",
    "status": "rejected",
    "status_detail": "Deposit without archive",
}

EXPECTED_DEPOSIT2 = {
    "id": "1032",
    "external_id": "check-deposit-2020-10-10T13:20:00.000000",
    "status": "rejected",
    "status_detail": "Deposit without archive",
}

EXPECTED_DEPOSIT3 = {
    "id": "1033",
    "external_id": "check-deposit-2020-10-08T13:52:34.509655",
    "status": "done",
    "status_detail": (
        "The deposit has been successfully loaded into the Software "
        "Heritage archive"
    ),
    "reception_date": "2020-10-08T13:50:30",
    "complete_date": "2020-10-08T13:52:34.509655",
    "swhid": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea",
    "swhid_context": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/",  # noqa
}


def test_client_collection_list(requests_mock, atom_dataset):
    # Listing without pagination: both the dedicated client and the public
    # facade must produce the same parsed result.
    collection_list_xml = atom_dataset["entry-list-deposits"]
    base_url = "https://deposit.test.list/1"
    collection = "test"
    url = f"{base_url}/{collection}/"
    requests_mock.get(url, status_code=200, text=collection_list_xml)
    expected_result = {
        "count": "3",
        "deposits": [EXPECTED_DEPOSIT, EXPECTED_DEPOSIT2, EXPECTED_DEPOSIT3],
    }

    # use dedicated client
    client = CollectionListDepositClient(url=base_url, auth=("test", "test"))

    # no pagination
    result = client.execute(collection)

    assert result == expected_result

    # The main public client should work the same way
    client2 = PublicApiDepositClient(url=base_url, auth=("test", "test"))
    result2 = client2.deposit_list(collection)

    assert result2 == expected_result

    assert requests_mock.called
    request_history = [m.url for m in requests_mock.request_history]
    assert request_history == [url] * 2


def test_client_collection_list_with_pagination_headers(requests_mock, atom_dataset):
    # Pagination: Link headers (next/previous) are surfaced in the result.
    collection_list_xml_page1 = atom_dataset["entry-list-deposits-page1"]
    collection_list_xml_page2 = atom_dataset["entry-list-deposits-page2"]
    base_url = "https://deposit.test.list/1"
    collection = "test"
    url = f"{base_url}/{collection}/"
    page1 = 1
    page2 = 2
    page_size = 10
    url_page1 = f"{url}?page={page1}"
    url_page2 = f"{url}?page={page2}&page_size={page_size}"
    requests_mock.get(
        url_page1,
        status_code=200,
        text=collection_list_xml_page1,
        headers={"Link": to_header_link(url_page2, "next"),},
    )
    requests_mock.get(
        url_page2,
        status_code=200,
        text=collection_list_xml_page2,
        headers={"Link": to_header_link(url_page1, "previous"),},
    )

    expected_result_page1 = {
        "count": "3",
        "deposits": [EXPECTED_DEPOSIT, EXPECTED_DEPOSIT2],
        "next": url_page2,
    }
    expected_result_page2 = {
        "count": "3",
        "deposits": [EXPECTED_DEPOSIT3],
        "previous": url_page1,
    }

    client = CollectionListDepositClient(
        url="https://deposit.test.list/1", auth=("test", "test")
    )
    client2 = PublicApiDepositClient(url=base_url, auth=("test", "test"))

    result = client.execute(collection, page=page1)
    assert result == expected_result_page1

    result2 = client.execute(collection, page=page2, page_size=page_size)
    assert result2 == expected_result_page2

    # The main public client should work the same way
    result = client2.deposit_list(collection, page=page1)
    assert result == expected_result_page1

    result2 = client2.deposit_list(collection, page=page2, page_size=page_size)
    assert result2 == expected_result_page2

    assert requests_mock.called
    request_history = [m.url for m in requests_mock.request_history]
    assert request_history == [url_page1, url_page2] * 2
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from types import GeneratorType
from typing import Any, Dict, Optional, Union

import iso8601
import xmltodict

from swh.model.exceptions import ValidationError
from swh.model.identifiers import (
    ExtendedSWHID,
    ObjectType,
    QualifiedSWHID,
    normalize_timestamp,
)

logger = logging.getLogger(__name__)


def parse_xml(stream, encoding="utf-8"):
    """Parse an XML stream into nested dicts, mapping known namespaces to
    short prefixes (atom:, app:, dc:, codemeta:, sword:, swh:).

    If the document is an atom entry, return its content directly.
    """
    namespaces = {
        "http://www.w3.org/2005/Atom": "atom",
        "http://www.w3.org/2007/app": "app",
        "http://purl.org/dc/terms/": "dc",
        "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0": "codemeta",
        "http://purl.org/net/sword/terms/": "sword",
        "https://www.softwareheritage.org/schema/2018/deposit": "swh",
    }

    data = xmltodict.parse(
        stream,
        encoding=encoding,
        namespaces=namespaces,
        process_namespaces=True,
        dict_constructor=dict,
    )
    if "atom:entry" in data:
        data = data["atom:entry"]
    return data


def merge(*dicts):
    """Given an iterator of dicts, merge them losing no information.

    Args:
        *dicts: arguments are all supposed to be dict to merge into one

    Returns:
        dict merged without losing information

    """

    def _extend(existing_val, value):
        """Given an existing value and a value (as potential lists), merge
        them together without repetition.

        """
        if isinstance(value, (list, map, GeneratorType)):
            vals = value
        else:
            vals = [value]
        for v in vals:
            if v in existing_val:
                continue
            existing_val.append(v)
        return existing_val

    d = {}
    for data in dicts:
        if not isinstance(data, dict):
            raise ValueError("dicts is supposed to be a variable arguments of dict")

        for key, value in data.items():
            existing_val = d.get(key)
            if not existing_val:
                d[key] = value
                continue
            # Merge strategy depends on the shapes of both values: lists are
            # extended, dicts are merged recursively, scalars are promoted
            # into lists.
            if isinstance(existing_val, (list, map, GeneratorType)):
                new_val = _extend(existing_val, value)
            elif isinstance(existing_val, dict):
                if isinstance(value, dict):
                    new_val = merge(existing_val, value)
                else:
                    new_val = _extend([existing_val], value)
            else:
                new_val = _extend([existing_val], value)
            d[key] = new_val
    return d


def normalize_date(date):
    """Normalize date fields as expected by swh workers.

    If date is a list, elect arbitrarily the first element of that list.
    If date is (then) a string, parse it through iso8601 to extract a
    datetime. Then normalize it through
    swh.model.identifiers.normalize_timestamp.

    Returns:
        The swh date object

    """
    if isinstance(date, list):
        date = date[0]
    if isinstance(date, str):
        date = iso8601.parse_date(date)

    return normalize_timestamp(date)


def compute_metadata_context(swhid_reference: QualifiedSWHID) -> Dict[str, Any]:
    """Given a SWHID object, determine the context as a dict.

    """
    metadata_context: Dict[str, Any] = {"origin": None}
    if swhid_reference.qualifiers():
        metadata_context = {
            "origin": swhid_reference.origin,
            "path": swhid_reference.path,
        }
        snapshot = swhid_reference.visit
        if snapshot:
            metadata_context["snapshot"] = snapshot

        anchor = swhid_reference.anchor
        if anchor:
            # Key the anchor by its object type name (e.g. "revision").
            metadata_context[anchor.object_type.name.lower()] = anchor

    return metadata_context
""" metadata_context: Dict[str, Any] = {"origin": None} if swhid_reference.qualifiers(): metadata_context = { "origin": swhid_reference.origin, "path": swhid_reference.path, } snapshot = swhid_reference.visit if snapshot: metadata_context["snapshot"] = snapshot anchor = swhid_reference.anchor if anchor: metadata_context[anchor.object_type.name.lower()] = anchor return metadata_context ALLOWED_QUALIFIERS_NODE_TYPE = ( ObjectType.SNAPSHOT, ObjectType.REVISION, ObjectType.RELEASE, ObjectType.DIRECTORY, ) def parse_swh_reference(metadata: Dict,) -> Optional[Union[QualifiedSWHID, str]]: """Parse swh reference within the metadata dict (or origin) reference if found, None otherwise. .. code-block:: xml or: .. code-block:: xml Args: metadata: result of parsing an Atom document with :func:`parse_xml` Raises: ValidationError in case the swhid referenced (if any) is invalid Returns: Either swhid or origin reference if any. None otherwise. """ # noqa swh_deposit = metadata.get("swh:deposit") if not swh_deposit: return None swh_reference = swh_deposit.get("swh:reference") if not swh_reference: return None swh_origin = swh_reference.get("swh:origin") if swh_origin: url = swh_origin.get("@url") if url: return url swh_object = swh_reference.get("swh:object") if not swh_object: return None swhid = swh_object.get("@swhid") if not swhid: return None swhid_reference = QualifiedSWHID.from_string(swhid) if swhid_reference.qualifiers(): anchor = swhid_reference.anchor if anchor: if anchor.object_type not in ALLOWED_QUALIFIERS_NODE_TYPE: error_msg = ( "anchor qualifier should be a core SWHID with type one of " f"{', '.join(t.name.lower() for t in ALLOWED_QUALIFIERS_NODE_TYPE)}" ) raise ValidationError(error_msg) visit = swhid_reference.visit if visit: if visit.object_type != ObjectType.SNAPSHOT: raise ValidationError( f"visit qualifier should be a core SWHID with type snp, " f"not {visit.object_type.value}" ) if ( visit and anchor and visit.object_type == ObjectType.SNAPSHOT and 
def to_header_link(link: str, link_name: str) -> str:
    """Build a single HTTP ``Link`` header entry.

    Args:
        link: target url of the link
        link_name: value of the ``rel`` attribute

    >>> link_next = to_header_link("next-url", "next")
    >>> link_next
    '<next-url>; rel="next"'
    >>> ','.join([link_next, to_header_link("prev-url", "prev")])
    '<next-url>; rel="next",<prev-url>; rel="prev"'

    """
    return f'<{link}>; rel="{link_name}"'