diff --git a/swh/storage/objstorage/__init__.py b/swh/storage/objstorage/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/__init__.py
@@ -0,0 +1 @@
+from .objstorage import ObjStorage, DIR_MODE, FILE_MODE  # NOQA
diff --git a/swh/storage/objstorage/api/__init__.py b/swh/storage/objstorage/api/__init__.py
new file mode 100644
diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/api/client.py
@@ -0,0 +1,130 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import pickle
+
+import requests
+
+from requests.exceptions import ConnectionError
+from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
+from swh.storage.exc import StorageAPIError
+
+
+def encode_data(data):
+    """Serialize `data` to msgpack, for use as a request body.
+
+    Raises:
+        ValueError: if msgpack overflows on the input (e.g. an integer
+            too large for msgpack to represent).
+    """
+    try:
+        return msgpack_dumps(data)
+    except OverflowError as e:
+        raise ValueError('Limits were reached. Please, check your input.\n' +
+                         str(e)) from e
+
+
+def decode_response(response):
+    """Deserialize an API response body according to its content type.
+
+    Raises:
+        ValueError: if the content type is neither msgpack nor JSON, or if
+            the response carries no content-type header at all.
+    """
+    content_type = response.headers.get('content-type', '')
+
+    if content_type.startswith('application/x-msgpack'):
+        r = msgpack_loads(response.content)
+    elif content_type.startswith('application/json'):
+        r = response.json(cls=SWHJSONDecoder)
+    else:
+        raise ValueError('Wrong content type `%s` for API response'
+                         % content_type)
+
+    return r
+
+
+class RemoteArchive():
+    """ proxy to a remote archive storage.
+
+    Every content_* method POSTs its arguments, msgpack-encoded, to the
+    matching endpoint of the objstorage API server.
+    """
+    def __init__(self, base_url):
+        self.base_url = base_url
+        self.session = requests.Session()
+
+    def url(self, endpoint):
+        return '%s%s' % (self.base_url, endpoint)
+
+    def post(self, endpoint, data):
+        """ POST `data` to `endpoint` and return the decoded response.
+
+        Raises:
+            StorageAPIError: if the server cannot be reached.
+            Exception: whatever exception the server raised while handling
+                the request (sent back pickled with HTTP status 400).
+        """
+        try:
+            response = self.session.post(
+                self.url(endpoint),
+                data=encode_data(data),
+                headers={'content-type': 'application/x-msgpack'},
+            )
+        except ConnectionError as e:
+            raise StorageAPIError(e) from e
+
+        # XXX: this breaks language-independence and should be
+        # replaced by proper unserialization.
+        # SECURITY: pickle.loads runs arbitrary code chosen by whoever
+        # produced the payload, so only point this client at trusted servers.
+        if response.status_code == 400:
+            raise pickle.loads(decode_response(response))
+
+        return decode_response(response)
+
+    def content_add(self, bytes, obj_id=None):
+        """ Add a new object to the object storage.
+
+        Args:
+            bytes: content of the object to be added to the storage.
+            obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
+                given, obj_id will be trusted to match bytes. If missing,
+                obj_id will be computed on the fly.
+
+        Returns:
+            The id of the stored object, as returned by the server.
+        """
+        return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id})
+
+    def content_get(self, obj_id):
+        """ Retrieve the content of a given object.
+
+        Args:
+            obj_id: The id of the object.
+
+        Returns:
+            The content of the requested objects as bytes.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing
+        """
+        return self.post('content/get', {'obj_id': obj_id})
+
+    def content_check(self, obj_id):
+        """ Integrity check for a given object
+
+        verify that the file object is in place, and that the gzipped content
+        matches the object id
+
+        Args:
+            obj_id: The id of the object.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing
+            Error: if the requested object is corrupt
+        """
+        self.post('content/check', {'obj_id': obj_id})
diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/api/server.py
@@ -0,0 +1,115 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pickle
+import json
+
+from flask import Flask, Request, Response, g, request
+
+from swh.core import config
+from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
+from swh.storage.objstorage import ObjStorage
+
+DEFAULT_CONFIG = {
+    'storage_base': ('str', '/tmp/swh/backup/'),
+    'storage_depth': ('int', 3),
+    'port': ('int', 8080),
+}
+
+
+class BytesRequest(Request):
+    """Request with proper escaping of arbitrary byte sequences."""
+    encoding = 'utf-8'
+    encoding_errors = 'surrogateescape'
+
+
+app = Flask(__name__)
+app.request_class = BytesRequest
+
+
+def encode_data(data):
+    """Wrap `data`, serialized with msgpack, in a Flask response."""
+    return Response(
+        msgpack_dumps(data),
+        mimetype='application/x-msgpack',
+    )
+
+
+def decode_request(request):
+    """Deserialize the body of `request` according to its mimetype.
+
+    Raises:
+        ValueError: if the body is neither msgpack nor JSON.
+    """
+    content_type = request.mimetype
+    data = request.get_data()
+
+    if content_type == 'application/x-msgpack':
+        r = msgpack_loads(data)
+    elif content_type == 'application/json':
+        r = json.loads(data, cls=SWHJSONDecoder)
+    else:
+        raise ValueError('Wrong content type `%s` for API request'
+                         % content_type)
+
+    return r
+
+
+@app.errorhandler(Exception)
+def error_handler(exception):
+    """Send any exception back to the client, pickled, with status 400."""
+    # XXX: this breaks language-independence and should be
+    # replaced by proper serialization of errors
+    response = encode_data(pickle.dumps(exception))
+    response.status_code = 400
+    return response
+
+
+@app.before_request
+def before_request():
+    """Open the configured object storage for the current request."""
+    g.objstorage = ObjStorage(app.config['storage_base'],
+                              app.config['storage_depth'])
+
+
+@app.route('/')
+def index():
+    return "Helloworld!"
+
+
+@app.route('/content')
+def content():
+    """Dump whatever iterating the object storage yields, as a string."""
+    return str(list(g.objstorage))
+
+
+@app.route('/content/add', methods=['POST'])
+def add_bytes():
+    return encode_data(g.objstorage.add_bytes(**decode_request(request)))
+
+
+@app.route('/content/get', methods=['POST'])
+def get_bytes():
+    return encode_data(g.objstorage.get_bytes(**decode_request(request)))
+
+
+@app.route('/content/check', methods=['POST'])
+def check():
+    # A missing or corrupt object makes check() raise; error_handler turns
+    # that into a 400 response, which the client re-raises locally.
+    return encode_data(g.objstorage.check(**decode_request(request)))
+
+
+if __name__ == '__main__':
+    import sys
+
+    app.config.update(config.read(sys.argv[1], DEFAULT_CONFIG))
+
+    host = sys.argv[2] if len(sys.argv) >= 3 else '0.0.0.0'
+    # The Werkzeug debugger allows arbitrary code execution by any client,
+    # and host defaults to all interfaces: keep debug off unless `debug`
+    # is set in the configuration.
+    app.run(host, port=app.config['port'],
+            debug=app.config.get('debug', False))
diff --git a/swh/storage/objstorage.py b/swh/storage/objstorage/objstorage.py
rename from swh/storage/objstorage.py
rename to swh/storage/objstorage/objstorage.py
--- a/swh/storage/objstorage.py
+++ b/swh/storage/objstorage/objstorage.py
@@ -10,7 +10,7 @@
 
 from contextlib import contextmanager
 
