Add swh/objstorage/api/common.py (msgpack/JSON encode/decode helpers, the
BytesRequest class and the pickle-based error handler) and switch the
objstorage API client and server to import these helpers from it instead
of from swh.storage.api.common.

Also make the server's '/content' route read g.objstorage, the only
attribute that before_request sets on flask.g (it previously read
g.storage).

Note: this patch was reconstructed from a copy whose line breaks and
indentation had been collapsed; blank context lines and leading whitespace
are inferred and should be checked against the target tree before applying.

diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py
index cf89eda..733458c 100644
--- a/swh/objstorage/api/client.py
+++ b/swh/objstorage/api/client.py
@@ -1,103 +1,103 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pickle
 
 import requests
 
 from requests.exceptions import ConnectionError
 from ..exc import ObjStorageAPIError
-from ...storage.api.common import (decode_response,
-                                   encode_data_client as encode_data)
+from .common import (decode_response,
+                     encode_data_client as encode_data)
 
 
 class RemoteObjStorage():
     """ Proxy to a remote object storage.
 
     This class allows to connect to an object storage server via
     http protocol.
 
     Attributes:
         base_url (string): The url of the server to connect. Must end
             with a '/'
         session: The session to send requests.
     """
     def __init__(self, base_url):
         self.base_url = base_url
         self.session = requests.Session()
 
     def url(self, endpoint):
         return '%s%s' % (self.base_url, endpoint)
 
     def post(self, endpoint, data):
         try:
             response = self.session.post(
                 self.url(endpoint),
                 data=encode_data(data),
                 headers={'content-type': 'application/x-msgpack'},
             )
         except ConnectionError as e:
             print(str(e))
             raise ObjStorageAPIError(e)
 
         # XXX: this breaks language-independence and should be
         # replaced by proper unserialization
         if response.status_code == 400:
             raise pickle.loads(decode_response(response))
 
         return decode_response(response)
 
     def content_add(self, bytes, obj_id=None):
         """ Add a new object to the object storage.
 
         Args:
             bytes: content of the object to be added to the storage.
             obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
                 given, obj_id will be trusted to match bytes. If missing,
                 obj_id will be computed on the fly.
 
         """
         return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id})
 
     def content_get(self, obj_id):
         """ Retrieve the content of a given object.
 
         Args:
             obj_id: The id of the object.
 
         Returns:
             The content of the requested objects as bytes.
 
         Raises:
             ObjNotFoundError: if the requested object is missing
         """
         return self.post('content/get', {'obj_id': obj_id})
 
     def content_get_random(self, batch_size):
         """ Retrieve a random sample of existing content.
 
         Args:
             batch_size: Number of content requested.
 
         Returns:
             A list of random ids that represents existing
             contents.
         """
         return self.post('content/get/random', {'batch_size': batch_size})
 
     def content_check(self, obj_id):
         """ Integrity check for a given object
 
         verify that the file object is in place, and that the gzipped content
         matches the object id
 
         Args:
             obj_id: The id of the object.
 
         Raises:
             ObjNotFoundError: if the requested object is missing
             Error: if the requested object is corrupt
         """
         self.post('content/check', {'obj_id': obj_id})
