diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py
--- a/swh/storage/api/client.py
+++ b/swh/storage/api/client.py
@@ -4,34 +4,13 @@
 # See top-level LICENSE file for more information
 
 import pickle
-
 import requests
 
 from requests.exceptions import ConnectionError
 
-from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
-from swh.storage.exc import StorageAPIError
-
-
-def encode_data(data):
-    try:
-        return msgpack_dumps(data)
-    except OverflowError as e:
-        raise ValueError('Limits were reached. Please, check your input.\n' +
-                         str(e))
-
-
-def decode_response(response):
-    content_type = response.headers['content-type']
-
-    if content_type.startswith('application/x-msgpack'):
-        r = msgpack_loads(response.content)
-    elif content_type.startswith('application/json'):
-        r = response.json(cls=SWHJSONDecoder)
-    else:
-        raise ValueError('Wrong content type `%s` for API response'
-                         % content_type)
-    return r
+from ..exc import StorageAPIError
+from .common import (decode_response,
+                     encode_data_client as encode_data)
 
 
 class RemoteStorage():
diff --git a/swh/storage/api/common.py b/swh/storage/api/common.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/api/common.py
@@ -0,0 +1,74 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import pickle
+
+from flask import Request, Response
+
+from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
+
+
+class BytesRequest(Request):
+    """Request with proper escaping of arbitrary byte sequences."""
+    encoding = 'utf-8'
+    encoding_errors = 'surrogateescape'
+
+
+def encode_data_server(data):
+    """Serialize `data` with msgpack into a flask Response."""
+    return Response(
+        msgpack_dumps(data),
+        mimetype='application/x-msgpack',
+    )
+
+
+def encode_data_client(data):
+    """Serialize `data` with msgpack, turning OverflowError into ValueError."""
+    try:
+        return msgpack_dumps(data)
+    except OverflowError as e:
+        raise ValueError('Limits were reached. Please, check your input.\n' +
+                         str(e)) from e
+
+
+def decode_request(request):
+    """Deserialize a flask request body according to its mimetype."""
+    content_type = request.mimetype
+    data = request.get_data()
+
+    if content_type == 'application/x-msgpack':
+        r = msgpack_loads(data)
+    elif content_type == 'application/json':
+        r = json.loads(data, cls=SWHJSONDecoder)
+    else:
+        raise ValueError('Wrong content type `%s` for API request'
+                         % content_type)
+
+    return r
+
+
+def decode_response(response):
+    """Deserialize a requests response body according to its content-type."""
+    content_type = response.headers['content-type']
+
+    if content_type.startswith('application/x-msgpack'):
+        r = msgpack_loads(response.content)
+    elif content_type.startswith('application/json'):
+        r = response.json(cls=SWHJSONDecoder)
+    else:
+        raise ValueError('Wrong content type `%s` for API response'
+                         % content_type)
+
+    return r
+
+
+def error_handler(exception, encoder):
+    """Encode the pickled `exception` with `encoder`, as a 400 response."""
+    # XXX: this breaks language-independence and should be
+    # replaced by proper serialization of errors
+    response = encoder(pickle.dumps(exception))
+    response.status_code = 400
+    return response
diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py
--- a/swh/storage/api/server.py
+++ b/swh/storage/api/server.py
@@ -5,13 +5,13 @@
 
 import json
 import logging
-import pickle
 
-from flask import Flask, Request, Response, g, request
+from flask import Flask, g, request
 
 from swh.core import config
-from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
-from swh.storage import Storage
+from .. import Storage
+from .common import (BytesRequest, decode_request, error_handler,
+                     encode_data_server as encode_data)
 
 DEFAULT_CONFIG = {
     'db': ('str', 'dbname=softwareheritage-dev'),
@@ -19,45 +19,13 @@
 }
 
 
-class BytesRequest(Request):
-    """Request with proper escaping of arbitrary byte sequences."""
-    encoding = 'utf-8'
-    encoding_errors = 'surrogateescape'
-
-
 app = Flask(__name__)
 app.request_class = BytesRequest
 
 
