diff --git a/requirements-swh-server.txt b/requirements-swh-server.txt index 5e81fabe..a54273e7 100644 --- a/requirements-swh-server.txt +++ b/requirements-swh-server.txt @@ -1,4 +1,4 @@ -swh.core[http] +swh.core[http] >= 0.4 swh.loader.core >= 0.0.71 swh.scheduler >= 0.0.39 swh.model >= 0.3.8 diff --git a/requirements-swh.txt b/requirements-swh.txt index 9bc67248..d6d8f166 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1 +1 @@ -swh.core[http] >= 0.3 +swh.core[http] >= 0.4 diff --git a/swh/deposit/client.py b/swh/deposit/client.py index a27c166b..edc5f111 100644 --- a/swh/deposit/client.py +++ b/swh/deposit/client.py @@ -1,651 +1,646 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module in charge of defining an swh-deposit client """ from abc import ABCMeta, abstractmethod import hashlib import logging import os from typing import Any, Dict from urllib.parse import urljoin import requests import xmltodict -from swh.core.config import config_basepath, read_raw_config +from swh.core.config import load_from_envvar logger = logging.getLogger(__name__) class MaintenanceError(ValueError): """Informational maintenance error exception """ pass def _parse(stream, encoding="utf-8"): """Given a xml stream, parse the result. Args: stream (bytes/text): The stream to parse encoding (str): The encoding to use if to decode the bytes stream Returns: A dict of values corresponding to the parsed xml """ if isinstance(stream, bytes): stream = stream.decode(encoding) data = xmltodict.parse(stream, encoding=encoding, process_namespaces=False) if "entry" in data: data = data["entry"] if "sword:error" in data: data = data["sword:error"] return dict(data) def _parse_with_filter(stream, encoding="utf-8", keys=[]): """Given a xml stream, parse the result and filter with keys. Args: stream (bytes/text): The stream to parse encoding (str): The encoding to use if to decode the bytes stream keys ([str]): Keys to filter the parsed result Returns: A dict of values corresponding to the parsed xml filtered by the keys provided. """ data = _parse(stream, encoding=encoding) m = {} for key in keys: m[key] = data.get(key) return m class BaseApiDepositClient: """Deposit client base class """ def __init__(self, config=None, _client=requests): - if config is None: - config_file = os.environ["SWH_CONFIG_FILENAME"] - self.config: Dict[str, Any] = read_raw_config(config_basepath(config_file)) - else: - self.config = config - + self.config: Dict[str, Any] = config or load_from_envvar() self._client = _client self.base_url = self.config["url"].strip("/") + "/" auth = self.config["auth"] if auth == {}: self.auth = None else: self.auth = (auth["username"], auth["password"]) def do(self, method, url, *args, **kwargs): """Internal method to deal with requests, possibly with basic http authentication. Args: method (str): supported http methods as in self._methods' keys Returns: The request's execution """ if hasattr(self._client, method): method_fn = getattr(self._client, method) else: raise ValueError("Development error, unsupported method %s" % (method)) if self.auth: kwargs["auth"] = self.auth full_url = urljoin(self.base_url, url.lstrip("/")) return method_fn(full_url, *args, **kwargs) class PrivateApiDepositClient(BaseApiDepositClient): """Private API deposit client to: - read a given deposit's archive(s) - read a given deposit's metadata - update a given deposit's status """ def archive_get(self, archive_update_url, archive): """Retrieve the archive from the deposit to a local directory. Args: archive_update_url (str): The full deposit archive(s)'s raw content to retrieve locally archive (str): the local archive's path where to store the raw content Returns: The archive path to the local archive to load. Or None if any problem arose. """ r = self.do("get", archive_update_url, stream=True) if r.ok: with open(archive, "wb") as f: for chunk in r.iter_content(): f.write(chunk) return archive msg = "Problem when retrieving deposit archive at %s" % (archive_update_url,) logger.error(msg) raise ValueError(msg) def metadata_get(self, metadata_url): """Retrieve the metadata information on a given deposit. Args: metadata_url (str): The full deposit metadata url to retrieve locally Returns: The dictionary of metadata for that deposit or None if any problem arose. """ r = self.do("get", metadata_url) if r.ok: return r.json() msg = "Problem when retrieving metadata at %s" % metadata_url logger.error(msg) raise ValueError(msg) def status_update( self, update_status_url, status, revision_id=None, directory_id=None, origin_url=None, ): """Update the deposit's status. Args: update_status_url (str): the full deposit's archive status (str): The status to update the deposit with revision_id (str/None): the revision's identifier to update to directory_id (str/None): the directory's identifier to update to origin_url (str/None): deposit's associated origin url """ payload = {"status": status} if revision_id: payload["revision_id"] = revision_id if directory_id: payload["directory_id"] = directory_id if origin_url: payload["origin_url"] = origin_url self.do("put", update_status_url, json=payload) def check(self, check_url): """Check the deposit's associated data (metadata, archive(s)) Args: check_url (str): the full deposit's check url """ r = self.do("get", check_url) if r.ok: data = r.json() return data["status"] msg = "Problem when checking deposit %s" % check_url logger.error(msg) raise ValueError(msg) class BaseDepositClient(BaseApiDepositClient, metaclass=ABCMeta): """Base Deposit client to access the public api. """ def __init__(self, config, error_msg=None, empty_result={}): super().__init__(config) self.error_msg = error_msg self.empty_result = empty_result @abstractmethod def compute_url(self, *args, **kwargs): """Compute api url endpoint to query.""" pass @abstractmethod def compute_method(self, *args, **kwargs): """Http method to use on the url""" pass @abstractmethod def parse_result_ok(self, xml_content): """Given an xml result from the api endpoint, parse it and returns a dict. """ pass def compute_information(self, *args, **kwargs): """Compute some more information given the inputs (e.g http headers, ...) """ return {} def parse_result_error(self, xml_content): """Given an error response in xml, parse it into a dict. Returns: dict with following keys: 'error': The error message 'detail': Some more detail about the error if any """ return _parse_with_filter( xml_content, keys=["summary", "detail", "sword:verboseDescription"] ) def do_execute(self, method, url, info): """Execute the http query to url using method and info information. By default, execute a simple query to url with the http method. Override this in daughter class to improve the default behavior if needed. """ return self.do(method, url) def execute(self, *args, **kwargs) -> Dict[str, Any]: """Main endpoint to prepare and execute the http query to the api. Raises: MaintenanceError if some api maintenance is happening. Returns: Dict of computed api data """ url = self.compute_url(*args, **kwargs) method = self.compute_method(*args, **kwargs) info = self.compute_information(*args, **kwargs) try: r = self.do_execute(method, url, info) except Exception as e: msg = self.error_msg % (url, e) r = self.empty_result r.update( {"error": msg,} ) return r else: if r.ok: if int(r.status_code) == 204: # 204 returns no body return {"status": r.status_code} else: return self.parse_result_ok(r.text) else: error = self.parse_result_error(r.text) empty = self.empty_result error.update(empty) if r.status_code == 503: summary = error.get("summary") detail = error.get("sword:verboseDescription") # Maintenance error if summary and detail: raise MaintenanceError(f"{summary}: {detail}") error.update( {"status": r.status_code,} ) return error class ServiceDocumentDepositClient(BaseDepositClient): """Service Document information retrieval. """ def __init__(self, config): super().__init__( config, error_msg="Service document failure at %s: %s", empty_result={"collection": None}, ) def compute_url(self, *args, **kwargs): return "/servicedocument/" def compute_method(self, *args, **kwargs): return "get" def parse_result_ok(self, xml_content): """Parse service document's success response. """ return _parse(xml_content) class StatusDepositClient(BaseDepositClient): """Status information on a deposit. """ def __init__(self, config): super().__init__( config, error_msg="Status check failure at %s: %s", empty_result={ "deposit_status": None, "deposit_status_detail": None, "deposit_swh_id": None, }, ) def compute_url(self, collection, deposit_id): return "/%s/%s/status/" % (collection, deposit_id) def compute_method(self, *args, **kwargs): return "get" def parse_result_ok(self, xml_content): """Given an xml content as string, returns a deposit dict. """ return _parse_with_filter( xml_content, keys=[ "deposit_id", "deposit_status", "deposit_status_detail", "deposit_swh_id", "deposit_swh_id_context", "deposit_external_id", ], ) class BaseCreateDepositClient(BaseDepositClient): """Deposit client base class to post new deposit. """ def __init__(self, config): super().__init__( config, error_msg="Post Deposit failure at %s: %s", empty_result={"deposit_id": None, "deposit_status": None,}, ) def compute_url(self, collection, *args, **kwargs): return "/%s/" % collection def compute_method(self, *args, **kwargs): return "post" def parse_result_ok(self, xml_content): """Given an xml content as string, returns a deposit dict. """ return _parse_with_filter( xml_content, keys=[ "deposit_id", "deposit_status", "deposit_status_detail", "deposit_date", ], ) def _compute_information( self, collection, filepath, in_progress, slug, is_archive=True ): """Given a filepath, compute necessary information on that file. Args: filepath (str): Path to a file is_archive (bool): is it an archive or not? Returns: dict with keys: 'content-type': content type associated 'md5sum': md5 sum 'filename': filename """ filename = os.path.basename(filepath) if is_archive: md5sum = hashlib.md5(open(filepath, "rb").read()).hexdigest() extension = filename.split(".")[-1] if "zip" in extension: content_type = "application/zip" else: content_type = "application/x-tar" else: content_type = None md5sum = None return { "slug": slug, "in_progress": in_progress, "content-type": content_type, "md5sum": md5sum, "filename": filename, "filepath": filepath, } def compute_information( self, collection, filepath, in_progress, slug, is_archive=True, **kwargs ): info = self._compute_information( collection, filepath, in_progress, slug, is_archive=is_archive ) info["headers"] = self.compute_headers(info) return info def do_execute(self, method, url, info): with open(info["filepath"], "rb") as f: return self.do(method, url, data=f, headers=info["headers"]) class CreateArchiveDepositClient(BaseCreateDepositClient): """Post an archive (binary) deposit client.""" def compute_headers(self, info): return { "SLUG": info["slug"], "CONTENT_MD5": info["md5sum"], "IN-PROGRESS": str(info["in_progress"]), "CONTENT-TYPE": info["content-type"], "CONTENT-DISPOSITION": "attachment; filename=%s" % (info["filename"],), } class UpdateArchiveDepositClient(CreateArchiveDepositClient): """Update (add/replace) an archive (binary) deposit client.""" def compute_url(self, collection, *args, deposit_id=None, **kwargs): return "/%s/%s/media/" % (collection, deposit_id) def compute_method(self, *args, replace=False, **kwargs): return "put" if replace else "post" class CreateMetadataDepositClient(BaseCreateDepositClient): """Post a metadata deposit client.""" def compute_headers(self, info): return { "SLUG": info["slug"], "IN-PROGRESS": str(info["in_progress"]), "CONTENT-TYPE": "application/atom+xml;type=entry", } class UpdateMetadataDepositClient(CreateMetadataDepositClient): """Update (add/replace) a metadata deposit client.""" def compute_url(self, collection, *args, deposit_id=None, **kwargs): return "/%s/%s/metadata/" % (collection, deposit_id) def compute_method(self, *args, replace=False, **kwargs): return "put" if replace else "post" class CreateMultipartDepositClient(BaseCreateDepositClient): """Create a multipart deposit client.""" def _multipart_info(self, info, info_meta): files = [ ( "file", (info["filename"], open(info["filepath"], "rb"), info["content-type"]), ), ( "atom", ( info_meta["filename"], open(info_meta["filepath"], "rb"), "application/atom+xml", ), ), ] headers = { "SLUG": info["slug"], "CONTENT_MD5": info["md5sum"], "IN-PROGRESS": str(info["in_progress"]), } return files, headers def compute_information( self, collection, archive, metadata, in_progress, slug, **kwargs ): info = self._compute_information(collection, archive, in_progress, slug) info_meta = self._compute_information( collection, metadata, in_progress, slug, is_archive=False ) files, headers = self._multipart_info(info, info_meta) return {"files": files, "headers": headers} def do_execute(self, method, url, info): return self.do(method, url, files=info["files"], headers=info["headers"]) class UpdateMultipartDepositClient(CreateMultipartDepositClient): """Update a multipart deposit client.""" def compute_url(self, collection, *args, deposit_id=None, **kwargs): return "/%s/%s/metadata/" % (collection, deposit_id) def compute_method(self, *args, replace=False, **kwargs): return "put" if replace else "post" class PublicApiDepositClient(BaseApiDepositClient): """Public api deposit client.""" def service_document(self): """Retrieve service document endpoint's information.""" return ServiceDocumentDepositClient(self.config).execute() def deposit_status(self, collection, deposit_id): """Retrieve status information on a deposit.""" return StatusDepositClient(self.config).execute(collection, deposit_id) def deposit_create( self, collection, slug, archive=None, metadata=None, in_progress=False ): """Create a new deposit (archive, metadata, both as multipart).""" if archive and not metadata: return CreateArchiveDepositClient(self.config).execute( collection, archive, in_progress, slug ) elif not archive and metadata: return CreateMetadataDepositClient(self.config).execute( collection, metadata, in_progress, slug, is_archive=False ) else: return CreateMultipartDepositClient(self.config).execute( collection, archive, metadata, in_progress, slug ) def deposit_update( self, collection, deposit_id, slug, archive=None, metadata=None, in_progress=False, replace=False, ): """Update (add/replace) existing deposit (archive, metadata, both).""" r = self.deposit_status(collection, deposit_id) if "error" in r: return r status = r["deposit_status"] if status != "partial": return { "error": "You can only act on deposit with status 'partial'", "detail": "The deposit %s has status '%s'" % (deposit_id, status), "deposit_status": status, "deposit_id": deposit_id, } if archive and not metadata: r = UpdateArchiveDepositClient(self.config).execute( collection, archive, in_progress, slug, deposit_id=deposit_id, replace=replace, ) elif not archive and metadata: r = UpdateMetadataDepositClient(self.config).execute( collection, metadata, in_progress, slug, deposit_id=deposit_id, replace=replace, ) else: r = UpdateMultipartDepositClient(self.config).execute( collection, archive, metadata, in_progress, slug, deposit_id=deposit_id, replace=replace, ) if "error" in r: return r return self.deposit_status(collection, deposit_id) diff --git a/swh/deposit/config.py b/swh/deposit/config.py index e67dd030..ec1e0248 100644 --- a/swh/deposit/config.py +++ b/swh/deposit/config.py @@ -1,105 +1,103 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Any, Dict from swh.core import config from swh.deposit import __version__ from swh.scheduler import get_scheduler from swh.scheduler.interface import SchedulerInterface # IRIs (Internationalized Resource identifier) sword 2.0 specified EDIT_SE_IRI = "edit_se_iri" EM_IRI = "em_iri" CONT_FILE_IRI = "cont_file_iri" SD_IRI = "servicedocument" COL_IRI = "upload" STATE_IRI = "state_iri" PRIVATE_GET_RAW_CONTENT = "private-download" PRIVATE_CHECK_DEPOSIT = "check-deposit" PRIVATE_PUT_DEPOSIT = "private-update" PRIVATE_GET_DEPOSIT_METADATA = "private-read" PRIVATE_LIST_DEPOSITS = "private-deposit-list" ARCHIVE_KEY = "archive" METADATA_KEY = "metadata" RAW_METADATA_KEY = "raw-metadata" ARCHIVE_TYPE = "archive" METADATA_TYPE = "metadata" AUTHORIZED_PLATFORMS = ["development", "production", "testing"] DEPOSIT_STATUS_REJECTED = "rejected" DEPOSIT_STATUS_PARTIAL = "partial" DEPOSIT_STATUS_DEPOSITED = "deposited" DEPOSIT_STATUS_VERIFIED = "verified" DEPOSIT_STATUS_LOAD_SUCCESS = "done" DEPOSIT_STATUS_LOAD_FAILURE = "failed" # Revision author for deposit SWH_PERSON = { "name": "Software Heritage", "fullname": "Software Heritage", "email": "robot@softwareheritage.org", } DEFAULT_CONFIG = { "max_upload_size": 209715200, "checks": True, } def setup_django_for(platform=None, config_file=None): """Setup function for command line tools (swh.deposit.create_user) to initialize the needed db access. Note: Do not import any django related module prior to this function call. Otherwise, this will raise an django.core.exceptions.ImproperlyConfigured error message. Args: platform (str): the platform the scheduling is running config_file (str): Extra configuration file (typically for the production platform) Raises: ValueError in case of wrong platform inputs. """ if platform is not None: if platform not in AUTHORIZED_PLATFORMS: raise ValueError("Platform should be one of %s" % AUTHORIZED_PLATFORMS) if "DJANGO_SETTINGS_MODULE" not in os.environ: os.environ["DJANGO_SETTINGS_MODULE"] = "swh.deposit.settings.%s" % platform if config_file: os.environ.setdefault("SWH_CONFIG_FILENAME", config_file) import django django.setup() class APIConfig: """API Configuration centralized class. This loads explicitly the configuration file out of the SWH_CONFIG_FILENAME environment variable. """ def __init__(self): - config_file = os.environ["SWH_CONFIG_FILENAME"] - conf = config.read_raw_config(config.config_basepath(config_file)) - self.config: Dict[str, Any] = config.merge_configs(DEFAULT_CONFIG, conf) + self.config: Dict[str, Any] = config.load_from_envvar(DEFAULT_CONFIG) self.scheduler: SchedulerInterface = get_scheduler(**self.config["scheduler"]) self.tool = { "name": "swh-deposit", "version": __version__, "configuration": {"sword_version": "2"}, } diff --git a/swh/deposit/loader/checker.py b/swh/deposit/loader/checker.py index 5e239083..9f491033 100644 --- a/swh/deposit/loader/checker.py +++ b/swh/deposit/loader/checker.py @@ -1,42 +1,38 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging -import os from typing import Any, Dict from swh.core import config from swh.deposit.client import PrivateApiDepositClient logger = logging.getLogger(__name__) class DepositChecker: """Deposit checker implementation. Trigger deposit's checks through the private api. """ def __init__(self): - config_file = os.environ["SWH_CONFIG_FILENAME"] - self.config: Dict[str, Any] = config.read_raw_config( - config.config_basepath(config_file) - ) + self.config: Dict[str, Any] = config.load_from_envvar() self.client = PrivateApiDepositClient(config=self.config["deposit"]) def check(self, collection: str, deposit_id: str) -> Dict[str, str]: status = None deposit_check_url = f"/{collection}/{deposit_id}/check/" logger.debug("deposit-check-url: %s", deposit_check_url) try: r = self.client.check(deposit_check_url) logger.debug("Check result: %s", r) status = "eventful" if r == "verified" else "failed" except Exception: logger.exception("Failure during check on '%s'", deposit_check_url) status = "failed" logger.debug("Check status: %s", status) return {"status": status} diff --git a/swh/deposit/tests/loader/test_client.py b/swh/deposit/tests/loader/test_client.py index 55edd2c7..02f4b495 100644 --- a/swh/deposit/tests/loader/test_client.py +++ b/swh/deposit/tests/loader/test_client.py @@ -1,246 +1,262 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os from typing import Any, Callable, Optional import unittest from urllib.parse import urlparse import pytest from swh.deposit.client import PrivateApiDepositClient from swh.deposit.config import DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_LOAD_SUCCESS + CLIENT_TEST_CONFIG = { "url": "https://nowhere.org/", "auth": {}, # no authentication in test scenario } +@pytest.fixture +def deposit_config(): + return CLIENT_TEST_CONFIG + + +def test_client_config(deposit_config_path): + for client in [ + # config passed as constructor parameter + PrivateApiDepositClient(config=CLIENT_TEST_CONFIG), + # config loaded from environment + PrivateApiDepositClient() + ]: + assert client.config == CLIENT_TEST_CONFIG + + def build_expected_path(datadir, base_url: str, api_url: str) -> str: """Build expected path from api to served file """ url = urlparse(base_url) dirname = "%s_%s" % (url.scheme, url.hostname) if api_url.endswith("/"): api_url = api_url[:-1] if api_url.startswith("/"): api_url = api_url[1:] suffix_path = api_url.replace("/", "_") return os.path.join(datadir, dirname, suffix_path) def test_build_expected_path(datadir): actual_path = build_expected_path(datadir, "http://example.org", "/hello/you/") assert actual_path == os.path.join(datadir, "http_example.org", "hello_you") def read_served_path( datadir, base_url: str, api_url: str, convert_fn: Optional[Callable[[str], Any]] = None, ) -> bytes: """Read served path """ archive_path = build_expected_path(datadir, base_url, api_url) with open(archive_path, "rb") as f: content = f.read() if convert_fn: content = convert_fn(content.decode("utf-8")) return content def test_read_served_path(datadir): actual_content = read_served_path(datadir, "http://example.org", "/hello/you/") assert actual_content == b"hello people\n" actual_content2 = read_served_path( datadir, "http://example.org", "/hello.json", convert_fn=json.loads ) assert actual_content2 == {"a": [1, 3]} # private api to retrieve archive def test_archive_get(tmp_path, datadir, requests_mock_datadir): """Retrieving archive data through private api should stream data """ api_url = "/1/private/test/1/raw/" - client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG) + client = PrivateApiDepositClient(CLIENT_TEST_CONFIG) expected_content = read_served_path(datadir, client.base_url, api_url) archive_path = os.path.join(tmp_path, "test.archive") archive_path = client.archive_get(api_url, archive_path) assert os.path.exists(archive_path) is True with open(archive_path, "rb") as f: actual_content = f.read() assert actual_content == expected_content assert client.base_url == CLIENT_TEST_CONFIG["url"] assert client.auth is None def test_archive_get_auth(tmp_path, datadir, requests_mock_datadir): """Retrieving archive data through private api should stream data """ api_url = "/1/private/test/1/raw/" config = CLIENT_TEST_CONFIG.copy() config["auth"] = { # add authentication setup "username": "user", "password": "pass", } client = PrivateApiDepositClient(config) expected_content = read_served_path(datadir, client.base_url, api_url) archive_path = os.path.join(tmp_path, "test.archive") archive_path = client.archive_get(api_url, archive_path) assert os.path.exists(archive_path) is True with open(archive_path, "rb") as f: actual_content = f.read() assert actual_content == expected_content assert client.base_url == CLIENT_TEST_CONFIG["url"] assert client.auth == ("user", "pass") def test_archive_get_ko(tmp_path, datadir, requests_mock_datadir): """Reading archive can fail for some reasons """ unknown_api_url = "/1/private/unknown/deposit-id/raw/" client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG) with pytest.raises(ValueError, match="Problem when retrieving deposit"): client.archive_get(unknown_api_url, "some/path") # private api read metadata def test_metadata_get(datadir, requests_mock_datadir): """Reading archive should write data in temporary directory """ api_url = "/1/private/test/1/metadata" client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG) actual_metadata = client.metadata_get(api_url) assert isinstance(actual_metadata, str) is False expected_content = read_served_path( datadir, client.base_url, api_url, convert_fn=json.loads ) assert actual_metadata == expected_content def test_metadata_get_ko(requests_mock_datadir): """Reading metadata can fail for some reasons """ unknown_api_url = "/1/private/unknown/deposit-id/metadata/" client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG) with pytest.raises(ValueError, match="Problem when retrieving metadata"): client.metadata_get(unknown_api_url) # private api check def test_check(requests_mock_datadir): """When check ok, this should return the deposit's status """ api_url = "/1/private/test/1/check" client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG) r = client.check(api_url) assert r == "something" def test_check_fails(requests_mock_datadir): """Checking deposit can fail for some reason """ unknown_api_url = "/1/private/test/10/check" client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG) with pytest.raises(ValueError, match="Problem when checking deposit"): client.check(unknown_api_url) # private api update status class FakeRequestClientPut: """Fake Request client dedicated to put request method calls. """ args = None kwargs = None def put(self, *args, **kwargs): self.args = args self.kwargs = kwargs class PrivateApiDepositClientStatusUpdateTest(unittest.TestCase): def test_status_update(self): """Update status """ _client = FakeRequestClientPut() deposit_client = PrivateApiDepositClient( config=CLIENT_TEST_CONFIG, _client=_client ) deposit_client.status_update( "/update/status", DEPOSIT_STATUS_LOAD_SUCCESS, revision_id="some-revision-id", ) self.assertEqual(_client.args, ("https://nowhere.org/update/status",)) self.assertEqual( _client.kwargs, { "json": { "status": DEPOSIT_STATUS_LOAD_SUCCESS, "revision_id": "some-revision-id", } }, ) def test_status_update_with_no_revision_id(self): """Reading metadata can fail for some reasons """ _client = FakeRequestClientPut() deposit_client = PrivateApiDepositClient( config=CLIENT_TEST_CONFIG, _client=_client ) deposit_client.status_update("/update/status/fail", DEPOSIT_STATUS_LOAD_FAILURE) self.assertEqual(_client.args, ("https://nowhere.org/update/status/fail",)) self.assertEqual( _client.kwargs, {"json": {"status": DEPOSIT_STATUS_LOAD_FAILURE,}} )