diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py index 733458c..8f84336 100644 --- a/swh/objstorage/api/client.py +++ b/swh/objstorage/api/client.py @@ -1,103 +1,146 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pickle import requests from requests.exceptions import ConnectionError +from ..objstorage import ObjStorage from ..exc import ObjStorageAPIError from .common import (decode_response, encode_data_client as encode_data) -class RemoteObjStorage(): +class RemoteObjStorage(ObjStorage): """ Proxy to a remote object storage. This class allows to connect to an object storage server via http protocol. Attributes: base_url (string): The url of the server to connect. Must end with a '/' session: The session to send requests. """ def __init__(self, base_url): self.base_url = base_url self.session = requests.Session() def url(self, endpoint): return '%s%s' % (self.base_url, endpoint) def post(self, endpoint, data): try: response = self.session.post( self.url(endpoint), data=encode_data(data), headers={'content-type': 'application/x-msgpack'}, ) except ConnectionError as e: print(str(e)) raise ObjStorageAPIError(e) # XXX: this breaks language-independence and should be # replaced by proper unserialization if response.status_code == 400: raise pickle.loads(decode_response(response)) return decode_response(response) - def content_add(self, bytes, obj_id=None): + def __contains__(self, obj_id): + return self.post('content/contains', {'obj_id': obj_id}) + + def add(self, content, obj_id=None, check_presence=True): """ Add a new object to the object storage. Args: - bytes: content of the object to be added to the storage. + content: content of the object to be added to the storage. + obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When + given, obj_id will be trusted to match the bytes. If missing, + obj_id will be computed on the fly. + check_presence: indicate if the presence of the content should be + verified before adding the file. + + Returns: + the id of the object into the storage. + """ + return self.post('content/add', {'bytes': content, 'obj_id': obj_id, + 'check_presence': check_presence}) + + def restore(self, content, obj_id=None): + """ Restore a content that have been corrupted. + + This function is identical to add_bytes but does not check if + the object id is already in the file system. + + Args: + content: content of the object to be added to the storage obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When given, obj_id will be trusted to match bytes. If missing, obj_id will be computed on the fly. - """ - return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id}) + return self.add(content, obj_id, check_presence=False) - def content_get(self, obj_id): + def get(self, obj_id): """ Retrieve the content of a given object. Args: - obj_id: The id of the object. + obj_id: object id. Returns: - The content of the requested objects as bytes. + the content of the requested object as bytes. Raises: - ObjNotFoundError: if the requested object is missing + ObjNotFoundError: if the requested object is missing. """ return self.post('content/get', {'obj_id': obj_id}) - def content_get_random(self, batch_size): - """ Retrieve a random sample of existing content. + def get_batch(self, obj_ids): + """ Retrieve content in bulk. + + Note: This function does have a default implementation in ObjStorage + that is suitable for most cases. Args: - batch_size: Number of content requested. + obj_ids: list of object ids. Returns: - A list of random ids that represents existing contents. + list of resulting contents, or None if the content could not + be retrieved. Do not raise any exception as a fail for one content + will not cancel the whole request. """ - return self.post('content/get/random', {'batch_size': batch_size}) + return self.post('content/get/batch', {'obj_ids': obj_ids}) - def content_check(self, obj_id): - """ Integrity check for a given object + def check(self, obj_id): + """ Perform an integrity check for a given object. - verify that the file object is in place, and that the gzipped content - matches the object id + Verify that the file object is in place and that the gziped content + matches the object id. Args: - obj_id: The id of the object. + obj_id: object id. Raises: - ObjNotFoundError: if the requested object is missing - Error: if the requested object is corrupt + ObjNotFoundError: if the requested object is missing. + Error: if the request object is corrupted. """ self.post('content/check', {'obj_id': obj_id}) + + def get_random(self, batch_size): + """ Get random ids of existing contents + + This method is used in order to get random ids to perform + content integrity verifications on random contents. + + Attributes: + batch_size (int): Number of ids that will be given + + Yields: + An iterable of ids of contents that are in the current object + storage. + """ + return self.post('content/get/random', {'batch_size': batch_size}) diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py index 120145e..764471f 100644 --- a/swh/objstorage/api/server.py +++ b/swh/objstorage/api/server.py @@ -1,97 +1,107 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging from flask import Flask, g, request from swh.core import config from swh.objstorage import PathSlicingObjStorage from swh.objstorage.api.common import (BytesRequest, decode_request, error_handler, encode_data_server as encode_data) DEFAULT_CONFIG = { 'storage_base': ('str', '/tmp/swh-storage/objects/'), 'storage_slicing': ('str', '0:2/2:4/4:6') } app = Flask(__name__) app.request_class = BytesRequest @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) @app.before_request def before_request(): g.objstorage = PathSlicingObjStorage(app.config['storage_base'], app.config['storage_slicing']) @app.route('/') def index(): return "SWH Objstorage API server" @app.route('/content') def content(): return str(list(g.storage)) +@app.route('/content/contains', methods=['POST']) +def contains(): + return encode_data(g.objstorage.__contains__(**decode_request(request))) + + @app.route('/content/add', methods=['POST']) def add_bytes(): return encode_data(g.objstorage.add(**decode_request(request))) @app.route('/content/get', methods=['POST']) def get_bytes(): return encode_data(g.objstorage.get(**decode_request(request))) +@app.route('/content/get/batch', methods=['POST']) +def get_batch(): + return encode_data(g.objstorage.get_batch(**decode_request(request))) + + @app.route('/content/get/random', methods=['POST']) def get_random_contents(): return encode_data( g.objstorage.get_random(**decode_request(request)) ) @app.route('/content/check', methods=['POST']) def check(): return encode_data(g.objstorage.check(**decode_request(request))) def run_from_webserver(environ, start_response): """Run the WSGI app from the webserver, loading the configuration. """ config_path = '/etc/softwareheritage/storage/objstorage.ini' app.config.update(config.read(config_path, DEFAULT_CONFIG)) handler = logging.StreamHandler() app.logger.addHandler(handler) return app(environ, start_response) @click.command() @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5000, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def launch(config_path, host, port, debug): app.config.update(config.read(config_path, DEFAULT_CONFIG)) app.run(host, port=int(port), debug=bool(debug)) if __name__ == '__main__': launch() diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py index 4fef81b..6d27856 100644 --- a/swh/objstorage/tests/test_objstorage_api.py +++ b/swh/objstorage/tests/test_objstorage_api.py @@ -1,88 +1,28 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest -from nose.tools import istest from nose.plugins.attrib import attr -from swh.core import hashutil -from swh.objstorage.exc import ObjNotFoundError, Error +from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture from swh.objstorage.tests.server_testing import ServerTestFixture from swh.objstorage.api.client import RemoteObjStorage from swh.objstorage.api.server import app @attr('db') -class TestRemoteObjStorage(ServerTestFixture, unittest.TestCase): +class TestRemoteObjStorage(ServerTestFixture, ObjStorageTestFixture, + unittest.TestCase): """ Test the remote archive API. """ def setUp(self): self.config = {'storage_base': tempfile.mkdtemp(), 'storage_slicing': '0:1/0:5'} self.app = app super().setUp() - self.objstorage = RemoteObjStorage(self.url()) - - def tearDown(self): - super().tearDown() - - @istest - def content_add(self): - content = bytes('Test content', 'utf8') - id = self.objstorage.content_add(content) - self.assertEquals(self.objstorage.content_get(id), content) - - @istest - def content_get_present(self): - content = bytes('content_get_present', 'utf8') - content_hash = hashutil.hashdata(content) - id = self.objstorage.content_add(content) - self.assertEquals(content_hash['sha1'], id) - - @istest - def content_get_missing(self): - content = bytes('content_get_missing', 'utf8') - content_hash = hashutil.hashdata(content) - with self.assertRaises(ObjNotFoundError): - self.objstorage.content_get(content_hash['sha1']) - - @istest - def content_get_random(self): - ids = [] - for i in range(100): - content = bytes('content_get_present', 'utf8') - id = self.objstorage.content_add(content) - ids.append(id) - for id in self.objstorage.content_get_random(50): - self.assertIn(id, ids) - - @istest - def content_check_invalid(self): - content = bytes('content_check_invalid', 'utf8') - invalid_id = hashutil.hashdata(b'invalid content')['sha1'] - # Add the content with an invalid id. - self.objstorage.content_add(content, invalid_id) - # Then check it and expect an error. - with self.assertRaises(Error): - self.objstorage.content_check(invalid_id) - - @istest - def content_check_valid(self): - content = bytes('content_check_valid', 'utf8') - id = self.objstorage.content_add(content) - try: - self.objstorage.content_check(id) - except: - self.fail('Integrity check failed') - - @istest - def content_check_missing(self): - content = bytes('content_check_valid', 'utf8') - content_hash = hashutil.hashdata(content) - with self.assertRaises(ObjNotFoundError): - self.objstorage.content_check(content_hash['sha1']) + self.storage = RemoteObjStorage(self.url())