diff --git a/requirements-swh.txt b/requirements-swh.txt
index f1b77b7..d9966ae 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
 swh.core[db,http] >= 0.3
 swh.model >= 0.3
 swh.objstorage >= 0.0.17
-swh.scheduler >= 0.0.39
+swh.scheduler >= 0.7.0
 swh.storage >= 0.0.106
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
index 8837fa2..6c178e0 100644
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -1,236 +1,233 @@
 # Copyright (C) 2016-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import asyncio
 import collections
 import os

 import aiohttp.web

 from swh.core import config
 from swh.core.api.asynchronous import RPCServerApp, decode_request
 from swh.core.api.asynchronous import encode_data_server as encode_data
 from swh.model import hashutil
 from swh.vault import get_vault
 from swh.vault.backend import NotFoundExc
 from swh.vault.cookers import COOKER_TYPES

 DEFAULT_CONFIG_PATH = "vault/server"
 DEFAULT_CONFIG = {
     "storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}),
     "cache": (
         "dict",
         {
             "cls": "pathslicing",
             "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5",},
         },
     ),
     "client_max_size": ("int", 1024 ** 3),
     "vault": (
         "dict",
         {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",},},
     ),
-    "scheduler": (
-        "dict",
-        {"cls": "remote", "args": {"url": "http://localhost:5008/",}},
-    ),
+    "scheduler": ("dict", {"cls": "remote", "url": "http://localhost:5008/",},),
 }


 @asyncio.coroutine
 def index(request):
     return aiohttp.web.Response(body="SWH Vault API server")


 # Web API endpoints


 @asyncio.coroutine
 def vault_fetch(request):
     obj_type = request.match_info["type"]
     obj_id = request.match_info["id"]

     if not request.app["backend"].is_available(obj_type, obj_id):
         raise NotFoundExc(f"{obj_type} {obj_id} is not available.")

     return encode_data(request.app["backend"].fetch(obj_type, obj_id))


 def user_info(task_info):
     return {
         "id": task_info["id"],
         "status": task_info["task_status"],
         "progress_message": task_info["progress_msg"],
         "obj_type": task_info["type"],
         "obj_id": hashutil.hash_to_hex(task_info["object_id"]),
     }


 @asyncio.coroutine
 def vault_cook(request):
     obj_type = request.match_info["type"]
     obj_id = request.match_info["id"]
     email = request.query.get("email")
     sticky = request.query.get("sticky") in ("true", "1")

     if obj_type not in COOKER_TYPES:
         raise NotFoundExc(f"{obj_type} is an unknown type.")

     info = request.app["backend"].cook_request(
         obj_type, obj_id, email=email, sticky=sticky
     )

     # TODO: return 201 status (Created) once the api supports it
     return encode_data(user_info(info))


 @asyncio.coroutine
 def vault_progress(request):
     obj_type = request.match_info["type"]
     obj_id = request.match_info["id"]

     info = request.app["backend"].task_info(obj_type, obj_id)
     if not info:
         raise NotFoundExc(f"{obj_type} {obj_id} was not found.")

     return encode_data(user_info(info))


 # Cookers endpoints


 @asyncio.coroutine
 def set_progress(request):
     obj_type = request.match_info["type"]
     obj_id = request.match_info["id"]
     progress = yield from decode_request(request)
     request.app["backend"].set_progress(obj_type, obj_id, progress)
     return encode_data(True)  # FIXME: success value?


 @asyncio.coroutine
 def set_status(request):
     obj_type = request.match_info["type"]
     obj_id = request.match_info["id"]
     status = yield from decode_request(request)
     request.app["backend"].set_status(obj_type, obj_id, status)
     return encode_data(True)  # FIXME: success value?


 @asyncio.coroutine
 def put_bundle(request):
     obj_type = request.match_info["type"]
     obj_id = request.match_info["id"]

     # TODO: handle streaming properly
     content = yield from decode_request(request)
     request.app["backend"].cache.add(obj_type, obj_id, content)
     return encode_data(True)  # FIXME: success value?


 @asyncio.coroutine
 def send_notif(request):
     obj_type = request.match_info["type"]
     obj_id = request.match_info["id"]
     request.app["backend"].send_all_notifications(obj_type, obj_id)
     return encode_data(True)  # FIXME: success value?


 # Batch endpoints


 @asyncio.coroutine
 def batch_cook(request):
     batch = yield from decode_request(request)
     for obj_type, obj_id in batch:
         if obj_type not in COOKER_TYPES:
             raise NotFoundExc(f"{obj_type} is an unknown type.")
     batch_id = request.app["backend"].batch_cook(batch)
     return encode_data({"id": batch_id})


 @asyncio.coroutine
 def batch_progress(request):
     batch_id = request.match_info["batch_id"]

     bundles = request.app["backend"].batch_info(batch_id)
     if not bundles:
         raise NotFoundExc(f"Batch {batch_id} does not exist.")
     bundles = [user_info(bundle) for bundle in bundles]
     counter = collections.Counter(b["status"] for b in bundles)
     res = {
         "bundles": bundles,
         "total": len(bundles),
         **{k: 0 for k in ("new", "pending", "done", "failed")},
         **dict(counter),
     }
     return encode_data(res)


 # Web server


 def make_app(backend, **kwargs):
     app = RPCServerApp(**kwargs)
     app.router.add_route("GET", "/", index)
     app.client_exception_classes = (NotFoundExc,)

     # Endpoints used by the web API
     app.router.add_route("GET", "/fetch/{type}/{id}", vault_fetch)
     app.router.add_route("POST", "/cook/{type}/{id}", vault_cook)
     app.router.add_route("GET", "/progress/{type}/{id}", vault_progress)

     # Endpoints used by the Cookers
     app.router.add_route("POST", "/set_progress/{type}/{id}", set_progress)
     app.router.add_route("POST", "/set_status/{type}/{id}", set_status)
     app.router.add_route("POST", "/put_bundle/{type}/{id}", put_bundle)
     app.router.add_route("POST", "/send_notif/{type}/{id}", send_notif)

     # Endpoints for batch requests
     app.router.add_route("POST", "/batch_cook", batch_cook)
     app.router.add_route("GET", "/batch_progress/{batch_id}", batch_progress)

     app["backend"] = backend
     return app


 def get_local_backend(cfg):
     if "vault" not in cfg:
         raise ValueError("missing '%vault' configuration")

     vcfg = cfg["vault"]
     if vcfg["cls"] != "local":
         raise EnvironmentError(
             "The vault backend can only be started with a 'local' " "configuration",
             err=True,
         )
     args = vcfg["args"]
     if "cache" not in args:
         args["cache"] = cfg.get("cache")
     if "storage" not in args:
         args["storage"] = cfg.get("storage")
     if "scheduler" not in args:
         args["scheduler"] = cfg.get("scheduler")

     for key in ("cache", "storage", "scheduler"):
         if not args.get(key):
             raise ValueError("invalid configuration; missing %s config entry." % key)

     return get_vault("local", args)


 def make_app_from_configfile(config_file=None, **kwargs):
     if config_file is None:
         config_file = DEFAULT_CONFIG_PATH
     config_file = os.environ.get("SWH_CONFIG_FILENAME", config_file)
     if os.path.isfile(config_file):
         cfg = config.read(config_file, DEFAULT_CONFIG)
     else:
         cfg = config.load_named_config(config_file, DEFAULT_CONFIG)
     vault = get_local_backend(cfg)
     return make_app(backend=vault, client_max_size=cfg["client_max_size"], **kwargs)


 if __name__ == "__main__":
     print("Deprecated. Use swh-vault ")
diff --git a/swh/vault/tests/conftest.py b/swh/vault/tests/conftest.py
index bbdb652..9090e46 100644
--- a/swh/vault/tests/conftest.py
+++ b/swh/vault/tests/conftest.py
@@ -1,88 +1,88 @@
 import glob
 import os
 import subprocess

 import pkg_resources.extern.packaging.version
 import pytest
 from pytest_postgresql import factories

 from swh.core.utils import numfile_sortkey as sortkey
 from swh.storage.tests import SQL_DIR as STORAGE_SQL_DIR
 from swh.vault import get_vault
 from swh.vault.tests import SQL_DIR

 os.environ["LC_ALL"] = "C.UTF-8"

 pytest_v = pkg_resources.get_distribution("pytest").parsed_version
 if pytest_v < pkg_resources.extern.packaging.version.parse("3.9"):

     @pytest.fixture
     def tmp_path(request):
         import pathlib
         import tempfile

         with tempfile.TemporaryDirectory() as tmpdir:
             yield pathlib.Path(tmpdir)


 def db_url(name, postgresql_proc):
     return "postgresql://{user}@{host}:{port}/{dbname}".format(
         host=postgresql_proc.host,
         port=postgresql_proc.port,
         user="postgres",
         dbname=name,
     )


 postgresql2 = factories.postgresql("postgresql_proc", "tests2")


 @pytest.fixture
 def swh_vault(request, postgresql_proc, postgresql, postgresql2, tmp_path):
     for sql_dir, pg in ((SQL_DIR, postgresql), (STORAGE_SQL_DIR, postgresql2)):
         dump_files = os.path.join(sql_dir, "*.sql")
         all_dump_files = sorted(glob.glob(dump_files), key=sortkey)
         for fname in all_dump_files:
             subprocess.check_call(
                 [
                     "psql",
                     "--quiet",
                     "--no-psqlrc",
                     "-v",
                     "ON_ERROR_STOP=1",
                     "-d",
                     pg.dsn,
                     "-f",
                     fname,
                 ]
             )

     vault_config = {
         "db": db_url("tests", postgresql_proc),
         "storage": {
             "cls": "local",
             "db": db_url("tests2", postgresql_proc),
             "objstorage": {
                 "cls": "pathslicing",
                 "args": {"root": str(tmp_path), "slicing": "0:1/1:5",},
             },
         },
         "cache": {
             "cls": "pathslicing",
             "args": {
                 "root": str(tmp_path),
                 "slicing": "0:1/1:5",
                 "allow_delete": True,
             },
         },
-        "scheduler": {"cls": "remote", "args": {"url": "http://swh-scheduler:5008",},},
+        "scheduler": {"cls": "remote", "url": "http://swh-scheduler:5008",},
     }

     return get_vault("local", vault_config)


 @pytest.fixture
 def swh_storage(swh_vault):
     return swh_vault.storage
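
Note on the scheduler configuration change: swh.scheduler 0.7.0 flattened its
configuration schema, dropping the nested "args" dict, which is why this patch
bumps the requirement and rewrites the "scheduler" entries in both files from
{"cls": "remote", "args": {"url": ...}} to {"cls": "remote", "url": ...}. A
minimal sketch of what the flattened shape means for the scheduler factory,
assuming the get_scheduler(cls, **kwargs) signature of the 0.7.x series (the
URL is the same placeholder used in DEFAULT_CONFIG above):

    from swh.scheduler import get_scheduler

    # Old shape (swh.scheduler < 0.7.0): constructor arguments nested
    # under "args".
    # old_cfg = {"cls": "remote", "args": {"url": "http://localhost:5008/"}}

    # New shape (swh.scheduler >= 0.7.0): arguments sit next to "cls" and
    # are forwarded directly as keyword arguments to the backend class.
    new_cfg = {"cls": "remote", "url": "http://localhost:5008/"}

    # Splats to get_scheduler(cls="remote", url="http://localhost:5008/"),
    # i.e. a remote scheduler client pointed at the given RPC server.
    scheduler = get_scheduler(**new_cfg)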
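For completeness, a hypothetical, minimal configuration that passes the
validation in get_local_backend above once swh.core.config has resolved the
("dict", ...) default tuples; every value below is a placeholder mirroring
DEFAULT_CONFIG, not a recommended deployment:

    # get_local_backend copies the top-level "cache", "storage" and
    # "scheduler" entries into the vault's args, then rejects the
    # configuration if any of them is still missing.
    cfg = {
        "vault": {
            "cls": "local",
            "args": {"db": "dbname=softwareheritage-vault-dev"},
        },
        "storage": {"cls": "remote", "args": {"url": "http://localhost:5002/"}},
        "cache": {
            "cls": "pathslicing",
            "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5"},
        },
        # Flattened scheduler entry, as required with swh.scheduler >= 0.7.0:
        "scheduler": {"cls": "remote", "url": "http://localhost:5008/"},
    }
    vault = get_local_backend(cfg)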