diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py --- a/swh/objstorage/__init__.py +++ b/swh/objstorage/__init__.py @@ -1,4 +1,51 @@ from .objstorage import ObjStorage from .objstorage_pathslicing import PathSlicingObjStorage +from .api.client import RemoteObjStorage +from .multiplexer import MultiplexerObjStorage +from .multiplexer.filter import add_filters -__all__ = [ObjStorage, PathSlicingObjStorage] +# TODO remove PathSlicingObjStorage from this list once the config +# loading will be updated and no hardcoded objstorage types should +# remains. +__all__ = ['get_objstorage', 'ObjStorage', 'PathSlicingObjStorage'] + +_STORAGE_CLASSES = { + 'pathslicing': PathSlicingObjStorage, + 'remote': RemoteObjStorage, +} + + +def get_objstorage(cls, args): + """ Create an ObjStorage using the given implementation class. + + Args: + cls (str): objstorage class unique key contained in the + _STORAGE_CLASSES dict. + args (dict): arguments for the required class of objstorage + that must match exactly the one in the `__init__` method of the + class. + Returns: + subclass of ObjStorage that match the given `storage_class` argument. + Raises: + ValueError: if the given storage class is not a valid objstorage + key. + """ + try: + return _STORAGE_CLASSES[cls](**args) + except KeyError: + raise ValueError('Storage class %s does not exists' % cls) + + +def _construct_filtered_objstorage(storage_conf, filters_conf): + return add_filters( + get_objstorage(**storage_conf), + filters_conf + ) +_STORAGE_CLASSES['filtered'] = _construct_filtered_objstorage + + +def _construct_multiplexer_objstorage(objstorages): + storages = [get_objstorage(**conf) + for conf in objstorages] + return MultiplexerObjStorage(storages) +_STORAGE_CLASSES['multiplexer'] = _construct_multiplexer_objstorage diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py --- a/swh/objstorage/api/client.py +++ b/swh/objstorage/api/client.py @@ -9,12 +9,13 @@ import requests from requests.exceptions import ConnectionError +from ..objstorage import ObjStorage from ..exc import ObjStorageAPIError from .common import (decode_response, encode_data_client as encode_data) -class RemoteObjStorage(): +class RemoteObjStorage(ObjStorage): """ Proxy to a remote object storage. This class allows to connect to an object storage server via @@ -50,54 +51,96 @@ return decode_response(response) - def content_add(self, bytes, obj_id=None): + def __contains__(self, obj_id): + return self.post('content/contains', {'obj_id': obj_id}) + + def add(self, content, obj_id=None, check_presence=True): """ Add a new object to the object storage. Args: - bytes: content of the object to be added to the storage. + content: content of the object to be added to the storage. + obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When + given, obj_id will be trusted to match the bytes. If missing, + obj_id will be computed on the fly. + check_presence: indicate if the presence of the content should be + verified before adding the file. + + Returns: + the id of the object into the storage. + """ + return self.post('content/add', {'bytes': content, 'obj_id': obj_id, + 'check_presence': check_presence}) + + def restore(self, content, obj_id=None): + """ Restore a content that have been corrupted. + + This function is identical to add_bytes but does not check if + the object id is already in the file system. + + Args: + content: content of the object to be added to the storage obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When given, obj_id will be trusted to match bytes. If missing, obj_id will be computed on the fly. - """ - return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id}) + return self.add(content, obj_id, check_presence=False) - def content_get(self, obj_id): + def get(self, obj_id): """ Retrieve the content of a given object. Args: - obj_id: The id of the object. + obj_id: object id. Returns: - The content of the requested objects as bytes. + the content of the requested object as bytes. Raises: - ObjNotFoundError: if the requested object is missing + ObjNotFoundError: if the requested object is missing. """ return self.post('content/get', {'obj_id': obj_id}) - def content_get_random(self, batch_size): - """ Retrieve a random sample of existing content. + def get_batch(self, obj_ids): + """ Retrieve content in bulk. + + Note: This function does have a default implementation in ObjStorage + that is suitable for most cases. Args: - batch_size: Number of content requested. + obj_ids: list of object ids. Returns: - A list of random ids that represents existing contents. + list of resulting contents, or None if the content could not + be retrieved. Do not raise any exception as a fail for one content + will not cancel the whole request. """ - return self.post('content/get/random', {'batch_size': batch_size}) + return self.post('content/get/batch', {'obj_ids': obj_ids}) - def content_check(self, obj_id): - """ Integrity check for a given object + def check(self, obj_id): + """ Perform an integrity check for a given object. - verify that the file object is in place, and that the gzipped content - matches the object id + Verify that the file object is in place and that the gziped content + matches the object id. Args: - obj_id: The id of the object. + obj_id: object id. Raises: - ObjNotFoundError: if the requested object is missing - Error: if the requested object is corrupt + ObjNotFoundError: if the requested object is missing. + Error: if the request object is corrupted. """ self.post('content/check', {'obj_id': obj_id}) + + def get_random(self, batch_size): + """ Get random ids of existing contents + + This method is used in order to get random ids to perform + content integrity verifications on random contents. + + Attributes: + batch_size (int): Number of ids that will be given + + Yields: + An iterable of ids of contents that are in the current object + storage. + """ + return self.post('content/get/random', {'batch_size': batch_size}) diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py --- a/swh/objstorage/api/server.py +++ b/swh/objstorage/api/server.py @@ -45,6 +45,11 @@ return str(list(g.storage)) +@app.route('/content/contains', methods=['POST']) +def contains(): + return encode_data(g.objstorage.__contains__(**decode_request(request))) + + @app.route('/content/add', methods=['POST']) def add_bytes(): return encode_data(g.objstorage.add(**decode_request(request))) @@ -55,6 +60,11 @@ return encode_data(g.objstorage.get(**decode_request(request))) +@app.route('/content/get/batch', methods=['POST']) +def get_batch(): + return encode_data(g.objstorage.get_batch(**decode_request(request))) + + @app.route('/content/get/random', methods=['POST']) def get_random_contents(): return encode_data( diff --git a/swh/objstorage/multiplexer/filter/__init__.py b/swh/objstorage/multiplexer/filter/__init__.py --- a/swh/objstorage/multiplexer/filter/__init__.py +++ b/swh/objstorage/multiplexer/filter/__init__.py @@ -30,7 +30,7 @@ return {'type': 'regex', 'regex': regex} -def _filter_priority(self, filter_type): +def _filter_priority(filter_type): """ Get the priority of this filter. Priority is a value that indicates if the operation of the @@ -69,7 +69,7 @@ return filter -def add_filters(storage, *filter_confs): +def add_filters(storage, filter_confs): """ Add multiple filters to the given storage. (See filter.add_filter) diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py --- a/swh/objstorage/objstorage.py +++ b/swh/objstorage/objstorage.py @@ -3,6 +3,8 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from .exc import ObjNotFoundError + ID_HASH_ALGO = 'sha1' ID_HASH_LENGTH = 40 # Size in bytes of the hash hexadecimal representation. @@ -83,6 +85,26 @@ "Implementations of ObjStorage must have a 'get' method" ) + def get_batch(self, obj_ids, *args, **kwargs): + """ Retrieve content in bulk. + + Note: This function does have a default implementation in ObjStorage + that is suitable for most cases. + + Args: + obj_ids: list of object ids. + + Returns: + list of resulting contents, or None if the content could not + be retrieved. Do not raise any exception as a fail for one content + will not cancel the whole request. + """ + for obj_id in obj_ids: + try: + yield self.get(obj_id) + except ObjNotFoundError: + yield None + def check(self, obj_id, *args, **kwargs): """ Perform an integrity check for a given object. diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py --- a/swh/objstorage/tests/objstorage_testing.py +++ b/swh/objstorage/tests/objstorage_testing.py @@ -37,6 +37,23 @@ self.assertContentMatch(obj_id, content) @istest + def add_get_batch(self): + content1, obj_id1 = self.hash_content(b'add_get_batch_1') + content2, obj_id2 = self.hash_content(b'add_get_batch_2') + self.storage.add(content1, obj_id1) + self.storage.add(content2, obj_id2) + cr1, cr2 = self.storage.get_batch([obj_id1, obj_id2]) + self.assertEqual(cr1, content1) + self.assertEqual(cr2, content2) + + @istest + def get_batch_unexisting_content(self): + content, obj_id = self.hash_content(b'get_batch_unexisting_content') + result = list(self.storage.get_batch([obj_id])) + self.assertTrue(len(result) == 1) + self.assertIsNone(result[0]) + + @istest def restore_content(self): valid_content, valid_obj_id = self.hash_content(b'restore_content') invalid_content = b'unexpected content' diff --git a/swh/objstorage/tests/test_multiplexer_filter.py b/swh/objstorage/tests/test_multiplexer_filter.py --- a/swh/objstorage/tests/test_multiplexer_filter.py +++ b/swh/objstorage/tests/test_multiplexer_filter.py @@ -3,8 +3,9 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import random +import tempfile import unittest +import random from string import ascii_lowercase @@ -13,85 +14,39 @@ from swh.core import hashutil from swh.objstorage.exc import ObjNotFoundError, Error -from swh.objstorage import ObjStorage -from swh.objstorage.multiplexer.filter import (add_filter, read_only, - id_prefix, id_regex) +from swh.objstorage import get_objstorage +from swh.objstorage.multiplexer.filter import read_only, id_prefix, id_regex def get_random_content(): return bytes(''.join(random.sample(ascii_lowercase, 10)), 'utf8') -class MockObjStorage(ObjStorage): - """ Mock an object storage for testing the filters. - """ - def __init__(self): - self.objects = {} - - def __contains__(self, obj_id): - return obj_id in self.objects - - def __len__(self): - return len(self.objects) - - def __iter__(self): - return iter(self.objects) - - def id(self, content): - # Id is the content itself for easily choose the id of - # a content for filtering. - return hashutil.hashdata(content)['sha1'] - - def add(self, content, obj_id=None, check_presence=True): - if obj_id is None: - obj_id = self.id(content) - - if check_presence and obj_id in self.objects: - return obj_id - - self.objects[obj_id] = content - return obj_id - - def restore(self, content, obj_id=None): - return self.add(content, obj_id, check_presence=False) - - def get(self, obj_id): - if obj_id not in self: - raise ObjNotFoundError(obj_id) - return self.objects[obj_id] - - def check(self, obj_id): - if obj_id not in self: - raise ObjNotFoundError(obj_id) - if obj_id != self.id(self.objects[obj_id]): - raise Error(obj_id) - - def get_random(self, batch_size): - batch_size = min(len(self), batch_size) - return random.sample(list(self.objects), batch_size) - - @attr('!db') class MixinTestReadFilter(unittest.TestCase): # Read only filter should not allow writing def setUp(self): super().setUp() - storage = MockObjStorage() - + pstorage = {'cls': 'pathslicing', + 'args': {'root': tempfile.mkdtemp(), + 'slicing': '0:5'}} + base_storage = get_objstorage(**pstorage) + base_storage.id = lambda cont: hashutil.hashdata(cont)['sha1'] + self.storage = get_objstorage('filtered', + {'storage_conf': pstorage, + 'filters_conf': [read_only()]}) self.valid_content = b'pre-existing content' self.invalid_content = b'invalid_content' self.true_invalid_content = b'Anything that is not correct' self.absent_content = b'non-existent content' # Create a valid content. - self.valid_id = storage.add(self.valid_content) + self.valid_id = base_storage.add(self.valid_content) # Create an invalid id and add a content with it. - self.invalid_id = storage.id(self.true_invalid_content) - storage.add(self.invalid_content, obj_id=self.invalid_id) + self.invalid_id = base_storage.id(self.true_invalid_content) + base_storage.add(self.invalid_content, obj_id=self.invalid_id) # Compute an id for a non-existing content. - self.absent_id = storage.id(self.absent_content) - - self.storage = add_filter(storage, read_only()) + self.absent_id = base_storage.id(self.absent_content) @istest def can_contains(self): @@ -124,21 +79,21 @@ @istest def can_get_random(self): - self.assertEqual(1, len(self.storage.get_random(1))) - self.assertEqual(len(self.storage), len(self.storage.get_random(1000))) + self.assertEqual(1, len(list(self.storage.get_random(1)))) + print(list(self.storage.get_random(1000))) + self.assertEqual(len(list(self.storage)), + len(set(self.storage.get_random(1000)))) @istest def cannot_add(self): new_id = self.storage.add(b'New content') result = self.storage.add(self.valid_content, self.valid_id) - self.assertNotIn(new_id, self.storage) + self.assertIsNone(new_id, self.storage) self.assertIsNone(result) @istest def cannot_restore(self): - new_id = self.storage.restore(b'New content') result = self.storage.restore(self.valid_content, self.valid_id) - self.assertNotIn(new_id, self.storage) self.assertIsNone(result) @@ -154,11 +109,15 @@ # Use a hack here : as the mock uses the content as id, it is easy to # create contents that are filtered or not. self.prefix = '71' - storage = MockObjStorage() - # Make the storage filtered + self.sconf = {'cls': 'pathslicing', + 'args': {'root': tempfile.mkdtemp(), + 'slicing': '0:5'}} + storage = get_objstorage(**self.sconf) self.base_storage = storage - self.storage = self.filter_storage(storage) + self.storage = self.filter_storage(self.sconf) + # Set the id calculators + storage.id = lambda cont: hashutil.hashdata(cont)['sha1'] # Present content with valid id self.present_valid_content = self.ensure_valid(b'yroqdtotji') @@ -208,14 +167,14 @@ self.true_missing_corrupted_invalid_content) # Add the content that are supposed to be present - storage.add(self.present_valid_content) - storage.add(self.present_invalid_content) - storage.add(self.present_corrupted_valid_content, - obj_id=self.present_corrupted_valid_id) - storage.add(self.present_corrupted_invalid_content, - obj_id=self.present_corrupted_invalid_id) - - def filter_storage(self, storage): + self.storage.add(self.present_valid_content) + self.storage.add(self.present_invalid_content) + self.storage.add(self.present_corrupted_valid_content, + obj_id=self.present_corrupted_valid_id) + self.storage.add(self.present_corrupted_invalid_content, + obj_id=self.present_corrupted_invalid_id) + + def filter_storage(self, sconf): raise NotImplementedError( 'Id_filter test class must have a filter_storage method') @@ -359,8 +318,10 @@ self.assertFalse(hex_obj_id.startswith(self.prefix)) return content - def filter_storage(self, storage): - return add_filter(storage, id_prefix(self.prefix)) + def filter_storage(self, sconf): + return get_objstorage('filtered', + {'storage_conf': sconf, + 'filters_conf': [id_prefix(self.prefix)]}) @attr('!db') @@ -369,5 +330,7 @@ self.regex = r'[a-f][0-9].*' super().setUp() - def filter_storage(self, storage): - return add_filter(storage, id_regex(self.regex)) + def filter_storage(self, sconf): + return get_objstorage('filtered', + {'storage_conf': sconf, + 'filters_conf': [id_regex(self.regex)]}) diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py --- a/swh/objstorage/tests/test_objstorage_api.py +++ b/swh/objstorage/tests/test_objstorage_api.py @@ -6,18 +6,17 @@ import tempfile import unittest -from nose.tools import istest from nose.plugins.attrib import attr -from swh.core import hashutil -from swh.objstorage.exc import ObjNotFoundError, Error +from swh.objstorage import get_objstorage +from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture from swh.objstorage.tests.server_testing import ServerTestFixture -from swh.objstorage.api.client import RemoteObjStorage from swh.objstorage.api.server import app @attr('db') -class TestRemoteObjStorage(ServerTestFixture, unittest.TestCase): +class TestRemoteObjStorage(ServerTestFixture, ObjStorageTestFixture, + unittest.TestCase): """ Test the remote archive API. """ @@ -26,63 +25,4 @@ 'storage_slicing': '0:1/0:5'} self.app = app super().setUp() - self.objstorage = RemoteObjStorage(self.url()) - - def tearDown(self): - super().tearDown() - - @istest - def content_add(self): - content = bytes('Test content', 'utf8') - id = self.objstorage.content_add(content) - self.assertEquals(self.objstorage.content_get(id), content) - - @istest - def content_get_present(self): - content = bytes('content_get_present', 'utf8') - content_hash = hashutil.hashdata(content) - id = self.objstorage.content_add(content) - self.assertEquals(content_hash['sha1'], id) - - @istest - def content_get_missing(self): - content = bytes('content_get_missing', 'utf8') - content_hash = hashutil.hashdata(content) - with self.assertRaises(ObjNotFoundError): - self.objstorage.content_get(content_hash['sha1']) - - @istest - def content_get_random(self): - ids = [] - for i in range(100): - content = bytes('content_get_present', 'utf8') - id = self.objstorage.content_add(content) - ids.append(id) - for id in self.objstorage.content_get_random(50): - self.assertIn(id, ids) - - @istest - def content_check_invalid(self): - content = bytes('content_check_invalid', 'utf8') - invalid_id = hashutil.hashdata(b'invalid content')['sha1'] - # Add the content with an invalid id. - self.objstorage.content_add(content, invalid_id) - # Then check it and expect an error. - with self.assertRaises(Error): - self.objstorage.content_check(invalid_id) - - @istest - def content_check_valid(self): - content = bytes('content_check_valid', 'utf8') - id = self.objstorage.content_add(content) - try: - self.objstorage.content_check(id) - except: - self.fail('Integrity check failed') - - @istest - def content_check_missing(self): - content = bytes('content_check_valid', 'utf8') - content_hash = hashutil.hashdata(content) - with self.assertRaises(ObjNotFoundError): - self.objstorage.content_check(content_hash['sha1']) + self.storage = get_objstorage('remote', {'base_url': self.url()}) diff --git a/swh/objstorage/tests/test_objstorage_instantiation.py b/swh/objstorage/tests/test_objstorage_instantiation.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/test_objstorage_instantiation.py @@ -0,0 +1,47 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import tempfile +import unittest + +from nose.tools import istest + +from swh.objstorage.tests.server_testing import ServerTestFixture +from swh.objstorage import get_objstorage +from swh.objstorage.objstorage_pathslicing import PathSlicingObjStorage +from swh.objstorage.api.client import RemoteObjStorage +from swh.objstorage.api.server import app + + +class TestObjStorageInitialization(ServerTestFixture, unittest.TestCase): + """ Test that the methods for ObjStorage initializations with + `get_objstorage` works properly. + """ + + def setUp(self): + self.path = tempfile.mkdtemp() + # Server is launched at self.url() + self.app = app + self.config = {'storage_base': tempfile.mkdtemp(), + 'storage_slicing': '0:1/0:5'} + super().setUp() + + @istest + def pathslicing_objstorage(self): + conf = { + 'cls': 'pathslicing', + 'args': {'root': self.path, 'slicing': '0:2/0:5'} + } + st = get_objstorage(**conf) + self.assertTrue(isinstance(st, PathSlicingObjStorage)) + + @istest + def remote_objstorage(self): + conf = { + 'cls': 'remote', + 'args': {'base_url': self.url()} + } + st = get_objstorage(**conf) + self.assertTrue(isinstance(st, RemoteObjStorage)) diff --git a/swh/objstorage/tests/test_objstorage_pathslicing.py b/swh/objstorage/tests/test_objstorage_pathslicing.py --- a/swh/objstorage/tests/test_objstorage_pathslicing.py +++ b/swh/objstorage/tests/test_objstorage_pathslicing.py @@ -10,7 +10,7 @@ from swh.core import hashutil from swh.objstorage import exc -from swh.objstorage import PathSlicingObjStorage +from swh.objstorage import get_objstorage from objstorage_testing import ObjStorageTestFixture @@ -21,7 +21,10 @@ super().setUp() self.slicing = '0:2/2:4/4:6' self.tmpdir = tempfile.mkdtemp() - self.storage = PathSlicingObjStorage(self.tmpdir, self.slicing) + self.storage = get_objstorage( + 'pathslicing', + {'root': self.tmpdir, 'slicing': self.slicing} + ) def content_path(self, obj_id): hex_obj_id = hashutil.hash_to_hex(obj_id)