diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index 6440fc2..10336bc 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,233 +1,235 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import collections import os +from typing import Any, Dict, Optional import aiohttp.web from swh.core import config from swh.core.api.asynchronous import RPCServerApp, decode_request from swh.core.api.asynchronous import encode_data_server as encode_data from swh.model import hashutil from swh.vault import get_vault from swh.vault.backend import NotFoundExc from swh.vault.cookers import COOKER_TYPES DEFAULT_CONFIG_PATH = "vault/server" DEFAULT_CONFIG = { "storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}), "cache": ( "dict", { "cls": "pathslicing", "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5",}, }, ), "client_max_size": ("int", 1024 ** 3), "vault": ( "dict", {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",},}, ), "scheduler": ("dict", {"cls": "remote", "url": "http://localhost:5008/",},), } @asyncio.coroutine def index(request): return aiohttp.web.Response(body="SWH Vault API server") # Web API endpoints @asyncio.coroutine def vault_fetch(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] if not request.app["backend"].is_available(obj_type, obj_id): raise NotFoundExc(f"{obj_type} {obj_id} is not available.") return encode_data(request.app["backend"].fetch(obj_type, obj_id)) def user_info(task_info): return { "id": task_info["id"], "status": task_info["task_status"], "progress_message": task_info["progress_msg"], "obj_type": task_info["type"], "obj_id": hashutil.hash_to_hex(task_info["object_id"]), } @asyncio.coroutine def vault_cook(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] email = request.query.get("email") sticky = request.query.get("sticky") in ("true", "1") if obj_type not in COOKER_TYPES: raise NotFoundExc(f"{obj_type} is an unknown type.") info = request.app["backend"].cook_request( obj_type, obj_id, email=email, sticky=sticky ) # TODO: return 201 status (Created) once the api supports it return encode_data(user_info(info)) @asyncio.coroutine def vault_progress(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] info = request.app["backend"].task_info(obj_type, obj_id) if not info: raise NotFoundExc(f"{obj_type} {obj_id} was not found.") return encode_data(user_info(info)) # Cookers endpoints @asyncio.coroutine def set_progress(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] progress = yield from decode_request(request) request.app["backend"].set_progress(obj_type, obj_id, progress) return encode_data(True) # FIXME: success value? @asyncio.coroutine def set_status(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] status = yield from decode_request(request) request.app["backend"].set_status(obj_type, obj_id, status) return encode_data(True) # FIXME: success value? @asyncio.coroutine def put_bundle(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] # TODO: handle streaming properly content = yield from decode_request(request) request.app["backend"].cache.add(obj_type, obj_id, content) return encode_data(True) # FIXME: success value? @asyncio.coroutine def send_notif(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] request.app["backend"].send_all_notifications(obj_type, obj_id) return encode_data(True) # FIXME: success value? # Batch endpoints @asyncio.coroutine def batch_cook(request): batch = yield from decode_request(request) for obj_type, obj_id in batch: if obj_type not in COOKER_TYPES: raise NotFoundExc(f"{obj_type} is an unknown type.") batch_id = request.app["backend"].batch_cook(batch) return encode_data({"id": batch_id}) @asyncio.coroutine def batch_progress(request): batch_id = request.match_info["batch_id"] bundles = request.app["backend"].batch_info(batch_id) if not bundles: raise NotFoundExc(f"Batch {batch_id} does not exist.") bundles = [user_info(bundle) for bundle in bundles] counter = collections.Counter(b["status"] for b in bundles) res = { "bundles": bundles, "total": len(bundles), **{k: 0 for k in ("new", "pending", "done", "failed")}, **dict(counter), } return encode_data(res) # Web server def make_app(backend, **kwargs): app = RPCServerApp(**kwargs) app.router.add_route("GET", "/", index) app.client_exception_classes = (NotFoundExc,) # Endpoints used by the web API app.router.add_route("GET", "/fetch/{type}/{id}", vault_fetch) app.router.add_route("POST", "/cook/{type}/{id}", vault_cook) app.router.add_route("GET", "/progress/{type}/{id}", vault_progress) # Endpoints used by the Cookers app.router.add_route("POST", "/set_progress/{type}/{id}", set_progress) app.router.add_route("POST", "/set_status/{type}/{id}", set_status) app.router.add_route("POST", "/put_bundle/{type}/{id}", put_bundle) app.router.add_route("POST", "/send_notif/{type}/{id}", send_notif) # Endpoints for batch requests app.router.add_route("POST", "/batch_cook", batch_cook) app.router.add_route("GET", "/batch_progress/{batch_id}", batch_progress) app["backend"] = backend return app -def get_local_backend(cfg): +def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]: if "vault" not in cfg: - raise ValueError("missing '%vault' configuration") + raise ValueError("missing 'vault' configuration") vcfg = cfg["vault"] if vcfg["cls"] != "local": raise EnvironmentError( - "The vault backend can only be started with a 'local' " "configuration", - err=True, + "The vault backend can only be started with a 'local' configuration", ) args = vcfg["args"] if "cache" not in args: args["cache"] = cfg.get("cache") if "storage" not in args: args["storage"] = cfg.get("storage") if "scheduler" not in args: args["scheduler"] = cfg.get("scheduler") for key in ("cache", "storage", "scheduler"): if not args.get(key): - raise ValueError("invalid configuration; missing %s config entry." % key) + raise ValueError(f"invalid configuration: missing {key} config entry.") - return get_vault("local", **args) + return args -def make_app_from_configfile(config_file=None, **kwargs): +def make_app_from_configfile(config_file: Optional[str] = None, **kwargs): if config_file is None: config_file = DEFAULT_CONFIG_PATH config_file = os.environ.get("SWH_CONFIG_FILENAME", config_file) + assert config_file is not None if os.path.isfile(config_file): cfg = config.read(config_file, DEFAULT_CONFIG) else: cfg = config.load_named_config(config_file, DEFAULT_CONFIG) - vault = get_local_backend(cfg) + kwargs = check_config(cfg) + vault = get_vault("local", **kwargs) return make_app(backend=vault, client_max_size=cfg["client_max_size"], **kwargs) if __name__ == "__main__": print("Deprecated. Use swh-vault ") diff --git a/swh/vault/tests/test_server.py b/swh/vault/tests/test_server.py index 5bbbf14..8aba030 100644 --- a/swh/vault/tests/test_server.py +++ b/swh/vault/tests/test_server.py @@ -1,55 +1,91 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import copy + import pytest from swh.core.api.serializers import msgpack_dumps, msgpack_loads -from swh.vault.api.server import make_app +from swh.vault.api.server import check_config, make_app @pytest.fixture def client(swh_vault, loop, aiohttp_client): app = make_app(backend=swh_vault) return loop.run_until_complete(aiohttp_client(app)) async def test_index(client): resp = await client.get("/") assert resp.status == 200 async def test_cook_notfound(client): resp = await client.post("/cook/directory/000000") assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["Object 000000 was not found."] async def test_progress_notfound(client): resp = await client.get("/progress/directory/000000") assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["directory 000000 was not found."] async def test_batch_cook_invalid_type(client): data = msgpack_dumps([("foobar", [])]) resp = await client.post( "/batch_cook", data=data, headers={"Content-Type": "application/x-msgpack"} ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["foobar is an unknown type."] async def test_batch_progress_notfound(client): resp = await client.get("/batch_progress/1") assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["Batch 1 does not exist."] + + +def test_check_config_missing_vault_configuration() -> None: + """Irrelevant configuration file path raises""" + with pytest.raises(ValueError, match="missing 'vault' configuration"): + check_config({}) + + +def test_check_config_not_local() -> None: + """Wrong configuration raises""" + expected_error = ( + "The vault backend can only be started with a 'local' configuration" + ) + with pytest.raises(EnvironmentError, match=expected_error): + check_config({"vault": {"cls": "remote"}}) + + +@pytest.mark.parametrize("missing_key", ["storage", "cache", "scheduler"]) +def test_check_config_missing_key(missing_key, swh_vault_config) -> None: + """Any other configuration than 'local' (the default) is rejected""" + config_ok = {"vault": {"cls": "local", "args": swh_vault_config}} + config_ko = copy.deepcopy(config_ok) + config_ko["vault"]["args"].pop(missing_key, None) + + expected_error = f"invalid configuration: missing {missing_key} config entry" + with pytest.raises(ValueError, match=expected_error): + check_config(config_ko) + + +@pytest.mark.parametrize("missing_key", ["storage", "cache", "scheduler"]) +def test_check_config_ok(missing_key, swh_vault_config) -> None: + """Any other configuration than 'local' (the default) is rejected""" + config_ok = {"vault": {"cls": "local", "args": swh_vault_config}} + assert check_config(config_ok) is not None