Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/requirements-swh.txt b/requirements-swh.txt
index 9a0ef3e..49cb84d 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
-swh.core[db,http] >= 0.0.61
+swh.core[db,http] >= 0.0.65
swh.model >= 0.0.27
swh.objstorage >= 0.0.17
swh.scheduler >= 0.0.39
swh.storage >= 0.0.106
diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
index 7c9f769..2d26fd2 100644
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -1,58 +1,58 @@
# Copyright (C) 2016-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.model import hashutil
-from swh.core.api import SWHRemoteAPI
+from swh.core.api import RPCClient
-class RemoteVaultClient(SWHRemoteAPI):
+class RemoteVaultClient(RPCClient):
"""Client to the Software Heritage vault cache."""
# Web API endpoints
def fetch(self, obj_type, obj_id):
    """GET the ``fetch/<obj_type>/<hex id>`` endpoint and return the result.

    ``obj_id`` is converted to its hexadecimal form before being placed in
    the URL.
    """
    endpoint = 'fetch/{kind}/{ident}'.format(
        kind=obj_type,
        ident=hashutil.hash_to_hex(obj_id),
    )
    return self.get(endpoint)
def cook(self, obj_type, obj_id, email=None):
    """POST to the ``cook/<obj_type>/<hex id>`` endpoint and return the result.

    An empty dict is sent as request data.  When ``email`` is truthy it is
    passed as the ``email`` query parameter; otherwise no query parameters
    are sent.
    """
    endpoint = 'cook/{kind}/{ident}'.format(
        kind=obj_type,
        ident=hashutil.hash_to_hex(obj_id),
    )
    query = None
    if email:
        query = {'email': email}
    return self.post(endpoint, data={}, params=query)
def progress(self, obj_type, obj_id):
    """GET the ``progress/<obj_type>/<hex id>`` endpoint and return the result."""
    endpoint = 'progress/{kind}/{ident}'.format(
        kind=obj_type,
        ident=hashutil.hash_to_hex(obj_id),
    )
    return self.get(endpoint)
# Cookers endpoints
def set_progress(self, obj_type, obj_id, progress):
    """POST ``progress`` to ``set_progress/<obj_type>/<hex id>``.

    Returns whatever the underlying ``post`` call returns.
    """
    endpoint = 'set_progress/{kind}/{ident}'.format(
        kind=obj_type,
        ident=hashutil.hash_to_hex(obj_id),
    )
    return self.post(endpoint, data=progress)
def set_status(self, obj_type, obj_id, status):
    """POST ``status`` to ``set_status/<obj_type>/<hex id>``.

    Returns whatever the underlying ``post`` call returns.
    """
    endpoint = 'set_status/{kind}/{ident}'.format(
        kind=obj_type,
        ident=hashutil.hash_to_hex(obj_id),
    )
    return self.post(endpoint, data=status)
# TODO: handle streaming properly
def put_bundle(self, obj_type, obj_id, bundle):
    """POST ``bundle`` to ``put_bundle/<obj_type>/<hex id>``.

    The bundle is sent in a single request (no streaming).  Returns
    whatever the underlying ``post`` call returns.
    """
    endpoint = 'put_bundle/{kind}/{ident}'.format(
        kind=obj_type,
        ident=hashutil.hash_to_hex(obj_id),
    )
    return self.post(endpoint, data=bundle)
def send_notif(self, obj_type, obj_id):
    """POST to ``send_notif/<obj_type>/<hex id>`` with ``None`` as data.

    Returns whatever the underlying ``post`` call returns.
    """
    endpoint = 'send_notif/{kind}/{ident}'.format(
        kind=obj_type,
        ident=hashutil.hash_to_hex(obj_id),
    )
    return self.post(endpoint, data=None)
# Batch endpoints
def batch_cook(self, batch):
    """POST ``batch`` to the ``batch_cook`` endpoint and return the result."""
    endpoint = 'batch_cook'
    return self.post(endpoint, data=batch)
