diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py
index 8f0308651..2daabeec0 100644
--- a/swh/storage/objstorage/api/client.py
+++ b/swh/storage/objstorage/api/client.py
@@ -1,92 +1,103 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pickle
 
 import requests
 
 from requests.exceptions import ConnectionError
 from ...exc import StorageAPIError
 from ...api.common import (decode_response,
                            encode_data_client as encode_data)
 
 
 class RemoteObjStorage():
     """ Proxy to a remote object storage.
 
     This class allows to connect to an object storage server via
     http protocol.
 
     Attributes:
         base_url (string): The url of the server to connect. Must end
             with a '/'
         session: The session to send requests.
     """
 
     def __init__(self, base_url):
         self.base_url = base_url
         self.session = requests.Session()
 
     def url(self, endpoint):
         return '%s%s' % (self.base_url, endpoint)
 
     def post(self, endpoint, data):
         try:
             response = self.session.post(
                 self.url(endpoint),
                 data=encode_data(data),
                 headers={'content-type': 'application/x-msgpack'},
             )
         except ConnectionError as e:
             print(str(e))
             raise StorageAPIError(e)
 
         # XXX: this breaks language-independence and should be
         # replaced by proper unserialization
         if response.status_code == 400:
             raise pickle.loads(decode_response(response))
 
         return decode_response(response)
 
     def content_add(self, bytes, obj_id=None):
         """ Add a new object to the object storage.
 
         Args:
             bytes: content of the object to be added to the storage.
             obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
                 given, obj_id will be trusted to match bytes. If missing,
                 obj_id will be computed on the fly.
 
         """
         return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id})
 
     def content_get(self, obj_id):
         """ Retrieve the content of a given object.
 
         Args:
             obj_id: The id of the object.
 
         Returns:
             The content of the requested objects as bytes.
 
         Raises:
             ObjNotFoundError: if the requested object is missing
         """
         return self.post('content/get', {'obj_id': obj_id})
 
+    def content_get_random(self, batch_size):
+        """ Retrieve the ids of a random sample of existing contents.
+
+        Args:
+            batch_size: Number of content ids requested.
+
+        Returns:
+            A list of randomly chosen hex-encoded ids of existing contents.
+        """
+        return self.post('content/get/random', {'batch_size': batch_size})
+
     def content_check(self, obj_id):
         """ Integrity check for a given object
 
         verify that the file object is in place, and that the gzipped content
         matches the object id
 
         Args:
             obj_id: The id of the object.
 
         Raises:
             ObjNotFoundError: if the requested object is missing
             Error: if the requested object is corrupt
         """
         self.post('content/check', {'obj_id': obj_id})
diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py
index cfb3d25da..16a672bf9 100644
--- a/swh/storage/objstorage/api/server.py
+++ b/swh/storage/objstorage/api/server.py
@@ -1,89 +1,96 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 import logging
 
 from flask import Flask, g, request
 
 from swh.core import config
 from swh.storage.objstorage import ObjStorage
 from swh.storage.api.common import (BytesRequest, decode_request,
                                     error_handler,
                                     encode_data_server as encode_data)
 
 DEFAULT_CONFIG = {
     'storage_base': ('str', '/tmp/swh-storage/objects/'),
     'storage_depth': ('int', 3)
 }
 
 app = Flask(__name__)
 app.request_class = BytesRequest
 
 
 @app.errorhandler(Exception)
 def my_error_handler(exception):
     return error_handler(exception, encode_data)
 
 
 @app.before_request
 def before_request():
     g.objstorage = ObjStorage(app.config['storage_base'],
                               app.config['storage_depth'])
 
 
 @app.route('/')
 def index():
     return "SWH Objstorage API server"
 
 
 @app.route('/content')
 def content():
-    return str(list(g.storage))
+    return str(list(g.objstorage))
 
 
 @app.route('/content/add', methods=['POST'])
 def add_bytes():
     return encode_data(g.objstorage.add_bytes(**decode_request(request)))
 
 
 @app.route('/content/get', methods=['POST'])
 def get_bytes():
     return encode_data(g.objstorage.get_bytes(**decode_request(request)))
 
 