-def encode_data(data):
-    return Response(
-        msgpack_dumps(data),
-        mimetype='application/x-msgpack',
-    )
-
-
-def decode_request(request):
-    content_type = request.mimetype
-    data = request.get_data()
-
-    if content_type == 'application/x-msgpack':
-        r = msgpack_loads(data)
-    elif content_type == 'application/json':
-        r = json.loads(data, cls=SWHJSONDecoder)
-    else:
-        raise ValueError('Wrong content type `%s` for API request'
-                         % content_type)
-
-    return r
-
-
 @app.errorhandler(Exception)
-def error_handler(exception):
-    # XXX: this breaks language-independence and should be
-    # replaced by proper serialization of errors
-    response = encode_data(pickle.dumps(exception))
-    response.status_code = 400
-    return response
+def my_error_handler(exception):
+    return error_handler(exception, encode_data)
 
 
 @app.before_request
diff --git a/swh/storage/objstorage/__init__.py b/swh/storage/objstorage/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/__init__.py
@@ -0,0 +1 @@
+from .objstorage import ObjStorage, DIR_MODE, FILE_MODE  # NOQA
diff --git a/swh/storage/objstorage/api/__init__.py b/swh/storage/objstorage/api/__init__.py
new file mode 100644
diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/api/client.py
@@ -0,0 +1,91 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import pickle
+
+import requests
+
+from requests.exceptions import ConnectionError
+from ...exc import StorageAPIError
+from ...api.common import (decode_response,
+                           encode_data_client as encode_data)
+
+
+class RemoteObjStorage():
+    """ Proxy to a remote object storage.
+
+    This class connects to an object storage server over the
+    http protocol.
+
+    Attributes:
+        base_url (string): The url of the server to connect. Must end
+            with a '/'
+        session: The session to send requests.
+    """
+    def __init__(self, base_url):
+        self.base_url = base_url
+        self.session = requests.Session()
+
+    def url(self, endpoint):
+        return '%s%s' % (self.base_url, endpoint)
+
+    def post(self, endpoint, data):
+        try:
+            response = self.session.post(
+                self.url(endpoint),
+                data=encode_data(data),
+                headers={'content-type': 'application/x-msgpack'},
+            )
+        except ConnectionError as e:
+            raise StorageAPIError(e) from e
+
+        # XXX: this breaks language-independence (and unpickling is only
+        # safe with a trusted server); replace by proper unserialization
+        if response.status_code == 400:
+            raise pickle.loads(decode_response(response))
+
+        return decode_response(response)
+
+    def content_add(self, bytes, obj_id=None):
+        """ Add a new object to the object storage.
+
+        Args:
+            bytes: content of the object to be added to the storage.
+            obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
+                given, obj_id will be trusted to match bytes. If missing,
+                obj_id will be computed on the fly.
+
+        """
+        return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id})
+
+    def content_get(self, obj_id):
+        """ Retrieve the content of a given object.
+
+        Args:
+            obj_id: The id of the object.
+
+        Returns:
+            The content of the requested objects as bytes.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing
+        """
+        return self.post('content/get', {'obj_id': obj_id})
+
+    def content_check(self, obj_id):
+        """ Integrity check for a given object.
+
+        Verify that the file object is in place, and that the gzipped content
+        matches the object id.
+
+        Args:
+            obj_id: The id of the object.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing
+            Error: if the requested object is corrupt
+        """
+        self.post('content/check', {'obj_id': obj_id})
diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/api/server.py
@@ -0,0 +1,67 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from flask import Flask, g, request
+
+from swh.core import config
+from .. import ObjStorage
+from ...api.common import (BytesRequest, decode_request, error_handler,
+                           encode_data_server as encode_data)
+
+DEFAULT_CONFIG = {
+    'storage_base': ('str', '/tmp/swh-storage/objects/'),
+    'storage_depth': ('int', 3)
+}
+
+app = Flask(__name__)
+app.request_class = BytesRequest
+
+
+@app.errorhandler(Exception)
+def my_error_handler(exception):
+    return error_handler(exception, encode_data)
+
+
+@app.before_request
+def before_request():
+    g.objstorage = ObjStorage(app.config['storage_base'],
+                              app.config['storage_depth'])
+
+
+@app.route('/')
+def index():
+    return "Helloworld!"
+
+
+@app.route('/content')
+def content():
+    return str(list(g.objstorage))
+
+
+@app.route('/content/add', methods=['POST'])
+def add_bytes():
+    return encode_data(g.objstorage.add_bytes(**decode_request(request)))
+
+
+@app.route('/content/get', methods=['POST'])
+def get_bytes():
+    return encode_data(g.objstorage.get_bytes(**decode_request(request)))
+
+
+@app.route('/content/check', methods=['POST'])
+def check():
+    # TODO verify that an error on this content will be properly intercepted
+    # by @app.errorhandler and the answer will be sent to client.
+    return encode_data(g.objstorage.check(**decode_request(request)))
+
+
+if __name__ == '__main__':
+    import sys
+
+    app.config.update(config.read(sys.argv[1], DEFAULT_CONFIG))
+
+    host = sys.argv[2] if len(sys.argv) >= 3 else '0.0.0.0'
+    app.run(host, debug=True)
diff --git a/swh/storage/objstorage.py b/swh/storage/objstorage/objstorage.py
rename from swh/storage/objstorage.py
rename to swh/storage/objstorage/objstorage.py
--- a/swh/storage/objstorage.py
+++ b/swh/storage/objstorage/objstorage.py
@@ -10,7 +10,7 @@
 
 from contextlib import contextmanager
 
