Port the objstorage API server handlers from @asyncio.coroutine / "yield from"
to native "async def" / "await", and import the async API helpers from
swh.core.api.asynchronous instead of swh.core.api_async.

add_stream now picks a decoder from the request Content-Type (msgpack or
JSON), accumulates the data returned by request.content.readchunk() until the
end-of-chunk flag is set, then decodes the buffer and hands it to the chunk
writer. The JSON decoder calls json.loads, so "json" is imported explicitly.

diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py
--- a/swh/objstorage/api/server.py
+++ b/swh/objstorage/api/server.py
@@ -3,50 +3,49 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import asyncio
-import aiohttp.web
+import json
 import os
 
+import aiohttp.web
+
 from swh.core.config import read as config_read
-from swh.core.api_async import (SWHRemoteAPI, decode_request,
-                                encode_data_server as encode_data)
+from swh.core.api.asynchronous import (SWHRemoteAPI, decode_request,
+                                       encode_data_server as encode_data)
+
+
+from swh.core.api.serializers import msgpack_loads, SWHJSONDecoder
+
 from swh.model import hashutil
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
 
 
-@asyncio.coroutine
-def index(request):
+async def index(request):
     return aiohttp.web.Response(body="SWH Objstorage API server")
 
 
-@asyncio.coroutine
-def check_config(request):
-    req = yield from decode_request(request)
+async def check_config(request):
+    req = await decode_request(request)
     return encode_data(request.app['objstorage'].check_config(**req))
 
 
-@asyncio.coroutine
-def contains(request):
-    req = yield from decode_request(request)
+async def contains(request):
+    req = await decode_request(request)
     return encode_data(request.app['objstorage'].__contains__(**req))
 
 
-@asyncio.coroutine
-def add_bytes(request):
-    req = yield from decode_request(request)
+async def add_bytes(request):
+    req = await decode_request(request)
     return encode_data(request.app['objstorage'].add(**req))
 
 
-@asyncio.coroutine
-def add_batch(request):
-    req = yield from decode_request(request)
+async def add_batch(request):
+    req = await decode_request(request)
     return encode_data(request.app['objstorage'].add_batch(**req))
 
 
-@asyncio.coroutine
-def get_bytes(request):
-    req = yield from decode_request(request)
+async def get_bytes(request):
+    req = await decode_request(request)
     try:
         ret = request.app['objstorage'].get(**req)
     except ObjNotFoundError:
@@ -59,36 +58,31 @@
     return encode_data(ret)
 
 
-@asyncio.coroutine
-def get_batch(request):
-    req = yield from decode_request(request)
+async def get_batch(request):
+    req = await decode_request(request)
     return encode_data(request.app['objstorage'].get_batch(**req))
 
 
-@asyncio.coroutine
-def check(request):
-    req = yield from decode_request(request)
+async def check(request):
+    req = await decode_request(request)
     return encode_data(request.app['objstorage'].check(**req))
 
 
-@asyncio.coroutine
-def delete(request):
-    req = yield from decode_request(request)
+async def delete(request):
+    req = await decode_request(request)
     return encode_data(request.app['objstorage'].delete(**req))
 
 
 # Management methods
 
-@asyncio.coroutine
-def get_random_contents(request):
-    req = yield from decode_request(request)
+async def get_random_contents(request):
+    req = await decode_request(request)
     return encode_data(request.app['objstorage'].get_random(**req))
 
 
 # Streaming methods
 
-@asyncio.coroutine
-def add_stream(request):
+async def add_stream(request):
     hex_id = request.match_info['hex_id']
     obj_id = hashutil.hash_to_bytes(hex_id)
     check_pres = (request.query.get('check_presence', '').lower() == 'true')
@@ -97,24 +91,38 @@
     if check_pres and obj_id in objstorage:
         return encode_data(obj_id)
 
+    # XXX this really should go in a decode_stream_request coroutine in
+    # swh.core, but since py35 does not support async generators, it cannot
+    # easily be made for now
+    content_type = request.headers.get('Content-Type')
+    if content_type == 'application/x-msgpack':
+        decode = msgpack_loads
+    elif content_type == 'application/json':
+        decode = lambda x: json.loads(x, cls=SWHJSONDecoder)  # noqa
+    else:
+        raise ValueError('Wrong content type `%s` for API request'
+                         % content_type)
+
+    buffer = b''
     with objstorage.chunk_writer(obj_id) as write:
-        # XXX (3.5): use 'async for chunk in request.content.iter_any()'
         while not request.content.at_eof():
-            chunk = yield from request.content.readany()
-            write(chunk)
+            data, eot = await request.content.readchunk()
+            buffer += data
+            if eot:
+                write(decode(buffer))
+                buffer = b''
 
     return encode_data(obj_id)
 
 
-@asyncio.coroutine
-def get_stream(request):
+async def get_stream(request):
     hex_id = request.match_info['hex_id']
     obj_id = hashutil.hash_to_bytes(hex_id)
     response = aiohttp.web.StreamResponse()
-    yield from response.prepare(request)
+    await response.prepare(request)
     for chunk in request.app['objstorage'].get_stream(obj_id, 2 << 20):
         response.write(chunk)
-        yield from response.drain()
+        await response.drain()
     return response
 
 