diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py index 937d7f6a..f8f9ad20 100644 --- a/swh/storage/api/client.py +++ b/swh/storage/api/client.py @@ -1,199 +1,178 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pickle - import requests from requests.exceptions import ConnectionError -from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder -from swh.storage.exc import StorageAPIError - - -def encode_data(data): - try: - return msgpack_dumps(data) - except OverflowError as e: - raise ValueError('Limits were reached. Please, check your input.\n' + - str(e)) - - -def decode_response(response): - content_type = response.headers['content-type'] - - if content_type.startswith('application/x-msgpack'): - r = msgpack_loads(response.content) - elif content_type.startswith('application/json'): - r = response.json(cls=SWHJSONDecoder) - else: - raise ValueError('Wrong content type `%s` for API response' - % content_type) - return r +from ..exc import StorageAPIError +from ..api.common import (decode_response, + encode_data_client as encode_data) class RemoteStorage(): """Proxy to a remote storage API""" def __init__(self, base_url): self.base_url = base_url self.session = requests.Session() def url(self, endpoint): return '%s%s' % (self.base_url, endpoint) def post(self, endpoint, data): try: response = self.session.post( self.url(endpoint), data=encode_data(data), headers={'content-type': 'application/x-msgpack'}, ) except ConnectionError as e: print(str(e)) raise StorageAPIError(e) # XXX: this breaks language-independence and should be # replaced by proper unserialization if response.status_code == 400: raise pickle.loads(decode_response(response)) return decode_response(response) def get(self, endpoint, data=None): try: response = self.session.get( self.url(endpoint), params=data, ) except ConnectionError as e: print(str(e)) raise StorageAPIError(e) if response.status_code == 404: return None # XXX: this breaks language-independence and should be # replaced by proper unserialization if response.status_code == 400: raise pickle.loads(decode_response(response)) else: return decode_response(response) def content_add(self, content): return self.post('content/add', {'content': content}) def content_missing(self, content, key_hash='sha1'): return self.post('content/missing', {'content': content, 'key_hash': key_hash}) def content_missing_per_sha1(self, contents): return self.post('content/missing/sha1', {'contents': contents}) def content_get(self, content): return self.post('content/data', {'content': content}) def content_find(self, content): return self.post('content/present', {'content': content}) def content_find_occurrence(self, content): return self.post('content/occurrence', {'content': content}) def directory_add(self, directories): return self.post('directory/add', {'directories': directories}) def directory_missing(self, directories): return self.post('directory/missing', {'directories': directories}) def directory_get(self, directories): return self.post('directory', dict(directories=directories)) def directory_ls(self, directory, recursive=False): return self.get('directory/ls', {'directory': directory, 'recursive': recursive}) def revision_get(self, revisions): return self.post('revision', {'revisions': revisions}) def revision_get_by(self, origin_id, branch_name, timestamp, limit=None): return self.post('revision/by', dict(origin_id=origin_id, branch_name=branch_name, timestamp=timestamp, limit=limit)) def revision_log(self, revisions, limit=None): return self.post('revision/log', {'revisions': revisions, 'limit': limit}) def revision_shortlog(self, revisions, limit=None): return self.post('revision/shortlog', {'revisions': revisions, 'limit': limit}) def revision_add(self, revisions): return self.post('revision/add', {'revisions': revisions}) def revision_missing(self, revisions): return self.post('revision/missing', {'revisions': revisions}) def release_add(self, releases): return self.post('release/add', {'releases': releases}) def release_get(self, releases): return self.post('release', {'releases': releases}) def release_get_by(self, origin_id, limit=None): return self.post('release/by', dict(origin_id=origin_id, limit=limit)) def release_missing(self, releases): return self.post('release/missing', {'releases': releases}) def object_find_by_sha1_git(self, ids): return self.post('object/find_by_sha1_git', {'ids': ids}) def occurrence_get(self, origin_id): return self.post('occurrence', {'origin_id': origin_id}) def occurrence_add(self, occurrences): return self.post('occurrence/add', {'occurrences': occurrences}) def origin_get(self, origin): return self.post('origin/get', {'origin': origin}) def origin_add_one(self, origin): return self.post('origin/add', {'origin': origin}) def person_get(self, person): return self.post('person', {'person': person}) def fetch_history_start(self, origin_id): return self.post('fetch_history/start', {'origin_id': origin_id}) def fetch_history_end(self, fetch_history_id, data): return self.post('fetch_history/end', {'fetch_history_id': fetch_history_id, 'data': data}) def fetch_history_get(self, fetch_history_id): return self.get('fetch_history', {'id': fetch_history_id}) def entity_add(self, entities): return self.post('entity/add', {'entities': entities}) def entity_get(self, uuid): return self.post('entity/get', {'uuid': uuid}) def entity_get_one(self, uuid): return self.get('entity', {'uuid': uuid}) def entity_get_from_lister_metadata(self, entities): return self.post('entity/from_lister_metadata', {'entities': entities}) def stat_counters(self): return self.get('stat/counters') def directory_entry_get_by_path(self, directory, paths): return self.post('directory/path', dict(directory=directory, paths=paths)) diff --git a/swh/storage/api/common.py b/swh/storage/api/common.py new file mode 100644 index 00000000..328d8260 --- /dev/null +++ b/swh/storage/api/common.py @@ -0,0 +1,69 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import json +import pickle + +from flask import Request, Response + +from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder + + +class BytesRequest(Request): + """Request with proper escaping of arbitrary byte sequences.""" + encoding = 'utf-8' + encoding_errors = 'surrogateescape' + + +def encode_data_server(data): + return Response( + msgpack_dumps(data), + mimetype='application/x-msgpack', + ) + + +def encode_data_client(data): + try: + return msgpack_dumps(data) + except OverflowError as e: + raise ValueError('Limits were reached. Please, check your input.\n' + + str(e)) + + +def decode_request(request): + content_type = request.mimetype + data = request.get_data() + + if content_type == 'application/x-msgpack': + r = msgpack_loads(data) + elif content_type == 'application/json': + r = json.loads(data, cls=SWHJSONDecoder) + else: + raise ValueError('Wrong content type `%s` for API request' + % content_type) + + return r + + +def decode_response(response): + content_type = response.headers['content-type'] + + if content_type.startswith('application/x-msgpack'): + r = msgpack_loads(response.content) + elif content_type.startswith('application/json'): + r = response.json(cls=SWHJSONDecoder) + else: + raise ValueError('Wrong content type `%s` for API response' + % content_type) + + return r + + +def error_handler(exception, encoder): + # XXX: this breaks language-independence and should be + # replaced by proper serialization of errors + response = encoder(pickle.dumps(exception)) + response.status_code = 400 + return response diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py index bce183e7..de7bb332 100644 --- a/swh/storage/api/server.py +++ b/swh/storage/api/server.py @@ -1,278 +1,246 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging -import pickle -from flask import Flask, Request, Response, g, request +from flask import Flask, g, request from swh.core import config -from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder -from swh.storage import Storage +from .. import Storage +from ..api.common import (BytesRequest, decode_request, error_handler, + encode_data_server as encode_data) DEFAULT_CONFIG = { 'db': ('str', 'dbname=softwareheritage-dev'), 'storage_base': ('str', '/tmp/swh-storage/test'), } -class BytesRequest(Request): - """Request with proper escaping of arbitrary byte sequences.""" - encoding = 'utf-8' - encoding_errors = 'surrogateescape' - - app = Flask(__name__) app.request_class = BytesRequest -def encode_data(data): - return Response( - msgpack_dumps(data), - mimetype='application/x-msgpack', - ) - - -def decode_request(request): - content_type = request.mimetype - data = request.get_data() - - if content_type == 'application/x-msgpack': - r = msgpack_loads(data) - elif content_type == 'application/json': - r = json.loads(data, cls=SWHJSONDecoder) - else: - raise ValueError('Wrong content type `%s` for API request' - % content_type) - - return r - - @app.errorhandler(Exception) -def error_handler(exception): - # XXX: this breaks language-independence and should be - # replaced by proper serialization of errors - response = encode_data(pickle.dumps(exception)) - response.status_code = 400 - return response +def my_error_handler(exception): + return error_handler(exception, encode_data) @app.before_request def before_request(): g.storage = Storage(app.config['db'], app.config['storage_base']) @app.route('/') def index(): return 'Hello' @app.route('/content/missing', methods=['POST']) def content_missing(): return encode_data(g.storage.content_missing(**decode_request(request))) @app.route('/content/missing/sha1', methods=['POST']) def content_missing_per_sha1(): return encode_data(g.storage.content_missing_per_sha1( **decode_request(request))) @app.route('/content/present', methods=['POST']) def content_find(): return encode_data(g.storage.content_find(**decode_request(request))) @app.route('/content/occurrence', methods=['POST']) def content_find_occurrence(): res = g.storage.content_find_occurrence(**decode_request(request)) return encode_data(res) @app.route('/content/add', methods=['POST']) def content_add(): return encode_data(g.storage.content_add(**decode_request(request))) @app.route('/content/data', methods=['POST']) def content_get(): return encode_data(g.storage.content_get(**decode_request(request))) @app.route('/directory', methods=['POST']) def directory_get(): return encode_data(g.storage.directory_get(**decode_request(request))) @app.route('/directory/missing', methods=['POST']) def directory_missing(): return encode_data(g.storage.directory_missing(**decode_request(request))) @app.route('/directory/add', methods=['POST']) def directory_add(): return encode_data(g.storage.directory_add(**decode_request(request))) @app.route('/directory/path', methods=['POST']) def directory_entry_get_by_path(): return encode_data(g.storage.directory_entry_get_by_path( **decode_request(request))) @app.route('/directory/ls', methods=['GET']) def directory_ls(): dir = request.args['directory'].encode('utf-8', 'surrogateescape') rec = json.loads(request.args.get('recursive', 'False').lower()) return encode_data(g.storage.directory_ls(dir, recursive=rec)) @app.route('/revision/add', methods=['POST']) def revision_add(): return encode_data(g.storage.revision_add(**decode_request(request))) @app.route('/revision', methods=['POST']) def revision_get(): return encode_data(g.storage.revision_get(**decode_request(request))) @app.route('/revision/by', methods=['POST']) def revision_get_by(): return encode_data(g.storage.revision_get_by(**decode_request(request))) @app.route('/revision/log', methods=['POST']) def revision_log(): return encode_data(g.storage.revision_log(**decode_request(request))) @app.route('/revision/shortlog', methods=['POST']) def revision_shortlog(): return encode_data(g.storage.revision_shortlog(**decode_request(request))) @app.route('/revision/missing', methods=['POST']) def revision_missing(): return encode_data(g.storage.revision_missing(**decode_request(request))) @app.route('/release/add', methods=['POST']) def release_add(): return encode_data(g.storage.release_add(**decode_request(request))) @app.route('/release', methods=['POST']) def release_get(): return encode_data(g.storage.release_get(**decode_request(request))) @app.route('/release/by', methods=['POST']) def release_get_by(): return encode_data(g.storage.release_get_by(**decode_request(request))) @app.route('/release/missing', methods=['POST']) def release_missing(): return encode_data(g.storage.release_missing(**decode_request(request))) @app.route('/object/find_by_sha1_git', methods=['POST']) def object_find_by_sha1_git(): return encode_data(g.storage.object_find_by_sha1_git( **decode_request(request))) @app.route('/occurrence', methods=['POST']) def occurrence_get(): return encode_data(g.storage.occurrence_get(**decode_request(request))) @app.route('/occurrence/add', methods=['POST']) def occurrence_add(): return encode_data(g.storage.occurrence_add(**decode_request(request))) @app.route('/origin/get', methods=['POST']) def origin_get(): return encode_data(g.storage.origin_get(**decode_request(request))) @app.route('/origin/add', methods=['POST']) def origin_add_one(): return encode_data(g.storage.origin_add_one(**decode_request(request))) @app.route('/person', methods=['POST']) def person_get(): return encode_data(g.storage.person_get(**decode_request(request))) @app.route('/fetch_history', methods=['GET']) def fetch_history_get(): return encode_data(g.storage.fetch_history_get(request.args['id'])) @app.route('/fetch_history/start', methods=['POST']) def fetch_history_start(): return encode_data( g.storage.fetch_history_start(**decode_request(request))) @app.route('/fetch_history/end', methods=['POST']) def fetch_history_end(): return encode_data( g.storage.fetch_history_end(**decode_request(request))) @app.route('/entity/add', methods=['POST']) def entity_add(): return encode_data( g.storage.entity_add(**decode_request(request))) @app.route('/entity/get', methods=['POST']) def entity_get(): return encode_data( g.storage.entity_get(**decode_request(request))) @app.route('/entity', methods=['GET']) def entity_get_one(): return encode_data(g.storage.entity_get_one(request.args['uuid'])) @app.route('/entity/from_lister_metadata', methods=['POST']) def entity_from_lister_metadata(): return encode_data( g.storage.entity_get_from_lister_metadata(**decode_request(request))) @app.route('/stat/counters', methods=['GET']) def stat_counters(): return encode_data(g.storage.stat_counters()) def run_from_webserver(environ, start_response): """Run the WSGI app from the webserver, loading the configuration.""" config_path = '/etc/softwareheritage/storage/storage.ini' app.config.update(config.read(config_path, DEFAULT_CONFIG)) handler = logging.StreamHandler() app.logger.addHandler(handler) return app(environ, start_response) if __name__ == '__main__': import sys app.config.update(config.read(sys.argv[1], DEFAULT_CONFIG)) host = sys.argv[2] if len(sys.argv) >= 3 else '127.0.0.1' app.run(host, debug=True) diff --git a/swh/storage/objstorage/api/__init__.py b/swh/storage/objstorage/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py new file mode 100644 index 00000000..8f030865 --- /dev/null +++ b/swh/storage/objstorage/api/client.py @@ -0,0 +1,92 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +import pickle + +import requests + +from requests.exceptions import ConnectionError +from ...exc import StorageAPIError +from ...api.common import (decode_response, + encode_data_client as encode_data) + + +class RemoteObjStorage(): + """ Proxy to a remote object storage. + + This class allows to connect to an object storage server via + http protocol. + + Attributes: + base_url (string): The url of the server to connect. Must end + with a '/' + session: The session to send requests. + """ + def __init__(self, base_url): + self.base_url = base_url + self.session = requests.Session() + + def url(self, endpoint): + return '%s%s' % (self.base_url, endpoint) + + def post(self, endpoint, data): + try: + response = self.session.post( + self.url(endpoint), + data=encode_data(data), + headers={'content-type': 'application/x-msgpack'}, + ) + except ConnectionError as e: + print(str(e)) + raise StorageAPIError(e) + + # XXX: this breaks language-independence and should be + # replaced by proper unserialization + if response.status_code == 400: + raise pickle.loads(decode_response(response)) + + return decode_response(response) + + def content_add(self, bytes, obj_id=None): + """ Add a new object to the object storage. + + Args: + bytes: content of the object to be added to the storage. + obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When + given, obj_id will be trusted to match bytes. If missing, + obj_id will be computed on the fly. + + """ + return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id}) + + def content_get(self, obj_id): + """ Retrieve the content of a given object. + + Args: + obj_id: The id of the object. + + Returns: + The content of the requested objects as bytes. + + Raises: + ObjNotFoundError: if the requested object is missing + """ + return self.post('content/get', {'obj_id': obj_id}) + + def content_check(self, obj_id): + """ Integrity check for a given object + + verify that the file object is in place, and that the gzipped content + matches the object id + + Args: + obj_id: The id of the object. + + Raises: + ObjNotFoundError: if the requested object is missing + Error: if the requested object is corrupt + """ + self.post('content/check', {'obj_id': obj_id}) diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py new file mode 100644 index 00000000..1cd1f3eb --- /dev/null +++ b/swh/storage/objstorage/api/server.py @@ -0,0 +1,67 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from flask import Flask, g, request + +from swh.core import config +from .. import ObjStorage +from ...api.common import (BytesRequest, decode_request, error_handler, + encode_data_server as encode_data) + +DEFAULT_CONFIG = { + 'storage_base': ('str', '/tmp/swh-storage/objects/'), + 'storage_depth': ('int', 3) +} + +app = Flask(__name__) +app.request_class = BytesRequest + + +@app.errorhandler(Exception) +def my_error_handler(exception): + return error_handler(exception, encode_data) + + +@app.before_request +def before_request(): + g.objstorage = ObjStorage(app.config['storage_base'], + app.config['storage_depth']) + + +@app.route('/') +def index(): + return "Helloworld!" + + +@app.route('/content') +def content(): + return str(list(g.storage)) + + +@app.route('/content/add', methods=['POST']) +def add_bytes(): + return encode_data(g.objstorage.add_bytes(**decode_request(request))) + + +@app.route('/content/get', methods=['POST']) +def get_bytes(): + return encode_data(g.objstorage.get_bytes(**decode_request(request))) + + +@app.route('/content/check', methods=['POST']) +def check(): + # TODO verify that an error on this content will be properly intercepted + # by @app.errorhandler and the answer will be sent to client. + return encode_data(g.objstorage.check(**decode_request(request))) + + +if __name__ == '__main__': + import sys + + app.config.update(config.read(sys.argv[1], DEFAULT_CONFIG)) + + host = sys.argv[2] if len(sys.argv) >= 3 else '0.0.0.0' + app.run(host, debug=True)