diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
--- a/swh/objstorage/__init__.py
+++ b/swh/objstorage/__init__.py
@@ -1,4 +1,64 @@
 from .objstorage import ObjStorage
 from .objstorage_pathslicing import PathSlicingObjStorage
+from .api.client import RemoteObjStorage
+from .multiplexer import MultiplexerObjStorage
+from .multiplexer.filter import add_filters
 
-__all__ = [ObjStorage, PathSlicingObjStorage]
+# TODO remove PathSlicingObjStorage from this list once the config
+# loading has been updated and no hardcoded objstorage type
+# remains.
+__all__ = ['get_objstorage', 'ObjStorage', 'PathSlicingObjStorage']
+
+_STORAGE_CLASSES = {
+    'pathslicing': PathSlicingObjStorage,
+    'remote': RemoteObjStorage,
+}
+
+
+def get_objstorage(storage_class, storage_args):
+    """ Create an ObjStorage using the given implementation class.
+
+    Args:
+        storage_class (str): objstorage class unique key contained in the
+            _STORAGE_CLASSES dict.
+        storage_args (dict): arguments for the required class of objstorage
+            that must match exactly the one in the `__init__` method of the
+            class.
+    Returns:
+        subclass of ObjStorage that match the given `storage_class` argument.
+    Raises:
+        ValueError: if the given storage class is not a valid objstorage
+            key.
+    """
+    # Only the registry lookup is guarded: a KeyError raised while building
+    # the storage must not be reported as an unknown storage class.
+    try:
+        storage_factory = _STORAGE_CLASSES[storage_class]
+    except KeyError:
+        raise ValueError('Storage class %s does not exists' % storage_class)
+    return storage_factory(**storage_args)
+
+
+def _construct_filtered_objstorage(filtered_storage_args):
+    """ Build the storage described by the 'storage' entry and wrap it
+    with the filters described by the 'filters_conf' entry.
+    """
+    storage_conf = filtered_storage_args['storage']
+    filters_conf = filtered_storage_args['filters_conf']
+    return add_filters(
+        get_objstorage(storage_conf['class'], storage_conf['args']),
+        filters_conf
+    )
+_STORAGE_CLASSES['filtered'] = _construct_filtered_objstorage
+
+
+def _construct_multiplexer_objstorage(multiplexer_objstorage_args):
+    """ Build a MultiplexerObjStorage over one storage per configuration.
+
+    Each configuration is read like the 'storage' entry of a filtered
+    storage: a dict with a 'class' key and an 'args' key.
+    """
+    storages = [get_objstorage(conf['class'], conf['args'])
+                for conf in multiplexer_objstorage_args]
+    return MultiplexerObjStorage(storages)
+_STORAGE_CLASSES['multiplexer'] = _construct_multiplexer_objstorage
diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py
--- a/swh/objstorage/api/client.py
+++ b/swh/objstorage/api/client.py
@@ -9,12 +9,13 @@
 import requests
 
 from requests.exceptions import ConnectionError
+from ..objstorage import ObjStorage
 from ..exc import ObjStorageAPIError
 from .common import (decode_response,
                      encode_data_client as encode_data)
 
 
-class RemoteObjStorage():
+class RemoteObjStorage(ObjStorage):
     """ Proxy to a remote object storage.
 
     This class allows to connect to an object storage server via
@@ -50,54 +51,96 @@
 
         return decode_response(response)
 
-    def content_add(self, bytes, obj_id=None):
+    def __contains__(self, obj_id):
+        return self.post('content/contains', {'obj_id': obj_id})
+
+    def add(self, content, obj_id=None, check_presence=True):
         """ Add a new object to the object storage.
 
         Args:
-            bytes: content of the object to be added to the storage.
+            content: content of the object to be added to the storage.
+            obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When
+                given, obj_id will be trusted to match the bytes. If missing,
+                obj_id will be computed on the fly.
+            check_presence: indicate if the presence of the content should be
+                verified before adding the file.
+
+        Returns:
+            the id of the object into the storage.
+        """
+        return self.post('content/add', {'bytes': content, 'obj_id': obj_id,
+                                         'check_presence': check_presence})
+
+    def restore(self, content, obj_id=None):
+        """ Restore a content that has been corrupted.
+
+        This function is identical to add with check_presence=False: it does
+        not check if the object id is already in the file system.
+
+        Args:
+            content: content of the object to be added to the storage
             obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
                 given, obj_id will be trusted to match bytes. If missing,
                 obj_id will be computed on the fly.