def batch_progress(self, batch_id):
    """GET the ``batch_progress/<batch_id>`` endpoint and return the result."""
    endpoint = 'batch_progress/{batch}'.format(batch=batch_id)
    return self.get(endpoint)
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
index c0b46b1..5d242eb 100644
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -1,241 +1,241 @@
# Copyright (C) 2016-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import aiohttp.web
import asyncio
import collections
from swh.core import config
from swh.core.api.asynchronous import (
- SWHRemoteAPI,
+ RPCServerApp,
encode_data_server as encode_data,
decode_request,
)
from swh.model import hashutil
from swh.vault import get_vault
from swh.vault.cookers import COOKER_TYPES
from swh.vault.backend import NotFoundExc
# Configuration name used when no explicit configuration file is given.
DEFAULT_CONFIG_PATH = 'vault/server'

# Default configuration.  Every entry is a ``(type name, default value)`` pair.
DEFAULT_CONFIG = {
    # Remote storage service.
    'storage': (
        'dict',
        {
            'cls': 'remote',
            'args': {'url': 'http://localhost:5002/'},
        },
    ),
    # Cache settings: path-slicing layout rooted on the local disk.
    'cache': (
        'dict',
        {
            'cls': 'pathslicing',
            'args': {
                'root': '/srv/softwareheritage/vault',
                'slicing': '0:1/1:5',
            },
        },
    ),
    # Passed to the application as ``client_max_size`` (1024 ** 3 = 1 GiB).
    'client_max_size': ('int', 1024 ** 3),
    # Vault backend; only a 'local' backend can be served by this module.
    'vault': (
        'dict',
        {
            'cls': 'local',
            'args': {'db': 'dbname=softwareheritage-vault-dev'},
        },
    ),
    # Remote scheduler service.
    'scheduler': (
        'dict',
        {
            'cls': 'remote',
            'args': {'url': 'http://localhost:5008/'},
        },
    ),
}
async def index(request):
    """Handle the root route by returning a plain banner response.

    Declared with ``async def`` instead of the ``@asyncio.coroutine``
    decorator, which is deprecated and no longer exists in recent Python
    releases.
    """
    return aiohttp.web.Response(body="SWH Vault API server")
# Web API endpoints
async def vault_fetch(request):
    """Return the encoded bundle for the object named in the URL.

    The object type and id are read from the ``type`` and ``id`` route
    parameters.

    Raises:
        aiohttp.web.HTTPNotFound: when the backend reports the bundle as
            not available.
    """
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']
    backend = request.app['backend']

    if not backend.is_available(obj_type, obj_id):
        raise aiohttp.web.HTTPNotFound
    return encode_data(backend.fetch(obj_type, obj_id))
def user_info(task_info):
    """Build the user-facing summary of a task record.

    The result has the keys ``id``, ``status``, ``progress_message``,
    ``obj_type`` and ``obj_id``; ``obj_id`` is the hexadecimal form of the
    record's ``object_id``.
    """
    # (key in the result, key in the task record)
    renamed = (
        ('id', 'id'),
        ('status', 'task_status'),
        ('progress_message', 'progress_msg'),
        ('obj_type', 'type'),
    )
    summary = {public: task_info[internal] for public, internal in renamed}
    summary['obj_id'] = hashutil.hash_to_hex(task_info['object_id'])
    return summary
async def vault_cook(request):
    """Register a cooking request and return the encoded task summary.

    The object type and id come from the ``type`` and ``id`` route
    parameters.  The optional ``email`` query parameter is forwarded to the
    backend; ``sticky`` is true only when the ``sticky`` query parameter is
    ``'true'`` or ``'1'``.

    Declared with ``async def`` instead of the ``@asyncio.coroutine``
    decorator, which is deprecated and no longer exists in recent Python
    releases.

    Raises:
        aiohttp.web.HTTPNotFound: when the object type is not a known
            cooker type, or when the backend raises ``NotFoundExc``.
    """
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']
    email = request.query.get('email')
    sticky = request.query.get('sticky') in ('true', '1')

    if obj_type not in COOKER_TYPES:
        raise aiohttp.web.HTTPNotFound

    try:
        info = request.app['backend'].cook_request(
            obj_type, obj_id, email=email, sticky=sticky)
    except NotFoundExc:
        raise aiohttp.web.HTTPNotFound

    # TODO: return 201 status (Created) once the api supports it
    return encode_data(user_info(info))
