diff --git a/swh/deposit/client.py b/swh/deposit/client.py index 9418d03a..9cd1a4fd 100644 --- a/swh/deposit/client.py +++ b/swh/deposit/client.py @@ -1,658 +1,651 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module in charge of defining an swh-deposit client """ import hashlib import os +import logging import requests import xmltodict -import logging from abc import ABCMeta, abstractmethod from typing import Any, Dict from urllib.parse import urljoin -from swh.core.config import SWHConfig - +from swh.core.config import read_raw_config, config_basepath logger = logging.getLogger(__name__) class MaintenanceError(ValueError): """Informational maintenance error exception """ pass def _parse(stream, encoding="utf-8"): """Given a xml stream, parse the result. Args: stream (bytes/text): The stream to parse encoding (str): The encoding to use if to decode the bytes stream Returns: A dict of values corresponding to the parsed xml """ if isinstance(stream, bytes): stream = stream.decode(encoding) data = xmltodict.parse(stream, encoding=encoding, process_namespaces=False) if "entry" in data: data = data["entry"] if "sword:error" in data: data = data["sword:error"] return dict(data) def _parse_with_filter(stream, encoding="utf-8", keys=[]): """Given a xml stream, parse the result and filter with keys. Args: stream (bytes/text): The stream to parse encoding (str): The encoding to use if to decode the bytes stream keys ([str]): Keys to filter the parsed result Returns: A dict of values corresponding to the parsed xml filtered by the keys provided. """ data = _parse(stream, encoding=encoding) m = {} for key in keys: m[key] = data.get(key) return m -class BaseApiDepositClient(SWHConfig): +class BaseApiDepositClient: """Deposit client base class """ - CONFIG_BASE_FILENAME = "deposit/client" - DEFAULT_CONFIG = { - "url": ("str", "http://localhost:5006"), - "auth": ("dict", {}), # with optional 'username'/'password' keys - } - def __init__(self, config=None, _client=requests): - super().__init__() if config is None: - self.config = super().parse_config_file() + config_file = os.environ["SWH_CONFIG_FILENAME"] + self.config: Dict[str, Any] = read_raw_config(config_basepath(config_file)) else: self.config = config self._client = _client self.base_url = self.config["url"].strip("/") + "/" auth = self.config["auth"] if auth == {}: self.auth = None else: self.auth = (auth["username"], auth["password"]) def do(self, method, url, *args, **kwargs): """Internal method to deal with requests, possibly with basic http authentication. Args: method (str): supported http methods as in self._methods' keys Returns: The request's execution """ if hasattr(self._client, method): method_fn = getattr(self._client, method) else: raise ValueError("Development error, unsupported method %s" % (method)) if self.auth: kwargs["auth"] = self.auth full_url = urljoin(self.base_url, url.lstrip("/")) return method_fn(full_url, *args, **kwargs) class PrivateApiDepositClient(BaseApiDepositClient): """Private API deposit client to: - read a given deposit's archive(s) - read a given deposit's metadata - update a given deposit's status """ def archive_get(self, archive_update_url, archive): """Retrieve the archive from the deposit to a local directory. Args: archive_update_url (str): The full deposit archive(s)'s raw content to retrieve locally archive (str): the local archive's path where to store the raw content Returns: The archive path to the local archive to load. Or None if any problem arose. """ r = self.do("get", archive_update_url, stream=True) if r.ok: with open(archive, "wb") as f: for chunk in r.iter_content(): f.write(chunk) return archive msg = "Problem when retrieving deposit archive at %s" % (archive_update_url,) logger.error(msg) raise ValueError(msg) def metadata_get(self, metadata_url): """Retrieve the metadata information on a given deposit. Args: metadata_url (str): The full deposit metadata url to retrieve locally Returns: The dictionary of metadata for that deposit or None if any problem arose. """ r = self.do("get", metadata_url) if r.ok: return r.json() msg = "Problem when retrieving metadata at %s" % metadata_url logger.error(msg) raise ValueError(msg) def status_update( self, update_status_url, status, revision_id=None, directory_id=None, origin_url=None, ): """Update the deposit's status. Args: update_status_url (str): the full deposit's archive status (str): The status to update the deposit with revision_id (str/None): the revision's identifier to update to directory_id (str/None): the directory's identifier to update to origin_url (str/None): deposit's associated origin url """ payload = {"status": status} if revision_id: payload["revision_id"] = revision_id if directory_id: payload["directory_id"] = directory_id if origin_url: payload["origin_url"] = origin_url self.do("put", update_status_url, json=payload) def check(self, check_url): """Check the deposit's associated data (metadata, archive(s)) Args: check_url (str): the full deposit's check url """ r = self.do("get", check_url) if r.ok: data = r.json() return data["status"] msg = "Problem when checking deposit %s" % check_url logger.error(msg) raise ValueError(msg) class BaseDepositClient(BaseApiDepositClient, metaclass=ABCMeta): """Base Deposit client to access the public api. """ def __init__(self, config, error_msg=None, empty_result={}): super().__init__(config) self.error_msg = error_msg self.empty_result = empty_result @abstractmethod def compute_url(self, *args, **kwargs): """Compute api url endpoint to query.""" pass @abstractmethod def compute_method(self, *args, **kwargs): """Http method to use on the url""" pass @abstractmethod def parse_result_ok(self, xml_content): """Given an xml result from the api endpoint, parse it and returns a dict. """ pass def compute_information(self, *args, **kwargs): """Compute some more information given the inputs (e.g http headers, ...) """ return {} def parse_result_error(self, xml_content): """Given an error response in xml, parse it into a dict. Returns: dict with following keys: 'error': The error message 'detail': Some more detail about the error if any """ return _parse_with_filter( xml_content, keys=["summary", "detail", "sword:verboseDescription"] ) def do_execute(self, method, url, info): """Execute the http query to url using method and info information. By default, execute a simple query to url with the http method. Override this in daughter class to improve the default behavior if needed. """ return self.do(method, url) def execute(self, *args, **kwargs) -> Dict[str, Any]: """Main endpoint to prepare and execute the http query to the api. Raises: MaintenanceError if some api maintenance is happening. Returns: Dict of computed api data """ url = self.compute_url(*args, **kwargs) method = self.compute_method(*args, **kwargs) info = self.compute_information(*args, **kwargs) try: r = self.do_execute(method, url, info) except Exception as e: msg = self.error_msg % (url, e) r = self.empty_result r.update( {"error": msg,} ) return r else: if r.ok: if int(r.status_code) == 204: # 204 returns no body return {"status": r.status_code} else: return self.parse_result_ok(r.text) else: error = self.parse_result_error(r.text) empty = self.empty_result error.update(empty) if r.status_code == 503: summary = error.get("summary") detail = error.get("sword:verboseDescription") # Maintenance error if summary and detail: raise MaintenanceError(f"{summary}: {detail}") error.update( {"status": r.status_code,} ) return error class ServiceDocumentDepositClient(BaseDepositClient): """Service Document information retrieval. """ def __init__(self, config): super().__init__( config, error_msg="Service document failure at %s: %s", empty_result={"collection": None}, ) def compute_url(self, *args, **kwargs): return "/servicedocument/" def compute_method(self, *args, **kwargs): return "get" def parse_result_ok(self, xml_content): """Parse service document's success response. """ return _parse(xml_content) class StatusDepositClient(BaseDepositClient): """Status information on a deposit. """ def __init__(self, config): super().__init__( config, error_msg="Status check failure at %s: %s", empty_result={ "deposit_status": None, "deposit_status_detail": None, "deposit_swh_id": None, }, ) def compute_url(self, collection, deposit_id): return "/%s/%s/status/" % (collection, deposit_id) def compute_method(self, *args, **kwargs): return "get" def parse_result_ok(self, xml_content): """Given an xml content as string, returns a deposit dict. """ return _parse_with_filter( xml_content, keys=[ "deposit_id", "deposit_status", "deposit_status_detail", "deposit_swh_id", "deposit_swh_id_context", "deposit_external_id", ], ) class BaseCreateDepositClient(BaseDepositClient): """Deposit client base class to post new deposit. """ def __init__(self, config): super().__init__( config, error_msg="Post Deposit failure at %s: %s", empty_result={"deposit_id": None, "deposit_status": None,}, ) def compute_url(self, collection, *args, **kwargs): return "/%s/" % collection def compute_method(self, *args, **kwargs): return "post" def parse_result_ok(self, xml_content): """Given an xml content as string, returns a deposit dict. """ return _parse_with_filter( xml_content, keys=[ "deposit_id", "deposit_status", "deposit_status_detail", "deposit_date", ], ) def _compute_information( self, collection, filepath, in_progress, slug, is_archive=True ): """Given a filepath, compute necessary information on that file. Args: filepath (str): Path to a file is_archive (bool): is it an archive or not? Returns: dict with keys: 'content-type': content type associated 'md5sum': md5 sum 'filename': filename """ filename = os.path.basename(filepath) if is_archive: md5sum = hashlib.md5(open(filepath, "rb").read()).hexdigest() extension = filename.split(".")[-1] if "zip" in extension: content_type = "application/zip" else: content_type = "application/x-tar" else: content_type = None md5sum = None return { "slug": slug, "in_progress": in_progress, "content-type": content_type, "md5sum": md5sum, "filename": filename, "filepath": filepath, } def compute_information( self, collection, filepath, in_progress, slug, is_archive=True, **kwargs ): info = self._compute_information( collection, filepath, in_progress, slug, is_archive=is_archive ) info["headers"] = self.compute_headers(info) return info def do_execute(self, method, url, info): with open(info["filepath"], "rb") as f: return self.do(method, url, data=f, headers=info["headers"]) class CreateArchiveDepositClient(BaseCreateDepositClient): """Post an archive (binary) deposit client.""" def compute_headers(self, info): return { "SLUG": info["slug"], "CONTENT_MD5": info["md5sum"], "IN-PROGRESS": str(info["in_progress"]), "CONTENT-TYPE": info["content-type"], "CONTENT-DISPOSITION": "attachment; filename=%s" % (info["filename"],), } class UpdateArchiveDepositClient(CreateArchiveDepositClient): """Update (add/replace) an archive (binary) deposit client.""" def compute_url(self, collection, *args, deposit_id=None, **kwargs): return "/%s/%s/media/" % (collection, deposit_id) def compute_method(self, *args, replace=False, **kwargs): return "put" if replace else "post" class CreateMetadataDepositClient(BaseCreateDepositClient): """Post a metadata deposit client.""" def compute_headers(self, info): return { "SLUG": info["slug"], "IN-PROGRESS": str(info["in_progress"]), "CONTENT-TYPE": "application/atom+xml;type=entry", } class UpdateMetadataDepositClient(CreateMetadataDepositClient): """Update (add/replace) a metadata deposit client.""" def compute_url(self, collection, *args, deposit_id=None, **kwargs): return "/%s/%s/metadata/" % (collection, deposit_id) def compute_method(self, *args, replace=False, **kwargs): return "put" if replace else "post" class CreateMultipartDepositClient(BaseCreateDepositClient): """Create a multipart deposit client.""" def _multipart_info(self, info, info_meta): files = [ ( "file", (info["filename"], open(info["filepath"], "rb"), info["content-type"]), ), ( "atom", ( info_meta["filename"], open(info_meta["filepath"], "rb"), "application/atom+xml", ), ), ] headers = { "SLUG": info["slug"], "CONTENT_MD5": info["md5sum"], "IN-PROGRESS": str(info["in_progress"]), } return files, headers def compute_information( self, collection, archive, metadata, in_progress, slug, **kwargs ): info = self._compute_information(collection, archive, in_progress, slug) info_meta = self._compute_information( collection, metadata, in_progress, slug, is_archive=False ) files, headers = self._multipart_info(info, info_meta) return {"files": files, "headers": headers} def do_execute(self, method, url, info): return self.do(method, url, files=info["files"], headers=info["headers"]) class UpdateMultipartDepositClient(CreateMultipartDepositClient): """Update a multipart deposit client.""" def compute_url(self, collection, *args, deposit_id=None, **kwargs): return "/%s/%s/metadata/" % (collection, deposit_id) def compute_method(self, *args, replace=False, **kwargs): return "put" if replace else "post" class PublicApiDepositClient(BaseApiDepositClient): """Public api deposit client.""" def service_document(self): """Retrieve service document endpoint's information.""" return ServiceDocumentDepositClient(self.config).execute() def deposit_status(self, collection, deposit_id): """Retrieve status information on a deposit.""" return StatusDepositClient(self.config).execute(collection, deposit_id) def deposit_create( self, collection, slug, archive=None, metadata=None, in_progress=False ): """Create a new deposit (archive, metadata, both as multipart).""" if archive and not metadata: return CreateArchiveDepositClient(self.config).execute( collection, archive, in_progress, slug ) elif not archive and metadata: return CreateMetadataDepositClient(self.config).execute( collection, metadata, in_progress, slug, is_archive=False ) else: return CreateMultipartDepositClient(self.config).execute( collection, archive, metadata, in_progress, slug ) def deposit_update( self, collection, deposit_id, slug, archive=None, metadata=None, in_progress=False, replace=False, ): """Update (add/replace) existing deposit (archive, metadata, both).""" r = self.deposit_status(collection, deposit_id) if "error" in r: return r status = r["deposit_status"] if status != "partial": return { "error": "You can only act on deposit with status 'partial'", "detail": "The deposit %s has status '%s'" % (deposit_id, status), "deposit_status": status, "deposit_id": deposit_id, } if archive and not metadata: r = UpdateArchiveDepositClient(self.config).execute( collection, archive, in_progress, slug, deposit_id=deposit_id, replace=replace, ) elif not archive and metadata: r = UpdateMetadataDepositClient(self.config).execute( collection, metadata, in_progress, slug, deposit_id=deposit_id, replace=replace, ) else: r = UpdateMultipartDepositClient(self.config).execute( collection, archive, metadata, in_progress, slug, deposit_id=deposit_id, replace=replace, ) if "error" in r: return r return self.deposit_status(collection, deposit_id) diff --git a/swh/deposit/loader/checker.py b/swh/deposit/loader/checker.py index bb054529..c9d9a9c2 100644 --- a/swh/deposit/loader/checker.py +++ b/swh/deposit/loader/checker.py @@ -1,51 +1,45 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import os import logging -from typing import Mapping +from typing import Any, Dict -from swh.core.config import SWHConfig +from swh.core import config from swh.deposit.client import PrivateApiDepositClient logger = logging.getLogger(__name__) -class DepositChecker(SWHConfig): +class DepositChecker: """Deposit checker implementation. Trigger deposit's checks through the private api. """ - CONFIG_BASE_FILENAME = "deposit/checker" - - DEFAULT_CONFIG = { - "deposit": ("dict", {"url": "http://localhost:5006/1/private/", "auth": {},}) - } - - def __init__(self, config=None): - super().__init__() - if config is None: - self.config = self.parse_config_file() - else: - self.config = config + def __init__(self): + config_file = os.environ["SWH_CONFIG_FILENAME"] + self.config: Dict[str, Any] = config.read_raw_config( + config.config_basepath(config_file) + ) self.client = PrivateApiDepositClient(config=self.config["deposit"]) - def check(self, collection: str, deposit_id: str) -> Mapping[str, str]: + def check(self, collection: str, deposit_id: str) -> Dict[str, str]: status = None deposit_check_url = f"/{collection}/{deposit_id}/check/" logger.debug("deposit-check-url: %s", deposit_check_url) try: r = self.client.check(deposit_check_url) logger.debug("Check result: %s", r) status = "eventful" if r == "verified" else "failed" except Exception: logger.exception("Failure during check on '%s'", deposit_check_url) status = "failed" logger.debug("Check status: %s", status) return {"status": status} diff --git a/swh/deposit/tests/loader/conftest.py b/swh/deposit/tests/loader/conftest.py index aa1bb4b2..d4642852 100644 --- a/swh/deposit/tests/loader/conftest.py +++ b/swh/deposit/tests/loader/conftest.py @@ -1,56 +1,37 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re -import os import pytest -import yaml from functools import partial from swh.core.pytest_plugin import get_response_cb from swh.deposit.loader.checker import DepositChecker @pytest.fixture -def swh_config(tmp_path, swh_storage_postgresql, monkeypatch): - storage_config = { - "url": "https://deposit.softwareheritage.org/", - "storage": { - "cls": "local", - "args": { - "db": swh_storage_postgresql.dsn, - "objstorage": {"cls": "memory", "args": {}}, - }, - }, +def deposit_config(tmp_path): + return { + "deposit": { + "url": "https://deposit.softwareheritage.org/1/private/", + "auth": {}, + } } - conffile = os.path.join(tmp_path, "deposit.yml") - with open(conffile, "w") as f: - f.write(yaml.dump(storage_config)) - monkeypatch.setenv("SWH_CONFIG_FILENAME", conffile) - return conffile - @pytest.fixture -def deposit_checker(): - return DepositChecker( - config={ - "deposit": { - "url": "https://deposit.softwareheritage.org/1/private/", - "auth": {}, - } - } - ) +def deposit_checker(deposit_config_path): + return DepositChecker() @pytest.fixture def requests_mock_datadir(datadir, requests_mock_datadir): """Override default behavior to deal with put method """ cb = partial(get_response_cb, datadir=datadir) requests_mock_datadir.put(re.compile("https://"), body=cb) return requests_mock_datadir diff --git a/swh/deposit/tests/loader/test_checker.py b/swh/deposit/tests/loader/test_checker.py index c299b3bd..60d451ef 100644 --- a/swh/deposit/tests/loader/test_checker.py +++ b/swh/deposit/tests/loader/test_checker.py @@ -1,32 +1,32 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from unittest.mock import patch -def test_check_deposit_ready(swh_config, requests_mock_datadir, deposit_checker): +def test_checker_deposit_ready(requests_mock_datadir, deposit_checker): """Check on a valid 'deposited' deposit should result in 'verified' """ actual_result = deposit_checker.check(collection="test", deposit_id=1) assert actual_result == {"status": "eventful"} -def test_check_deposit_rejected(swh_config, requests_mock_datadir, deposit_checker): +def test_checker_deposit_rejected(requests_mock_datadir, deposit_checker): """Check on invalid 'deposited' deposit should result in 'rejected' """ actual_result = deposit_checker.check(collection="test", deposit_id=2) assert actual_result == {"status": "failed"} @patch("swh.deposit.client.requests.get") -def test_check_deposit_rejected_exception(mock_requests, swh_config, deposit_checker): +def test_checker_deposit_rejected_exception(mock_requests, deposit_checker): """Check on invalid 'deposited' deposit should result in 'rejected' """ mock_requests.side_effect = ValueError("simulated problem when checking") actual_result = deposit_checker.check(collection="test", deposit_id=3) assert actual_result == {"status": "failed"} diff --git a/swh/deposit/tests/loader/test_tasks.py b/swh/deposit/tests/loader/test_tasks.py index 90e20e6a..5f85ebcd 100644 --- a/swh/deposit/tests/loader/test_tasks.py +++ b/swh/deposit/tests/loader/test_tasks.py @@ -1,75 +1,75 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest @pytest.mark.db -def test_deposit_check_eventful( - mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker +def test_task_check_eventful( + mocker, deposit_config_path, swh_scheduler_celery_app, swh_scheduler_celery_worker ): """Successful check should make the check succeed """ client = mocker.patch("swh.deposit.loader.checker.PrivateApiDepositClient.check") client.return_value = "verified" collection = "collection" deposit_id = 42 res = swh_scheduler_celery_app.send_task( "swh.deposit.loader.tasks.ChecksDepositTsk", args=[collection, deposit_id] ) assert res res.wait() assert res.successful() assert res.result == {"status": "eventful"} client.assert_called_once_with(f"/{collection}/{deposit_id}/check/") @pytest.mark.db -def test_deposit_check_failure( - mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker +def test_task_check_failure( + mocker, deposit_config_path, swh_scheduler_celery_app, swh_scheduler_celery_worker ): """Unverified check status should make the check fail """ client = mocker.patch("swh.deposit.loader.checker.PrivateApiDepositClient.check") client.return_value = "not-verified" # will make the status "failed" collection = "collec" deposit_id = 666 res = swh_scheduler_celery_app.send_task( "swh.deposit.loader.tasks.ChecksDepositTsk", args=[collection, deposit_id] ) assert res res.wait() assert res.successful() assert res.result == {"status": "failed"} client.assert_called_once_with(f"/{collection}/{deposit_id}/check/") @pytest.mark.db -def test_deposit_check_3( - mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker +def test_task_check_3( + mocker, deposit_config_path, swh_scheduler_celery_app, swh_scheduler_celery_worker ): """Unexpected failures should fail the check """ client = mocker.patch("swh.deposit.loader.checker.PrivateApiDepositClient.check") client.side_effect = ValueError("unexpected failure will make it fail") collection = "another-collection" deposit_id = 999 res = swh_scheduler_celery_app.send_task( "swh.deposit.loader.tasks.ChecksDepositTsk", args=[collection, deposit_id] ) assert res res.wait() assert res.successful() assert res.result == {"status": "failed"} client.assert_called_once_with(f"/{collection}/{deposit_id}/check/")