-from .exc import ObjNotFoundError, Error
+from ..exc import ObjNotFoundError, Error
 from swh.core import hashutil
 
 
diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/server_testing.py
copy from swh/storage/tests/test_api_client.py
copy to swh/storage/tests/server_testing.py
--- a/swh/storage/tests/test_api_client.py
+++ b/swh/storage/tests/server_testing.py
@@ -6,52 +6,54 @@
 import multiprocessing
 import socket
 import time
-import unittest
+
 from urllib.request import urlopen
 
-from swh.storage.tests.test_storage import AbstractTestStorage
-from swh.storage.api.client import RemoteStorage
-from swh.storage.api.server import app
 
+class ServerTestFixture():
+    """ Base class for http client/server testing.
+
+    Mix this in a test class in order to have access to an http flask
+    server running in background.
+
+    Note that, before calling setUp(), the subclass must define a dict
+    self.config holding the flask server config, and a flask
+    application in self.app that corresponds to the type of server
+    the tested client needs.
 
-class TestRemoteStorage(AbstractTestStorage, unittest.TestCase):
-    """Test the remote storage API.
+    To ensure test isolation, each test runs against its own server;
+    subclasses should also give each test its own storage directory.
 
-    This class doesn't define any tests as we want identical
-    functionality between local and remote storage. All the tests are
-    therefore defined in AbstractTestStorage.
+    In order to work correctly, the subclass must call the parent
+    classes' setUp() and tearDown() methods.
     """
 
     def setUp(self):
         super().setUp()
-
         self.start_server()
-        self.storage = RemoteStorage(self.url())
 
     def tearDown(self):
         self.stop_server()
-
         super().tearDown()
 
     def url(self):
         return 'http://127.0.0.1:%d/' % self.port
 
     def start_server(self):
-        """Spawn the API server using multiprocessing"""
+        """ Spawn the API server using multiprocessing.
+        """
         self.process = None
 
         # WSGI app configuration
-        self.app = app
-        self.app.config['db'] = 'dbname=%s' % self.dbname
-        self.app.config['storage_base'] = self.objroot
-
+        for key, value in self.config.items():
+            self.app.config[key] = value
         # Get an available port number
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.bind(('127.0.0.1', 0))
         self.port = sock.getsockname()[1]
         sock.close()
 
-        # We need a worker function for multiprocessing
+        # Worker function for multiprocessing
         def worker(app, port):
             return app.run(port=port, use_reloader=False)
 
@@ -60,7 +62,7 @@
         )
         self.process.start()
 
-        # Wait max. 5 seconds for server to spawn
+        # Wait max 5 seconds for server to spawn
         i = 0
         while i < 20:
             try:
@@ -69,9 +71,10 @@
                 i += 1
                 time.sleep(0.25)
             else:
-                break
+                return
 
     def stop_server(self):
-        """Terminate the API server"""
+        """ Terminate the API server's process.
+        """
         if self.process:
             self.process.terminate()
diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py
--- a/swh/storage/tests/test_api_client.py
+++ b/swh/storage/tests/test_api_client.py
@@ -3,18 +3,17 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import multiprocessing
-import socket
-import time
 import unittest
-from urllib.request import urlopen
+import tempfile
 
 from swh.storage.tests.test_storage import AbstractTestStorage
+from swh.storage.tests.server_testing import ServerTestFixture
 from swh.storage.api.client import RemoteStorage
 from swh.storage.api.server import app
 
 
-class TestRemoteStorage(AbstractTestStorage, unittest.TestCase):
+class TestRemoteStorage(AbstractTestStorage, ServerTestFixture,
+                        unittest.TestCase):
     """Test the remote storage API.
 
     This class doesn't define any tests as we want identical