-from .exc import ObjNotFoundError, Error
+from swh.storage.exc import ObjNotFoundError, Error
 from swh.core import hashutil
 
 
diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_objstorage_api.py
@@ -0,0 +1,141 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import multiprocessing
+import shutil
+import socket
+import time
+import tempfile
+import unittest
+
+from urllib.request import urlopen
+
+from nose.tools import istest
+from nose.plugins.attrib import attr
+
+from swh.core import hashutil
+from swh.storage.exc import ObjNotFoundError, Error
+from swh.storage.objstorage.objstorage import _obj_path
+from swh.storage.objstorage.api.client import RemoteArchive
+from swh.storage.objstorage.api.server import app
+
+
+@attr('!db')
+class TestRemoteArchive(unittest.TestCase):
+    """ Test the remote archive API.
+    """
+
+    def setUp(self):
+        self.start_server()
+        self.objstorage = RemoteArchive(self.url())
+
+    def tearDown(self):
+        self.stop_server()
+
+    def url(self):
+        return 'http://127.0.0.1:%d/' % self.port
+
+    def start_server(self):
+        """ Spawn the API server using multiprocessing.
+
+        Raises:
+            RuntimeError: if the server does not answer within 5 seconds.
+        """
+        self.process = None
+
+        # WSGI app configuration; a fresh storage directory per test
+        self.app = app
+        self.storage_base = tempfile.mkdtemp()
+        self.app.config['storage_base'] = self.storage_base
+        self.app.config['storage_depth'] = 3
+        # Get an available port number
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.bind(('127.0.0.1', 0))
+        self.port = sock.getsockname()[1]
+        sock.close()
+
+        # Worker function for multiprocessing (a closure, so this relies on
+        # the 'fork' start method: it cannot be pickled for 'spawn')
+        def worker(app, port):
+            return app.run(port=port, use_reloader=False)
+
+        self.process = multiprocessing.Process(
+            target=worker, args=(self.app, self.port)
+        )
+        self.process.start()
+
+        # Wait max 5 seconds for server to spawn
+        for _ in range(20):
+            try:
+                urlopen(self.url())
+            except Exception:
+                time.sleep(0.25)
+            else:
+                return
+
+        # setUp is failing, so tearDown will not run: clean up here
+        self.stop_server()
+        raise RuntimeError('API server did not start on port %d' % self.port)
+
+    def stop_server(self):
+        """ Terminate the API server's process and remove its storage.
+        """
+        if self.process:
+            self.process.terminate()
+            self.process.join()
+        shutil.rmtree(self.storage_base, ignore_errors=True)
+
+    @istest
+    def content_add(self):
+        content = bytes('Test content', 'utf8')
+        obj_id = self.objstorage.content_add(content)
+        self.assertEqual(self.objstorage.content_get(obj_id), content)
+
+    @istest
+    def content_get_present(self):
+        content = bytes('content_get_present', 'utf8')
+        content_hash = hashutil.hashdata(content)
+        obj_id = self.objstorage.content_add(content)
+        self.assertEqual(content_hash['sha1'], obj_id)
+        self.assertEqual(self.objstorage.content_get(obj_id), content)
+
+    @istest
+    def content_get_missing(self):
+        content = bytes('content_get_missing', 'utf8')
+        content_hash = hashutil.hashdata(content)
+        with self.assertRaises(ObjNotFoundError):
+            self.objstorage.content_get(content_hash['sha1'])
+
+    @istest
+    def content_check_invalid(self):
+        content = bytes('content_check_invalid', 'utf8')
+        obj_id = self.objstorage.content_add(content)
+        path = _obj_path(hashutil.hash_to_hex(obj_id),
+                         self.app.config['storage_base'],
+                         self.app.config['storage_depth'])
+        # Overwrite the stored file with altered bytes so the integrity
+        # check can no longer match the object id.
+        content = list(content)
+        with open(path, 'bw') as f:
+            content[0] = (content[0] + 1) % 128
+            f.write(bytes(content))
+        with self.assertRaises(Error):
+            self.objstorage.content_check(obj_id)
+
+    @istest
+    def content_check_valid(self):
+        content = bytes('content_check_valid', 'utf8')
+        obj_id = self.objstorage.content_add(content)
+        try:
+            self.objstorage.content_check(obj_id)
+        except Exception:
+            self.fail('Integrity check failed')
+
+    @istest
+    def content_check_missing(self):
+        content = bytes('content_check_missing', 'utf8')
+        content_hash = hashutil.hashdata(content)
+        with self.assertRaises(ObjNotFoundError):
+            self.objstorage.content_check(content_hash['sha1'])