Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/api/server.py
# Copyright (C) 2016-2020 The Software Heritage developers | # Copyright (C) 2016-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import asyncio | import asyncio | ||||
import collections | |||||
import os | import os | ||||
from typing import Any, Dict, Optional | from typing import Any, Dict, Optional | ||||
import aiohttp.web | import aiohttp.web | ||||
from swh.core import config | from swh.core.api.asynchronous import RPCServerApp | ||||
from swh.core.api.asynchronous import RPCServerApp, decode_request | from swh.core.config import config_basepath, merge_configs, read_raw_config | ||||
from swh.core.api.asynchronous import encode_data_server as encode_data | from swh.vault import get_vault as get_swhvault | ||||
from swh.model import hashutil | |||||
from swh.vault import get_vault | |||||
from swh.vault.backend import NotFoundExc | from swh.vault.backend import NotFoundExc | ||||
from swh.vault.cookers import COOKER_TYPES | from swh.vault.interface import VaultInterface | ||||
DEFAULT_CONFIG_PATH = "vault/server" | |||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
"storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}), | "storage": {"cls": "remote", "args": {"url": "http://localhost:5002/",},}, | ||||
"cache": ( | "cache": { | ||||
"dict", | |||||
{ | |||||
"cls": "pathslicing", | "cls": "pathslicing", | ||||
"args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5",}, | "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5",}, | ||||
}, | }, | ||||
), | "client_max_size": 1024 ** 3, | ||||
"client_max_size": ("int", 1024 ** 3), | "vault": {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",},}, | ||||
"vault": ( | "scheduler": {"cls": "remote", "url": "http://localhost:5008/",}, | ||||
"dict", | |||||
{"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",},}, | |||||
), | |||||
"scheduler": ("dict", {"cls": "remote", "url": "http://localhost:5008/",},), | |||||
} | } | ||||
@asyncio.coroutine | vault = None | ||||
def index(request): | app = None | ||||
return aiohttp.web.Response(body="SWH Vault API server") | |||||
# Web API endpoints | |||||
@asyncio.coroutine | |||||
def vault_fetch(request): | |||||
obj_type = request.match_info["type"] | |||||
obj_id = request.match_info["id"] | |||||
if not request.app["backend"].is_available(obj_type, obj_id): | |||||
raise NotFoundExc(f"{obj_type} {obj_id} is not available.") | |||||
ardumont: @tenma, this got moved within the backend implementation because that needs to be done within… | |||||
return encode_data(request.app["backend"].fetch(obj_type, obj_id)) | |||||
def user_info(task_info): | |||||
return { | |||||
"id": task_info["id"], | |||||
"status": task_info["task_status"], | |||||
"progress_message": task_info["progress_msg"], | |||||
"obj_type": task_info["type"], | |||||
"obj_id": hashutil.hash_to_hex(task_info["object_id"]), | |||||
} | |||||
@asyncio.coroutine | |||||
def vault_cook(request): | |||||
obj_type = request.match_info["type"] | |||||
obj_id = request.match_info["id"] | |||||
email = request.query.get("email") | |||||
sticky = request.query.get("sticky") in ("true", "1") | |||||
if obj_type not in COOKER_TYPES: | |||||
raise NotFoundExc(f"{obj_type} is an unknown type.") | |||||
info = request.app["backend"].cook_request( | |||||
obj_type, obj_id, email=email, sticky=sticky | |||||
) | |||||
# TODO: return 201 status (Created) once the api supports it | |||||
return encode_data(user_info(info)) | |||||
@asyncio.coroutine | |||||
def vault_progress(request): | |||||
obj_type = request.match_info["type"] | |||||
obj_id = request.match_info["id"] | |||||
info = request.app["backend"].task_info(obj_type, obj_id) | |||||
if not info: | |||||
raise NotFoundExc(f"{obj_type} {obj_id} was not found.") | |||||
return encode_data(user_info(info)) | |||||
# Cookers endpoints | |||||
@asyncio.coroutine | |||||
def set_progress(request): | |||||
obj_type = request.match_info["type"] | |||||
obj_id = request.match_info["id"] | |||||
progress = yield from decode_request(request) | |||||
request.app["backend"].set_progress(obj_type, obj_id, progress) | |||||
return encode_data(True) # FIXME: success value? | |||||
@asyncio.coroutine | |||||
def set_status(request): | |||||
obj_type = request.match_info["type"] | |||||
obj_id = request.match_info["id"] | |||||
status = yield from decode_request(request) | |||||
request.app["backend"].set_status(obj_type, obj_id, status) | |||||
return encode_data(True) # FIXME: success value? | |||||
@asyncio.coroutine | |||||
def put_bundle(request): | |||||
obj_type = request.match_info["type"] | |||||
obj_id = request.match_info["id"] | |||||
# TODO: handle streaming properly | |||||
content = yield from decode_request(request) | |||||
request.app["backend"].cache.add(obj_type, obj_id, content) | |||||
return encode_data(True) # FIXME: success value? | |||||
@asyncio.coroutine | |||||
def send_notif(request): | |||||
obj_type = request.match_info["type"] | |||||
obj_id = request.match_info["id"] | |||||
request.app["backend"].send_all_notifications(obj_type, obj_id) | |||||
return encode_data(True) # FIXME: success value? | |||||
def get_vault(config: Optional[Dict[str, Any]] = None) -> VaultInterface: | |||||
global vault | |||||
if not vault: | |||||
assert config is not None | |||||
vault = get_swhvault(**config) | |||||
return vault | |||||
# Batch endpoints | |||||
class VaultServerApp(RPCServerApp): | |||||
@asyncio.coroutine | client_exception_classes = (NotFoundExc,) | ||||
def batch_cook(request): | |||||
batch = yield from decode_request(request) | |||||
for obj_type, obj_id in batch: | |||||
if obj_type not in COOKER_TYPES: | |||||
raise NotFoundExc(f"{obj_type} is an unknown type.") | |||||
batch_id = request.app["backend"].batch_cook(batch) | |||||
return encode_data({"id": batch_id}) | |||||
@asyncio.coroutine | @asyncio.coroutine | ||||
def batch_progress(request): | def index(request): | ||||
batch_id = request.match_info["batch_id"] | return aiohttp.web.Response(body="SWH Vault API server") | ||||
bundles = request.app["backend"].batch_info(batch_id) | |||||
if not bundles: | |||||
raise NotFoundExc(f"Batch {batch_id} does not exist.") | |||||
bundles = [user_info(bundle) for bundle in bundles] | |||||
counter = collections.Counter(b["status"] for b in bundles) | |||||
res = { | |||||
"bundles": bundles, | |||||
"total": len(bundles), | |||||
**{k: 0 for k in ("new", "pending", "done", "failed")}, | |||||
**dict(counter), | |||||
} | |||||
return encode_data(res) | |||||
# Web server | |||||
def make_app(backend, **kwargs): | |||||
app = RPCServerApp(**kwargs) | |||||
app.router.add_route("GET", "/", index) | |||||
app.client_exception_classes = (NotFoundExc,) | |||||
# Endpoints used by the web API | def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]: | ||||
app.router.add_route("GET", "/fetch/{type}/{id}", vault_fetch) | """Ensure the configuration is ok to run a local vault server | ||||
ardumontAuthorUnsubmitted Done Inline Actionsi've diverged here, all methods are now post. But i noticed i have failure in docker when triggering cooking from the webapp so, let's be iso for now. ardumont: i've diverged here, all methods are now post.
That's supposedly a detail...
I'm gonna fix it, i… | |||||
ardumontAuthorUnsubmitted Done Inline Actionsardumont: D4337 | |||||
app.router.add_route("POST", "/cook/{type}/{id}", vault_cook) | |||||
app.router.add_route("GET", "/progress/{type}/{id}", vault_progress) | |||||
# Endpoints used by the Cookers | |||||
app.router.add_route("POST", "/set_progress/{type}/{id}", set_progress) | |||||
app.router.add_route("POST", "/set_status/{type}/{id}", set_status) | |||||
app.router.add_route("POST", "/put_bundle/{type}/{id}", put_bundle) | |||||
app.router.add_route("POST", "/send_notif/{type}/{id}", send_notif) | |||||
# Endpoints for batch requests | |||||
app.router.add_route("POST", "/batch_cook", batch_cook) | |||||
app.router.add_route("GET", "/batch_progress/{batch_id}", batch_progress) | |||||
app["backend"] = backend | Raises: | ||||
return app | EnvironmentError if the configuration is not for local instance | ||||
ValueError if one of the following keys is missing: vault, cache, storage, | |||||
scheduler | |||||
Returns: | |||||
Configuration dict to instantiate a local vault server instance | |||||
def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]: | """ | ||||
if "vault" not in cfg: | if "vault" not in cfg: | ||||
raise ValueError("missing 'vault' configuration") | raise ValueError("missing 'vault' configuration") | ||||
vcfg = cfg["vault"] | vcfg = cfg["vault"] | ||||
if vcfg["cls"] != "local": | if vcfg["cls"] != "local": | ||||
raise EnvironmentError( | raise EnvironmentError( | ||||
"The vault backend can only be started with a 'local' configuration", | "The vault backend can only be started with a 'local' configuration", | ||||
) | ) | ||||
args = vcfg["args"] | args = vcfg["args"] | ||||
if "cache" not in args: | if "cache" not in args: | ||||
args["cache"] = cfg.get("cache") | args["cache"] = cfg.get("cache") | ||||
if "storage" not in args: | if "storage" not in args: | ||||
args["storage"] = cfg.get("storage") | args["storage"] = cfg.get("storage") | ||||
if "scheduler" not in args: | if "scheduler" not in args: | ||||
args["scheduler"] = cfg.get("scheduler") | args["scheduler"] = cfg.get("scheduler") | ||||
for key in ("cache", "storage", "scheduler"): | for key in ("cache", "storage", "scheduler"): | ||||
if not args.get(key): | if not args.get(key): | ||||
raise ValueError(f"invalid configuration: missing {key} config entry.") | raise ValueError(f"invalid configuration: missing {key} config entry.") | ||||
return args | return cfg | ||||
def make_app(config_to_check: Dict[str, Any]) -> VaultServerApp: | |||||
"""Ensure the configuration is ok, then instantiate the server application | |||||
def make_app_from_configfile(config_file: Optional[str] = None, **kwargs): | """ | ||||
if config_file is None: | config_ok = check_config(config_to_check) | ||||
config_file = DEFAULT_CONFIG_PATH | app = VaultServerApp( | ||||
config_file = os.environ.get("SWH_CONFIG_FILENAME", config_file) | __name__, | ||||
assert config_file is not None | backend_class=VaultInterface, | ||||
if os.path.isfile(config_file): | backend_factory=lambda: get_vault(config_ok["vault"]), | ||||
cfg = config.read(config_file, DEFAULT_CONFIG) | client_max_size=config_ok["client_max_size"], | ||||
else: | ) | ||||
cfg = config.load_named_config(config_file, DEFAULT_CONFIG) | app.router.add_route("GET", "/", index) | ||||
kwargs = check_config(cfg) | return app | ||||
vault = get_vault("local", **kwargs) | |||||
return make_app(backend=vault, client_max_size=cfg["client_max_size"], **kwargs) | |||||
def make_app_from_configfile( | |||||
config_path: Optional[str] = None, **kwargs | |||||
) -> VaultServerApp: | |||||
"""Load and check configuration if ok, then instantiate (once) a vault server | |||||
application. | |||||
""" | |||||
global app | |||||
if not app: | |||||
config_path = os.environ.get("SWH_CONFIG_FILENAME", config_path) | |||||
if not config_path: | |||||
raise ValueError("Missing configuration path.") | |||||
if not os.path.isfile(config_path): | |||||
raise ValueError(f"Configuration path {config_path} should exist.") | |||||
app_config = read_raw_config(config_basepath(config_path)) | |||||
app_config = merge_configs(DEFAULT_CONFIG, app_config) | |||||
app = make_app(app_config) | |||||
return app | |||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
print("Deprecated. Use swh-vault ") | print("Deprecated. Use swh-vault ") |
@tenma, this got moved within the backend implementation because that needs to be done within the backend now.
The rpc server RPCServerApp is just a decorator in charge of serializing/deserializing data at the right moment.
So nothing more happens here any longer.
To keep the functionality iso, this kind of enriched behavior centralized here (in the prior implementation) needs to go back within the backend.
That's why you see now the ifs, raise NotFoundExc and the returned values in the backend.py code now.
So the existing tests continue passing (almost unchanged).