async def vault_progress(request):
    """Return the encoded task summary for the object named in the URL.

    Declared with ``async def`` instead of the ``@asyncio.coroutine``
    decorator, which is deprecated and no longer exists in recent Python
    releases.

    Raises:
        aiohttp.web.HTTPNotFound: when the backend has no task information
            for the object.
    """
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']

    info = request.app['backend'].task_info(obj_type, obj_id)
    if not info:
        raise aiohttp.web.HTTPNotFound
    return encode_data(user_info(info))
# Cookers endpoints
async def set_progress(request):
    """Record the progress decoded from the request body on the backend.

    The object type and id come from the ``type`` and ``id`` route
    parameters.  Always answers with an encoded ``True``.

    Declared with ``async def``/``await`` instead of the
    ``@asyncio.coroutine`` decorator and ``yield from``; the decorator is
    deprecated and no longer exists in recent Python releases.
    """
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']
    progress = await decode_request(request)
    request.app['backend'].set_progress(obj_type, obj_id, progress)
    return encode_data(True)  # FIXME: success value?
async def set_status(request):
    """Record the status decoded from the request body on the backend.

    The object type and id come from the ``type`` and ``id`` route
    parameters.  Always answers with an encoded ``True``.

    Declared with ``async def``/``await`` instead of the
    ``@asyncio.coroutine`` decorator and ``yield from``; the decorator is
    deprecated and no longer exists in recent Python releases.
    """
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']
    status = await decode_request(request)
    request.app['backend'].set_status(obj_type, obj_id, status)
    return encode_data(True)  # FIXME: success value?
async def put_bundle(request):
    """Add the bundle decoded from the request body to the backend cache.

    The object type and id come from the ``type`` and ``id`` route
    parameters.  Always answers with an encoded ``True``.

    Declared with ``async def``/``await`` instead of the
    ``@asyncio.coroutine`` decorator and ``yield from``; the decorator is
    deprecated and no longer exists in recent Python releases.
    """
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']
    # TODO: handle streaming properly
    content = await decode_request(request)
    request.app['backend'].cache.add(obj_type, obj_id, content)
    return encode_data(True)  # FIXME: success value?
async def send_notif(request):
    """Ask the backend to send all notifications for the given object.

    The object type and id come from the ``type`` and ``id`` route
    parameters.  Always answers with an encoded ``True``.

    Declared with ``async def`` instead of the ``@asyncio.coroutine``
    decorator, which is deprecated and no longer exists in recent Python
    releases.
    """
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']
    request.app['backend'].send_all_notifications(obj_type, obj_id)
    return encode_data(True)  # FIXME: success value?
# Batch endpoints
async def batch_cook(request):
    """Register a batch of cooking requests and return the encoded batch id.

    The request body is decoded into an iterable of ``(obj_type, obj_id)``
    pairs.  The answer is an encoded ``{'id': <batch id>}`` dict.

    Declared with ``async def``/``await`` instead of the
    ``@asyncio.coroutine`` decorator and ``yield from``; the decorator is
    deprecated and no longer exists in recent Python releases.

    Raises:
        aiohttp.web.HTTPNotFound: when any pair names an object type that
            is not a known cooker type.
    """
    batch = await decode_request(request)

    # Reject the whole batch before touching the backend if one type is bad.
    if any(obj_type not in COOKER_TYPES for obj_type, _ in batch):
        raise aiohttp.web.HTTPNotFound

    batch_id = request.app['backend'].batch_cook(batch)
    return encode_data({'id': batch_id})
async def batch_progress(request):
    """Return the encoded progress report of a batch.

    The batch id comes from the ``batch_id`` route parameter.  The report
    contains the per-bundle summaries under ``bundles``, their number under
    ``total``, and one counter per status found; the ``new``, ``pending``,
    ``done`` and ``failed`` counters are always present and default to 0.

    Declared with ``async def`` instead of the ``@asyncio.coroutine``
    decorator, which is deprecated and no longer exists in recent Python
    releases.

    Raises:
        aiohttp.web.HTTPNotFound: when the backend returns no bundle for
            the batch.
    """
    batch_id = request.match_info['batch_id']
    bundles = request.app['backend'].batch_info(batch_id)
    if not bundles:
        raise aiohttp.web.HTTPNotFound

    bundles = [user_info(bundle) for bundle in bundles]
    counter = collections.Counter(b['status'] for b in bundles)
    res = {
        'bundles': bundles,
        'total': len(bundles),
        # Zero defaults first, so that actual counts override them.
        **{k: 0 for k in ('new', 'pending', 'done', 'failed')},
        **dict(counter),
    }
    return encode_data(res)