@@ -23,55 +22,15 @@
     """
 
     def setUp(self):
+        # ServerTestFixture.setUp() needs a storage_base in self.config,
+        # but AbstractTestStorage only defines self.objroot in its own
+        # setUp(). Use a dedicated temporary directory instead, and point
+        # self.objroot at it afterwards so that both fixtures agree on
+        # where objects are stored.
+        storage_base = tempfile.mkdtemp()
+        self.config = {'db': 'dbname=%s' % self.dbname,
+                       'storage_base': storage_base}
+        self.app = app
         super().setUp()
-
-        self.start_server()
         self.storage = RemoteStorage(self.url())
-
-    def tearDown(self):
-        self.stop_server()
-
-        super().tearDown()
-
-    def url(self):
-        return 'http://127.0.0.1:%d/' % self.port
-
-    def start_server(self):
-        """Spawn the API server using multiprocessing"""
-        self.process = None
-
-        # WSGI app configuration
-        self.app = app
-        self.app.config['db'] = 'dbname=%s' % self.dbname
-        self.app.config['storage_base'] = self.objroot
-
-        # Get an available port number
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        sock.bind(('127.0.0.1', 0))
-        self.port = sock.getsockname()[1]
-        sock.close()
-
-        # We need a worker function for multiprocessing
-        def worker(app, port):
-            return app.run(port=port, use_reloader=False)
-
-        self.process = multiprocessing.Process(
-            target=worker, args=(self.app, self.port)
-        )
-        self.process.start()
-
-        # Wait max. 5 seconds for server to spawn
-        i = 0
-        while i < 20:
-            try:
-                urlopen(self.url())
-            except Exception:
-                i += 1
-                time.sleep(0.25)
-            else:
-                break
-
-    def stop_server(self):
-        """Terminate the API server"""
-        if self.process:
-            self.process.terminate()
+        self.objroot = storage_base
diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_objstorage_api.py
@@ -0,0 +1,83 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import tempfile
+import unittest
+
+from nose.tools import istest
+from nose.plugins.attrib import attr
+
+from swh.core import hashutil
+from swh.storage.exc import ObjNotFoundError, Error
+from swh.storage.tests.server_testing import ServerTestFixture
+from swh.storage.objstorage.objstorage import _obj_path
+from swh.storage.objstorage.api.client import RemoteObjStorage
+from swh.storage.objstorage.api.server import app
+
+
+@attr('!db')
+class TestRemoteObjStorage(ServerTestFixture, unittest.TestCase):
+    """ Test the remote archive API.
+    """
+
+    def setUp(self):
+        self.config = {'storage_base': tempfile.mkdtemp(),
+                       'storage_depth': 3}
+        self.app = app
+        super().setUp()
+        self.objstorage = RemoteObjStorage(self.url())
+
+    def tearDown(self):
+        super().tearDown()
+
+    @istest
+    def content_add(self):
+        content = bytes('Test content', 'utf8')
+        obj_id = self.objstorage.content_add(content)
+        self.assertEqual(self.objstorage.content_get(obj_id), content)
+
+    @istest
+    def content_get_present(self):
+        content = bytes('content_get_present', 'utf8')
+        content_hash = hashutil.hashdata(content)
+        obj_id = self.objstorage.content_add(content)
+        self.assertEqual(content_hash['sha1'], obj_id)
+
+    @istest
+    def content_get_missing(self):
+        content = bytes('content_get_missing', 'utf8')
+        content_hash = hashutil.hashdata(content)
+        with self.assertRaises(ObjNotFoundError):
+            self.objstorage.content_get(content_hash['sha1'])
+
+    @istest
+    def content_check_invalid(self):
+        content = bytes('content_check_invalid', 'utf8')
+        obj_id = self.objstorage.content_add(content)
+        path = _obj_path(hashutil.hash_to_hex(obj_id),
+                         self.app.config['storage_base'],
+                         self.app.config['storage_depth'])
+        content = list(content)
+        with open(path, 'bw') as f:
+            content[0] = (content[0] + 1) % 128
+            f.write(bytes(content))
+        with self.assertRaises(Error):
+            self.objstorage.content_check(obj_id)
+
+    @istest
+    def content_check_valid(self):
+        content = bytes('content_check_valid', 'utf8')
+        obj_id = self.objstorage.content_add(content)
+        try:
+            self.objstorage.content_check(obj_id)
+        except Exception:
+            self.fail('Integrity check failed')
+
+    @istest
+    def content_check_missing(self):
+        content = bytes('content_check_missing', 'utf8')
+        content_hash = hashutil.hashdata(content)
+        with self.assertRaises(ObjNotFoundError):
+            self.objstorage.content_check(content_hash['sha1'])
+ """ + + def setUp(self): + self.config = {'storage_base': tempfile.mkdtemp(), + 'storage_depth': 3} + self.app = app + super().setUp() + self.objstorage = RemoteObjStorage(self.url()) + + def tearDown(self): + super().tearDown() + + @istest + def content_add(self): + content = bytes('Test content', 'utf8') + id = self.objstorage.content_add(content) + self.assertEquals(self.objstorage.content_get(id), content) + + @istest + def content_get_present(self): + content = bytes('content_get_present', 'utf8') + content_hash = hashutil.hashdata(content) + id = self.objstorage.content_add(content) + self.assertEquals(content_hash['sha1'], id) + + @istest + def content_get_missing(self): + content = bytes('content_get_missing', 'utf8') + content_hash = hashutil.hashdata(content) + with self.assertRaises(ObjNotFoundError): + self.objstorage.content_get(content_hash['sha1']) + + @istest + def content_check_invalid(self): + content = bytes('content_check_invalid', 'utf8') + id = self.objstorage.content_add(content) + path = _obj_path(hashutil.hash_to_hex(id), + self.app.config['storage_base'], + self.app.config['storage_depth']) + content = list(content) + with open(path, 'bw') as f: + content[0] = (content[0] + 1) % 128 + f.write(bytes(content)) + with self.assertRaises(Error): + self.objstorage.content_check(id) + + @istest + def content_check_valid(self): + content = bytes('content_check_valid', 'utf8') + id = self.objstorage.content_add(content) + try: + self.objstorage.content_check(id) + except: + self.fail('Integrity check failed') + + @istest + def content_check_missing(self): + content = bytes('content_check_valid', 'utf8') + content_hash = hashutil.hashdata(content) + with self.assertRaises(ObjNotFoundError): + self.objstorage.content_check(content_hash['sha1'])