+@app.route('/content/get/random', methods=['POST'])
+def get_random_contents():
+    return encode_data(
+        g.objstorage.get_random_contents(**decode_request(request))
+    )
+
+
 @app.route('/content/check', methods=['POST'])
 def check():
     return encode_data(g.objstorage.check(**decode_request(request)))
 
 
 def run_from_webserver(environ, start_response):
     """Run the WSGI app from the webserver, loading the configuration.
 
     """
     config_path = '/etc/softwareheritage/storage/objstorage.ini'
 
     app.config.update(config.read(config_path, DEFAULT_CONFIG))
 
     handler = logging.StreamHandler()
     app.logger.addHandler(handler)
 
     return app(environ, start_response)
 
 
 @click.command()
 @click.argument('config-path', required=1)
 @click.option('--host', default='0.0.0.0', help="Host to run the server")
 @click.option('--port', default=5000, type=click.INT,
               help="Binding port of the server")
 @click.option('--debug/--nodebug', default=True,
               help="Indicates if the server should run in debug mode")
 def launch(config_path, host, port, debug):
     app.config.update(config.read(config_path, DEFAULT_CONFIG))
     app.run(host, port=int(port), debug=bool(debug))
 
 
 if __name__ == '__main__':
     launch()
diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py
index 284e1834b..b57bbd666 100644
--- a/swh/storage/tests/test_objstorage_api.py
+++ b/swh/storage/tests/test_objstorage_api.py
@@ -1,83 +1,94 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import tempfile
 import unittest
 
 from nose.tools import istest
 from nose.plugins.attrib import attr
 
 from swh.core import hashutil
 from swh.storage.exc import ObjNotFoundError, Error
 from swh.storage.tests.server_testing import ServerTestFixture
 from swh.storage.objstorage.objstorage import _obj_path
 from swh.storage.objstorage.api.client import RemoteObjStorage
 from swh.storage.objstorage.api.server import app
 
 
 @attr('db')
 class TestRemoteObjStorage(ServerTestFixture, unittest.TestCase):
     """ Test the remote archive API.
     """
 
     def setUp(self):
         self.config = {'storage_base': tempfile.mkdtemp(),
                        'storage_depth': 3}
         self.app = app
         super().setUp()
         self.objstorage = RemoteObjStorage(self.url())
 
     def tearDown(self):
         super().tearDown()
 
     @istest
     def content_add(self):
         content = bytes('Test content', 'utf8')
         id = self.objstorage.content_add(content)
         self.assertEquals(self.objstorage.content_get(id), content)
 
     @istest
     def content_get_present(self):
         content = bytes('content_get_present', 'utf8')
         content_hash = hashutil.hashdata(content)
         id = self.objstorage.content_add(content)
         self.assertEquals(content_hash['sha1'], id)
 
     @istest
     def content_get_missing(self):
         content = bytes('content_get_missing', 'utf8')
         content_hash = hashutil.hashdata(content)
         with self.assertRaises(ObjNotFoundError):
             self.objstorage.content_get(content_hash['sha1'])
 
+    @istest
+    def content_get_random(self):
+        ids = set()
+        for i in range(100):
+            content = bytes('content_get_random %d' % i, 'utf8')
+            obj_id = self.objstorage.content_add(content)
+            ids.add(obj_id)
+        for hex_id in self.objstorage.content_get_random(50):
+            obj_id = hashutil.hex_to_hash(hex_id)
+            self.assertIn(obj_id, ids)
+
     @istest
     def content_check_invalid(self):
         content = bytes('content_check_invalid', 'utf8')
         id = self.objstorage.content_add(content)
         path = _obj_path(hashutil.hash_to_hex(id),
                          self.app.config['storage_base'],
                          self.app.config['storage_depth'])
         content = list(content)
         with open(path, 'bw') as f:
             content[0] = (content[0] + 1) % 128
             f.write(bytes(content))
         with self.assertRaises(Error):
             self.objstorage.content_check(id)
 
     @istest
     def content_check_valid(self):
         content = bytes('content_check_valid', 'utf8')
         id = self.objstorage.content_add(content)
         try:
             self.objstorage.content_check(id)
         except:
             self.fail('Integrity check failed')
 
     @istest
     def content_check_missing(self):
         content = bytes('content_check_valid', 'utf8')
         content_hash = hashutil.hashdata(content)
         with self.assertRaises(ObjNotFoundError):
             self.objstorage.content_check(content_hash['sha1'])