# Web server
# Build the application serving the vault API.
#
# ``backend`` is stored under ``app['backend']``, which is where the request
# handlers look it up; ``kwargs`` are forwarded unchanged to the application
# constructor.  Returns the application with all routes registered.
def make_app(backend, **kwargs):
# NOTE: the two lines below are a removed/added diff pair: the patch swaps
# the SWHRemoteAPI constructor for RPCServerApp.
- app = SWHRemoteAPI(**kwargs)
+ app = RPCServerApp(**kwargs)
# Root route, answered by the ``index`` handler
app.router.add_route('GET', '/', index)
# Endpoints used by the web API
app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch)
app.router.add_route('POST', '/cook/{type}/{id}', vault_cook)
app.router.add_route('GET', '/progress/{type}/{id}', vault_progress)
# Endpoints used by the Cookers
app.router.add_route('POST', '/set_progress/{type}/{id}', set_progress)
app.router.add_route('POST', '/set_status/{type}/{id}', set_status)
app.router.add_route('POST', '/put_bundle/{type}/{id}', put_bundle)
app.router.add_route('POST', '/send_notif/{type}/{id}', send_notif)
# Endpoints for batch requests
app.router.add_route('POST', '/batch_cook', batch_cook)
app.router.add_route('GET', '/batch_progress/{batch_id}', batch_progress)
# Make the backend reachable from the handlers through ``request.app``
app['backend'] = backend
return app
def get_local_backend(cfg):
    """Instantiate the local vault backend described by ``cfg``.

    The ``vault`` entry of ``cfg`` must have its ``cls`` set to ``'local'``.
    Any of the ``cache``, ``storage`` and ``scheduler`` settings missing
    from its ``args`` are filled in, in place, from the top-level entries of
    the same name in ``cfg``.

    Args:
        cfg: configuration mapping holding a ``vault`` entry and, optionally,
            top-level ``cache``, ``storage`` and ``scheduler`` entries.

    Returns:
        The result of ``get_vault('local', args)``.

    Raises:
        ValueError: when ``cfg`` has no ``vault`` entry, or when one of the
            ``cache``, ``storage`` or ``scheduler`` settings is still
            missing or empty after the fallback.
        EnvironmentError: when the vault ``cls`` is not ``'local'``.
    """
    if 'vault' not in cfg:
        raise ValueError("missing 'vault' configuration")

    vcfg = cfg['vault']
    if vcfg['cls'] != 'local':
        # Only the message is passed: exceptions of the OSError family
        # reject keyword arguments with a TypeError.
        raise EnvironmentError(
            "The vault backend can only be started with a 'local' "
            "configuration")

    args = vcfg['args']
    required = ('cache', 'storage', 'scheduler')

    # Fall back on the top-level entries for anything not set on the vault.
    for key in required:
        if key not in args:
            args[key] = cfg.get(key)

    for key in required:
        if not args.get(key):
            raise ValueError(
                "invalid configuration; missing %s config entry." % key)

    return get_vault('local', args)
def make_app_from_configfile(config_file=None, **kwargs):
    """Load the configuration and build the vault application from it.

    The configuration location is ``config_file``, or ``DEFAULT_CONFIG_PATH``
    when it is ``None``; the ``SWH_CONFIG_FILENAME`` environment variable,
    when set, takes precedence over both.  If the location is an existing
    file it is read directly, otherwise it is loaded as a named
    configuration.  Extra ``kwargs`` are forwarded to ``make_app``.
    """
    fallback = DEFAULT_CONFIG_PATH if config_file is None else config_file
    location = os.environ.get('SWH_CONFIG_FILENAME', fallback)

    if os.path.isfile(location):
        load = config.read
    else:
        load = config.load_named_config
    cfg = load(location, DEFAULT_CONFIG)

    backend = get_local_backend(cfg)
    max_size = cfg['client_max_size']
    return make_app(backend=backend, client_max_size=max_size, **kwargs)
# Running this module as a script no longer starts anything: it only prints
# a deprecation notice.
if __name__ == '__main__':
print('Deprecated. Use swh-vault ')

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 1:14 PM (6 d, 22 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3294220

Event Timeline