diff --git a/swh/objstorage/api/common.py b/swh/objstorage/api/common.py
new file mode 100644
index 0000000..328d826
--- /dev/null
+++ b/swh/objstorage/api/common.py
@@ -0,0 +1,69 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import pickle
+
+from flask import Request, Response
+
+from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
+
+
+class BytesRequest(Request):
+    """Request with proper escaping of arbitrary byte sequences."""
+    encoding = 'utf-8'
+    encoding_errors = 'surrogateescape'
+
+
+def encode_data_server(data):
+    return Response(
+        msgpack_dumps(data),
+        mimetype='application/x-msgpack',
+    )
+
+
+def encode_data_client(data):
+    try:
+        return msgpack_dumps(data)
+    except OverflowError as e:
+        raise ValueError('Limits were reached. Please, check your input.\n' +
+                         str(e))
+
+
+def decode_request(request):
+    content_type = request.mimetype
+    data = request.get_data()
+
+    if content_type == 'application/x-msgpack':
+        r = msgpack_loads(data)
+    elif content_type == 'application/json':
+        r = json.loads(data, cls=SWHJSONDecoder)
+    else:
+        raise ValueError('Wrong content type `%s` for API request'
+                         % content_type)
+
+    return r
+
+
+def decode_response(response):
+    content_type = response.headers['content-type']
+
+    if content_type.startswith('application/x-msgpack'):
+        r = msgpack_loads(response.content)
+    elif content_type.startswith('application/json'):
+        r = response.json(cls=SWHJSONDecoder)
+    else:
+        raise ValueError('Wrong content type `%s` for API response'
+                         % content_type)
+
+    return r
+
+
+def error_handler(exception, encoder):
+    # XXX: this breaks language-independence and should be
+    # replaced by proper serialization of errors
+    response = encoder(pickle.dumps(exception))
+    response.status_code = 400
+    return response
diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py
index 8d9787f..120145e 100644
--- a/swh/objstorage/api/server.py
+++ b/swh/objstorage/api/server.py
@@ -1,96 +1,97 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 import logging
 
 from flask import Flask, g, request
 
 from swh.core import config
 from swh.objstorage import PathSlicingObjStorage
-from swh.storage.api.common import (BytesRequest, decode_request,
-                                    error_handler,
-                                    encode_data_server as encode_data)
+
+from swh.objstorage.api.common import (BytesRequest, decode_request,
+                                       error_handler,
+                                       encode_data_server as encode_data)
 
 DEFAULT_CONFIG = {
     'storage_base': ('str', '/tmp/swh-storage/objects/'),
     'storage_slicing': ('str', '0:2/2:4/4:6')
 }
 
 app = Flask(__name__)
 app.request_class = BytesRequest
 
 
 @app.errorhandler(Exception)
 def my_error_handler(exception):
     return error_handler(exception, encode_data)
 
 
 @app.before_request
 def before_request():
     g.objstorage = PathSlicingObjStorage(app.config['storage_base'],
                                          app.config['storage_slicing'])
 
 
 @app.route('/')
 def index():
     return "SWH Objstorage API server"
 
 
 @app.route('/content')
 def content():
-    return str(list(g.storage))
+    return str(list(g.objstorage))
 
 
 @app.route('/content/add', methods=['POST'])
 def add_bytes():
     return encode_data(g.objstorage.add(**decode_request(request)))
 
 
 @app.route('/content/get', methods=['POST'])
 def get_bytes():
     return encode_data(g.objstorage.get(**decode_request(request)))
 
 
 @app.route('/content/get/random', methods=['POST'])
 def get_random_contents():
     return encode_data(
         g.objstorage.get_random(**decode_request(request))
     )
 
 
 @app.route('/content/check', methods=['POST'])
 def check():
     return encode_data(g.objstorage.check(**decode_request(request)))
 
 
 def run_from_webserver(environ, start_response):
     """Run the WSGI app from the webserver, loading the configuration.
 
     """
     config_path = '/etc/softwareheritage/storage/objstorage.ini'
 
     app.config.update(config.read(config_path, DEFAULT_CONFIG))
 
     handler = logging.StreamHandler()
     app.logger.addHandler(handler)
 
     return app(environ, start_response)
 
 
 @click.command()
 @click.argument('config-path', required=1)
 @click.option('--host', default='0.0.0.0', help="Host to run the server")
 @click.option('--port', default=5000, type=click.INT,
               help="Binding port of the server")
 @click.option('--debug/--nodebug', default=True,
               help="Indicates if the server should run in debug mode")
 def launch(config_path, host, port, debug):
     app.config.update(config.read(config_path, DEFAULT_CONFIG))
     app.run(host, port=int(port), debug=bool(debug))
 
 
 if __name__ == '__main__':
     launch()