diff --git a/requirements-swh.txt b/requirements-swh.txt index 9a0ef3e..49cb84d 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,5 +1,5 @@ -swh.core[db,http] >= 0.0.61 +swh.core[db,http] >= 0.0.65 swh.model >= 0.0.27 swh.objstorage >= 0.0.17 swh.scheduler >= 0.0.39 swh.storage >= 0.0.106 diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py index 7c9f769..2d26fd2 100644 --- a/swh/vault/api/client.py +++ b/swh/vault/api/client.py @@ -1,58 +1,58 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.model import hashutil -from swh.core.api import SWHRemoteAPI +from swh.core.api import RPCClient -class RemoteVaultClient(SWHRemoteAPI): +class RemoteVaultClient(RPCClient): """Client to the Software Heritage vault cache.""" # Web API endpoints def fetch(self, obj_type, obj_id): hex_id = hashutil.hash_to_hex(obj_id) return self.get('fetch/{}/{}'.format(obj_type, hex_id)) def cook(self, obj_type, obj_id, email=None): hex_id = hashutil.hash_to_hex(obj_id) return self.post('cook/{}/{}'.format(obj_type, hex_id), data={}, params=({'email': email} if email else None)) def progress(self, obj_type, obj_id): hex_id = hashutil.hash_to_hex(obj_id) return self.get('progress/{}/{}'.format(obj_type, hex_id)) # Cookers endpoints def set_progress(self, obj_type, obj_id, progress): hex_id = hashutil.hash_to_hex(obj_id) return self.post('set_progress/{}/{}'.format(obj_type, hex_id), data=progress) def set_status(self, obj_type, obj_id, status): hex_id = hashutil.hash_to_hex(obj_id) return self.post('set_status/{}/{}' .format(obj_type, hex_id), data=status) # TODO: handle streaming properly def put_bundle(self, obj_type, obj_id, bundle): hex_id = hashutil.hash_to_hex(obj_id) return self.post('put_bundle/{}/{}' .format(obj_type, hex_id), data=bundle) def send_notif(self, obj_type, obj_id): hex_id = hashutil.hash_to_hex(obj_id) return self.post('send_notif/{}/{}' .format(obj_type, hex_id), data=None) # Batch endpoints def batch_cook(self, batch): return self.post('batch_cook', data=batch) def batch_progress(self, batch_id): return self.get('batch_progress/{}'.format(batch_id)) diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index c0b46b1..5d242eb 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,241 +1,241 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import aiohttp.web import asyncio import collections from swh.core import config from swh.core.api.asynchronous import ( - SWHRemoteAPI, + RPCServerApp, encode_data_server as encode_data, decode_request, ) from swh.model import hashutil from swh.vault import get_vault from swh.vault.cookers import COOKER_TYPES from swh.vault.backend import NotFoundExc DEFAULT_CONFIG_PATH = 'vault/server' DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/', }, }), 'cache': ('dict', { 'cls': 'pathslicing', 'args': { 'root': '/srv/softwareheritage/vault', 'slicing': '0:1/1:5', }, }), 'client_max_size': ('int', 1024 ** 3), 'vault': ('dict', { 'cls': 'local', 'args': { 'db': 'dbname=softwareheritage-vault-dev', }, }), 'scheduler': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5008/', } }), } @asyncio.coroutine def index(request): return aiohttp.web.Response(body="SWH Vault API server") # Web API endpoints @asyncio.coroutine def vault_fetch(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] if not request.app['backend'].is_available(obj_type, obj_id): raise aiohttp.web.HTTPNotFound return encode_data(request.app['backend'].fetch(obj_type, obj_id)) def user_info(task_info): return {'id': task_info['id'], 'status': task_info['task_status'], 'progress_message': task_info['progress_msg'], 'obj_type': task_info['type'], 'obj_id': hashutil.hash_to_hex(task_info['object_id'])} @asyncio.coroutine def vault_cook(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] email = request.query.get('email') sticky = request.query.get('sticky') in ('true', '1') if obj_type not in COOKER_TYPES: raise aiohttp.web.HTTPNotFound try: info = request.app['backend'].cook_request(obj_type, obj_id, email=email, sticky=sticky) except NotFoundExc: raise aiohttp.web.HTTPNotFound # TODO: return 201 status (Created) once the api supports it return encode_data(user_info(info)) @asyncio.coroutine def vault_progress(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] info = request.app['backend'].task_info(obj_type, obj_id) if not info: raise aiohttp.web.HTTPNotFound return encode_data(user_info(info)) # Cookers endpoints @asyncio.coroutine def set_progress(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] progress = yield from decode_request(request) request.app['backend'].set_progress(obj_type, obj_id, progress) return encode_data(True) # FIXME: success value? @asyncio.coroutine def set_status(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] status = yield from decode_request(request) request.app['backend'].set_status(obj_type, obj_id, status) return encode_data(True) # FIXME: success value? @asyncio.coroutine def put_bundle(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] # TODO: handle streaming properly content = yield from decode_request(request) request.app['backend'].cache.add(obj_type, obj_id, content) return encode_data(True) # FIXME: success value? @asyncio.coroutine def send_notif(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] request.app['backend'].send_all_notifications(obj_type, obj_id) return encode_data(True) # FIXME: success value? # Batch endpoints @asyncio.coroutine def batch_cook(request): batch = yield from decode_request(request) for obj_type, obj_id in batch: if obj_type not in COOKER_TYPES: raise aiohttp.web.HTTPNotFound batch_id = request.app['backend'].batch_cook(batch) return encode_data({'id': batch_id}) @asyncio.coroutine def batch_progress(request): batch_id = request.match_info['batch_id'] bundles = request.app['backend'].batch_info(batch_id) if not bundles: raise aiohttp.web.HTTPNotFound bundles = [user_info(bundle) for bundle in bundles] counter = collections.Counter(b['status'] for b in bundles) res = {'bundles': bundles, 'total': len(bundles), **{k: 0 for k in ('new', 'pending', 'done', 'failed')}, **dict(counter)} return encode_data(res) # Web server def make_app(backend, **kwargs): - app = SWHRemoteAPI(**kwargs) + app = RPCServerApp(**kwargs) app.router.add_route('GET', '/', index) # Endpoints used by the web API app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch) app.router.add_route('POST', '/cook/{type}/{id}', vault_cook) app.router.add_route('GET', '/progress/{type}/{id}', vault_progress) # Endpoints used by the Cookers app.router.add_route('POST', '/set_progress/{type}/{id}', set_progress) app.router.add_route('POST', '/set_status/{type}/{id}', set_status) app.router.add_route('POST', '/put_bundle/{type}/{id}', put_bundle) app.router.add_route('POST', '/send_notif/{type}/{id}', send_notif) # Endpoints for batch requests app.router.add_route('POST', '/batch_cook', batch_cook) app.router.add_route('GET', '/batch_progress/{batch_id}', batch_progress) app['backend'] = backend return app def get_local_backend(cfg): if 'vault' not in cfg: raise ValueError("missing '%vault' configuration") vcfg = cfg['vault'] if vcfg['cls'] != 'local': raise EnvironmentError( "The vault backend can only be started with a 'local' " "configuration", err=True) args = vcfg['args'] if 'cache' not in args: args['cache'] = cfg.get('cache') if 'storage' not in args: args['storage'] = cfg.get('storage') if 'scheduler' not in args: args['scheduler'] = cfg.get('scheduler') for key in ('cache', 'storage', 'scheduler'): if not args.get(key): raise ValueError( "invalid configuration; missing %s config entry." % key) return get_vault('local', args) def make_app_from_configfile(config_file=None, **kwargs): if config_file is None: config_file = DEFAULT_CONFIG_PATH config_file = os.environ.get('SWH_CONFIG_FILENAME', config_file) if os.path.isfile(config_file): cfg = config.read(config_file, DEFAULT_CONFIG) else: cfg = config.load_named_config(config_file, DEFAULT_CONFIG) vault = get_local_backend(cfg) return make_app(backend=vault, client_max_size=cfg['client_max_size'], **kwargs) if __name__ == '__main__': print('Deprecated. Use swh-vault ')