Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/api/server.py
Show All 12 Lines | |||||
from swh.core.api.serializers import msgpack_loads, SWHJSONDecoder | from swh.core.api.serializers import msgpack_loads, SWHJSONDecoder | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.objstorage import DEFAULT_LIMIT | from swh.objstorage.objstorage import DEFAULT_LIMIT | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from swh.core.statsd import statsd | |||||
def timed(f): | |||||
async def w(*a, **kw): | |||||
with statsd.timed( | |||||
'swh_objstorage_request_duration_seconds', | |||||
tags={'endpoint': f.__name__}): | |||||
return await f(*a, **kw) | |||||
return w | |||||
@timed | |||||
async def index(request): | async def index(request): | ||||
return aiohttp.web.Response(body="SWH Objstorage API server") | return aiohttp.web.Response(body="SWH Objstorage API server") | ||||
@timed | |||||
async def check_config(request): | async def check_config(request): | ||||
req = await decode_request(request) | req = await decode_request(request) | ||||
return encode_data(request.app['objstorage'].check_config(**req)) | return encode_data(request.app['objstorage'].check_config(**req)) | ||||
@timed | |||||
async def contains(request): | async def contains(request): | ||||
req = await decode_request(request) | req = await decode_request(request) | ||||
return encode_data(request.app['objstorage'].__contains__(**req)) | return encode_data(request.app['objstorage'].__contains__(**req)) | ||||
@timed | |||||
async def add_bytes(request): | async def add_bytes(request): | ||||
req = await decode_request(request) | req = await decode_request(request) | ||||
statsd.increment('swh_objstorage_in_bytes_total', | |||||
len(req['content']), | |||||
tags={'endpoint': 'add_bytes'}) | |||||
return encode_data(request.app['objstorage'].add(**req)) | return encode_data(request.app['objstorage'].add(**req)) | ||||
@timed | |||||
async def add_batch(request): | async def add_batch(request): | ||||
req = await decode_request(request) | req = await decode_request(request) | ||||
return encode_data(request.app['objstorage'].add_batch(**req)) | return encode_data(request.app['objstorage'].add_batch(**req)) | ||||
@timed | |||||
async def get_bytes(request): | async def get_bytes(request): | ||||
req = await decode_request(request) | req = await decode_request(request) | ||||
try: | try: | ||||
ret = request.app['objstorage'].get(**req) | ret = request.app['objstorage'].get(**req) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
ret = { | ret = { | ||||
'error': 'object_not_found', | 'error': 'object_not_found', | ||||
'request': req, | 'request': req, | ||||
} | } | ||||
return encode_data(ret, status=404) | return encode_data(ret, status=404) | ||||
else: | else: | ||||
statsd.increment('swh_objstorage_out_bytes_total', | |||||
len(ret), | |||||
tags={'endpoint': 'get_bytes'}) | |||||
return encode_data(ret) | return encode_data(ret) | ||||
@timed | |||||
async def get_batch(request): | async def get_batch(request): | ||||
req = await decode_request(request) | req = await decode_request(request) | ||||
return encode_data(request.app['objstorage'].get_batch(**req)) | return encode_data(request.app['objstorage'].get_batch(**req)) | ||||
@timed | |||||
async def check(request): | async def check(request): | ||||
req = await decode_request(request) | req = await decode_request(request) | ||||
return encode_data(request.app['objstorage'].check(**req)) | return encode_data(request.app['objstorage'].check(**req)) | ||||
@timed | |||||
async def delete(request): | async def delete(request): | ||||
req = await decode_request(request) | req = await decode_request(request) | ||||
return encode_data(request.app['objstorage'].delete(**req)) | return encode_data(request.app['objstorage'].delete(**req)) | ||||
# Management methods | # Management methods | ||||
@timed | |||||
async def get_random_contents(request): | async def get_random_contents(request): | ||||
req = await decode_request(request) | req = await decode_request(request) | ||||
return encode_data(request.app['objstorage'].get_random(**req)) | return encode_data(request.app['objstorage'].get_random(**req)) | ||||
# Streaming methods | # Streaming methods | ||||
@timed | |||||
async def add_stream(request): | async def add_stream(request): | ||||
hex_id = request.match_info['hex_id'] | hex_id = request.match_info['hex_id'] | ||||
obj_id = hashutil.hash_to_bytes(hex_id) | obj_id = hashutil.hash_to_bytes(hex_id) | ||||
check_pres = (request.query.get('check_presence', '').lower() == 'true') | check_pres = (request.query.get('check_presence', '').lower() == 'true') | ||||
objstorage = request.app['objstorage'] | objstorage = request.app['objstorage'] | ||||
if check_pres and obj_id in objstorage: | if check_pres and obj_id in objstorage: | ||||
return encode_data(obj_id) | return encode_data(obj_id) | ||||
Show All 17 Lines | with objstorage.chunk_writer(obj_id) as write: | ||||
buffer += data | buffer += data | ||||
if eot: | if eot: | ||||
write(decode(buffer)) | write(decode(buffer)) | ||||
buffer = b'' | buffer = b'' | ||||
return encode_data(obj_id) | return encode_data(obj_id) | ||||
@timed | |||||
async def get_stream(request): | async def get_stream(request): | ||||
hex_id = request.match_info['hex_id'] | hex_id = request.match_info['hex_id'] | ||||
obj_id = hashutil.hash_to_bytes(hex_id) | obj_id = hashutil.hash_to_bytes(hex_id) | ||||
response = aiohttp.web.StreamResponse() | response = aiohttp.web.StreamResponse() | ||||
await response.prepare(request) | await response.prepare(request) | ||||
for chunk in request.app['objstorage'].get_stream(obj_id, 2 << 20): | for chunk in request.app['objstorage'].get_stream(obj_id, 2 << 20): | ||||
response.write(chunk) | response.write(chunk) | ||||
await response.drain() | await response.drain() | ||||
return response | return response | ||||
@timed | |||||
async def list_content(request): | async def list_content(request): | ||||
last_obj_id = request.query.get('last_obj_id') | last_obj_id = request.query.get('last_obj_id') | ||||
if last_obj_id: | if last_obj_id: | ||||
last_obj_id = bytes.fromhex(last_obj_id) | last_obj_id = bytes.fromhex(last_obj_id) | ||||
limit = int(request.query.get('limit', DEFAULT_LIMIT)) | limit = int(request.query.get('limit', DEFAULT_LIMIT)) | ||||
response = aiohttp.web.StreamResponse() | response = aiohttp.web.StreamResponse() | ||||
response.enable_chunked_encoding() | response.enable_chunked_encoding() | ||||
await response.prepare(request) | await response.prepare(request) | ||||
▲ Show 20 Lines • Show All 102 Lines • Show Last 20 Lines |