-
         """
-        return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id})
+        return self.add(content, obj_id, check_presence=False)
 
-    def content_get(self, obj_id):
+    def get(self, obj_id):
         """ Retrieve the content of a given object.
 
         Args:
-            obj_id: The id of the object.
+            obj_id: object id.
 
         Returns:
-            The content of the requested objects as bytes.
+            the content of the requested object as bytes.
 
         Raises:
-            ObjNotFoundError: if the requested object is missing
+            ObjNotFoundError: if the requested object is missing.
         """
         return self.post('content/get', {'obj_id': obj_id})
 
-    def content_get_random(self, batch_size):
-        """ Retrieve a random sample of existing content.
+    def get_batch(self, obj_ids):
+        """ Retrieve content in bulk.
+
+        Note: This function does have a default implementation in ObjStorage
+        that is suitable for most cases.
 
         Args:
-            batch_size: Number of content requested.
+            obj_ids: list of object ids.
 
         Returns:
-            A list of random ids that represents existing contents.
+            list of resulting contents, or None if the content could not
+            be retrieved. Do not raise any exception as a fail for one content
+            will not cancel the whole request.
         """
-        return self.post('content/get/random', {'batch_size': batch_size})
+        return self.post('content/get/batch', {'obj_ids': obj_ids})
 
-    def content_check(self, obj_id):
-        """ Integrity check for a given object
+    def check(self, obj_id):
+        """ Perform an integrity check for a given object.
 
-        verify that the file object is in place, and that the gzipped content
-        matches the object id
+        Verify that the file object is in place and that the gzipped content
+        matches the object id.
 
         Args:
-            obj_id: The id of the object.
+            obj_id: object id.
 
         Raises:
-            ObjNotFoundError: if the requested object is missing
-            Error: if the requested object is corrupt
+            ObjNotFoundError: if the requested object is missing.
+            Error: if the requested object is corrupted.
         """
         self.post('content/check', {'obj_id': obj_id})
+
+    def get_random(self, batch_size):
+        """ Get random ids of existing contents
+
+        This method is used in order to get random ids to perform
+        content integrity verifications on random contents.
+
+        Args:
+            batch_size (int): Number of ids that will be given
+
+        Returns:
+            An iterable of ids of contents that are in the current object
+            storage.
+        """
+        return self.post('content/get/random', {'batch_size': batch_size})
diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py
--- a/swh/objstorage/api/server.py
+++ b/swh/objstorage/api/server.py
@@ -45,6 +45,11 @@
     return str(list(g.storage))
 
 
+@app.route('/content/contains', methods=['POST'])
+def contains():
+    return encode_data(g.objstorage.__contains__(**decode_request(request)))
+
+
 @app.route('/content/add', methods=['POST'])
 def add_bytes():
     return encode_data(g.objstorage.add(**decode_request(request)))
@@ -55,6 +60,13 @@
     return encode_data(g.objstorage.get(**decode_request(request)))
 
 
+@app.route('/content/get/batch', methods=['POST'])
+def get_batch():
+    # get_batch may be a generator (the ObjStorage default implementation
+    # yields its results): materialize it before encoding the response.
+    contents = list(g.objstorage.get_batch(**decode_request(request)))
+    return encode_data(contents)
+
+
 @app.route('/content/get/random', methods=['POST'])
 def get_random_contents():
     return encode_data(
diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py
--- a/swh/objstorage/objstorage.py
+++ b/swh/objstorage/objstorage.py
@@ -3,6 +3,8 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from .exc import ObjNotFoundError
+
 ID_HASH_ALGO = 'sha1'
 ID_HASH_LENGTH = 40  # Size in bytes of the hash hexadecimal representation.
 
@@ -83,6 +85,26 @@
             "Implementations of ObjStorage must have a 'get' method"
         )
 
+    def get_batch(self, obj_ids, *args, **kwargs):
+        """ Retrieve content in bulk.
+
+        Note: This function does have a default implementation in ObjStorage
+        that is suitable for most cases.
+
+        Args:
+            obj_ids: list of object ids.
+
+        Yields:
+            the content of each requested object, in the order of `obj_ids`.
+            None is yielded when `get` raises ObjNotFoundError, so that one
+            missing content does not cancel the whole request.
+        """
+        for obj_id in obj_ids:
+            try:
+                yield self.get(obj_id)
+            except ObjNotFoundError:
+                yield None
+
     def check(self, obj_id, *args, **kwargs):
         """ Perform an integrity check for a given object.
 
diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py
--- a/swh/objstorage/tests/objstorage_testing.py
+++ b/swh/objstorage/tests/objstorage_testing.py
@@ -37,6 +37,24 @@
         self.assertContentMatch(obj_id, content)
 
     @istest
+    def add_get_batch(self):
+        content1, obj_id1 = self.hash_content(b'add_get_batch_1')
+        content2, obj_id2 = self.hash_content(b'add_get_batch_2')
+        self.storage.add(content1, obj_id1)
+        self.storage.add(content2, obj_id2)
+        cr1, cr2 = self.storage.get_batch([obj_id1, obj_id2])
+        self.assertEqual(cr1, content1)
+        self.assertEqual(cr2, content2)
+
+    @istest
+    def get_batch_unexisting_content(self):
+        # The content is hashed but never added to the storage.
+        content, obj_id = self.hash_content(b'get_batch_unexisting_content')
+        result = list(self.storage.get_batch([obj_id]))
+        self.assertEqual(len(result), 1)
+        self.assertIsNone(result[0])
+
+    @istest
     def restore_content(self):
         valid_content, valid_obj_id = self.hash_content(b'restore_content')
         invalid_content = b'unexpected content'
diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py
--- a/swh/objstorage/tests/test_objstorage_api.py
+++ b/swh/objstorage/tests/test_objstorage_api.py
@@ -6,18 +6,17 @@
 import tempfile
 import unittest
 
-from nose.tools import istest
 from nose.plugins.attrib import attr
 
-from swh.core import hashutil
-from swh.objstorage.exc import ObjNotFoundError, Error
+from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
 from swh.objstorage.tests.server_testing import ServerTestFixture
 from swh.objstorage.api.client import RemoteObjStorage
 from swh.objstorage.api.server import app
 
 
 @attr('db')
-class TestRemoteObjStorage(ServerTestFixture, unittest.TestCase):
+class TestRemoteObjStorage(ServerTestFixture, ObjStorageTestFixture,
+                           unittest.TestCase):
     """ Test the remote archive API.
     """
 
@@ -26,63 +25,4 @@
                        'storage_slicing': '0:1/0:5'}
         self.app = app
         super().setUp()
-        self.objstorage = RemoteObjStorage(self.url())
-
-    def tearDown(self):
-        super().tearDown()
-
-    @istest
-    def content_add(self):
-        content = bytes('Test content', 'utf8')
-        id = self.objstorage.content_add(content)
-        self.assertEquals(self.objstorage.content_get(id), content)
-
-    @istest
-    def content_get_present(self):
-        content = bytes('content_get_present', 'utf8')
-        content_hash = hashutil.hashdata(content)
-        id = self.objstorage.content_add(content)
-        self.assertEquals(content_hash['sha1'], id)
-
-    @istest
-    def content_get_missing(self):
-        content = bytes('content_get_missing', 'utf8')
-        content_hash = hashutil.hashdata(content)
-        with self.assertRaises(ObjNotFoundError):
-            self.objstorage.content_get(content_hash['sha1'])
-
-    @istest
-    def content_get_random(self):
-        ids = []
-        for i in range(100):
-            content = bytes('content_get_present', 'utf8')
-            id = self.objstorage.content_add(content)
-            ids.append(id)
-        for id in self.objstorage.content_get_random(50):
-            self.assertIn(id, ids)
-
-    @istest
-    def content_check_invalid(self):
-        content = bytes('content_check_invalid', 'utf8')
-        invalid_id = hashutil.hashdata(b'invalid content')['sha1']
-        # Add the content with an invalid id.
-        self.objstorage.content_add(content, invalid_id)
-        # Then check it and expect an error.
-        with self.assertRaises(Error):
-            self.objstorage.content_check(invalid_id)
-
-    @istest
-    def content_check_valid(self):
-        content = bytes('content_check_valid', 'utf8')
-        id = self.objstorage.content_add(content)
-        try:
-            self.objstorage.content_check(id)
-        except:
-            self.fail('Integrity check failed')
-
-    @istest
-    def content_check_missing(self):
-        content = bytes('content_check_valid', 'utf8')
-        content_hash = hashutil.hashdata(content)
-        with self.assertRaises(ObjNotFoundError):
-            self.objstorage.content_check(content_hash['sha1'])
+        self.storage = RemoteObjStorage(self.url())