diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
index 939c418..fd36149 100644
--- a/swh/objstorage/__init__.py
+++ b/swh/objstorage/__init__.py
@@ -1,77 +1,96 @@
 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from .objstorage import ObjStorage
 from .objstorage_pathslicing import PathSlicingObjStorage
 from .objstorage_in_memory import InMemoryObjStorage
 from .api.client import RemoteObjStorage
-from .multiplexer import MultiplexerObjStorage
+from .multiplexer import MultiplexerObjStorage, StripingObjStorage
 from .multiplexer.filter import add_filters
 
 
 __all__ = ['get_objstorage', 'ObjStorage']
 
 
 _STORAGE_CLASSES = {
     'pathslicing': PathSlicingObjStorage,
     'remote': RemoteObjStorage,
     'in-memory': InMemoryObjStorage,
 }
 
 try:
     from swh.objstorage.cloud.objstorage_azure import (
         AzureCloudObjStorage,
         PrefixedAzureCloudObjStorage,
     )
     _STORAGE_CLASSES['azure'] = AzureCloudObjStorage
     _STORAGE_CLASSES['azure-prefixed'] = PrefixedAzureCloudObjStorage
 except ImportError:
     pass
 
 try:
     from swh.objstorage.objstorage_rados import RADOSObjStorage
     _STORAGE_CLASSES['rados'] = RADOSObjStorage
 except ImportError:
     pass
 
 
 def get_objstorage(cls, args):
     """ Create an ObjStorage using the given implementation class.
 
     Args:
         cls (str): objstorage class unique key contained in the
             _STORAGE_CLASSES dict.
         args (dict): arguments for the required class of objstorage
             that must match exactly the one in the `__init__` method of the
             class.
     Returns:
         subclass of ObjStorage that match the given `storage_class` argument.
     Raises:
         ValueError: if the given storage class is not a valid objstorage
             key.
     """
     try:
         return _STORAGE_CLASSES[cls](**args)
     except KeyError:
         raise ValueError('Storage class %s does not exist' % cls)
 
 
 def _construct_filtered_objstorage(storage_conf, filters_conf):
     return add_filters(
         get_objstorage(**storage_conf),
         filters_conf
     )
 
 
 _STORAGE_CLASSES['filtered'] = _construct_filtered_objstorage
 
 
 def _construct_multiplexer_objstorage(objstorages):
     storages = [get_objstorage(**conf)
                 for conf in objstorages]
     return MultiplexerObjStorage(storages)
 
 
 _STORAGE_CLASSES['multiplexer'] = _construct_multiplexer_objstorage
+
+
+def _construct_striping_objstorage(objstorages):
+    """Instantiate every configured backend and stripe objects across them.
+
+    Args:
+        objstorages (list): `get_objstorage` keyword arguments (`cls` and
+            `args`), one dict per backend storage.
+
+    Returns:
+        StripingObjStorage: striping storage over the backends, in order.
+
+    """
+    storages = [get_objstorage(**conf)
+                for conf in objstorages]
+    return StripingObjStorage(storages)
+
+
+_STORAGE_CLASSES['striping'] = _construct_striping_objstorage
diff --git a/swh/objstorage/multiplexer/__init__.py b/swh/objstorage/multiplexer/__init__.py
index 09f8b18..33f21ff 100644
--- a/swh/objstorage/multiplexer/__init__.py
+++ b/swh/objstorage/multiplexer/__init__.py
@@ -1,4 +1,5 @@
 from .multiplexer_objstorage import MultiplexerObjStorage
+from .striping_objstorage import StripingObjStorage
 
 
-__all__ = ['MultiplexerObjStorage']
+__all__ = ['MultiplexerObjStorage', 'StripingObjStorage']
diff --git a/swh/objstorage/multiplexer/striping_objstorage.py b/swh/objstorage/multiplexer/striping_objstorage.py
new file mode 100644
index 0000000..38a7c2b
--- /dev/null
+++ b/swh/objstorage/multiplexer/striping_objstorage.py
@@ -0,0 +1,73 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from .multiplexer_objstorage import MultiplexerObjStorage
+
+
+class StripingObjStorage(MultiplexerObjStorage):
+    """Stripes objects across multiple objstorages
+
+    This objstorage implementation will write objects to objstorages in a
+    predictable way: it takes the modulo of the last 8 bytes of the object
+    identifier with the number of object storages passed, which will yield an
+    (almost) even distribution.
+
+    Objects are read from all storages in turn until one succeeds.
+
+    """
+    # Number of trailing bytes of the object id used to pick a storage.
+    MOD_BYTES = 8
+
+    def __init__(self, storages, **kwargs):
+        """Set up striping over `storages`.
+
+        Args:
+            storages (list): backend objstorages; the position of a backend
+                in this list is the index objects are striped on.
+            **kwargs: passed through to MultiplexerObjStorage.
+
+        """
+        super().__init__(storages, **kwargs)
+        self.num_storages = len(storages)
+
+    def get_storage_index(self, obj_id):
+        """Compute the index of the storage an object is written to.
+
+        Args:
+            obj_id (bytes): identifier of the object.
+
+        Returns:
+            int: index in `self.storages`, between 0 and num_storages - 1.
+
+        Raises:
+            ValueError: if `obj_id` is None.
+
+        """
+        if obj_id is None:
+            raise ValueError(
+                'StripingObjStorage always needs obj_id to be set'
+            )
+
+        # Keep the *last* MOD_BYTES bytes: `obj_id[:-MOD_BYTES]` would
+        # select everything but those bytes (and nothing at all for short
+        # identifiers, sending every object to storage 0).
+        index = int.from_bytes(obj_id[-self.MOD_BYTES:], 'little')
+        return index % self.num_storages
+
+    def get_write_storages(self, obj_id):
+        """Yield the single storage `obj_id` must be written to."""
+        idx = self.get_storage_index(obj_id)
+        yield self.storages[idx]
+
+    def get_read_storages(self, obj_id):
+        """Yield every storage, starting with the one `obj_id` stripes to.
+
+        The other storages follow in list order, wrapping around, so a
+        reader can fall back to them.
+
+        """
+        idx = self.get_storage_index(obj_id)
+        for i in range(self.num_storages):
+            yield self.storages[(idx + i) % self.num_storages]
diff --git a/swh/objstorage/tests/test_objstorage_striping.py b/swh/objstorage/tests/test_objstorage_striping.py
new file mode 100644
index 0000000..6466874
--- /dev/null
+++ b/swh/objstorage/tests/test_objstorage_striping.py
@@ -0,0 +1,85 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+import shutil
+import tempfile
+import unittest
+
+from nose.tools import istest
+
+from swh.objstorage import get_objstorage
+from objstorage_testing import ObjStorageTestFixture
+
+
+class TestStripingObjStorage(ObjStorageTestFixture, unittest.TestCase):
+    """Exercise a striping objstorage over two pathslicing backends."""
+
+    def setUp(self):
+        super().setUp()
+        self.base_dir = tempfile.mkdtemp()
+        os.mkdir(os.path.join(self.base_dir, 'root1'))
+        os.mkdir(os.path.join(self.base_dir, 'root2'))
+        storage_config = {
+            'cls': 'striping',
+            'args': {
+                'objstorages': [
+                    {
+                        'cls': 'pathslicing',
+                        'args': {
+                            'root': os.path.join(self.base_dir, 'root1'),
+                            'slicing': '0:2',
+                            'allow_delete': True,
+                        }
+                    },
+                    {
+                        'cls': 'pathslicing',
+                        'args': {
+                            'root': os.path.join(self.base_dir, 'root2'),
+                            'slicing': '0:2',
+                            'allow_delete': True,
+                        }
+                    },
+                ]
+            }
+        }
+        self.storage = get_objstorage(**storage_config)
+
+    def tearDown(self):
+        shutil.rmtree(self.base_dir)
+
+    @istest
+    def add_get_wo_id(self):
+        self.skipTest("can't add without id in the multiplexer storage")
+
+    @istest
+    def add_striping_behavior(self):
+        exp_storage_counts = [0, 0]
+        storage_counts = [0, 0]
+        for i in range(100):
+            content, obj_id = self.hash_content(
+                b'striping_behavior_test%02d' % i
+            )
+            self.storage.add(content, obj_id)
+            exp_storage_counts[self.storage.get_storage_index(obj_id)] += 1
+            count = 0
+            for idx, storage in enumerate(self.storage.storages):
+                if obj_id not in storage:
+                    continue
+                count += 1
+                storage_counts[idx] += 1
+            self.assertEqual(count, 1)
+        self.assertEqual(storage_counts, exp_storage_counts)
+
+    @istest
+    def get_striping_behavior(self):
+        # Make sure we can read objects that are available in any backend
+        # storage
+        content, obj_id = self.hash_content(b'striping_behavior_test')
+        for storage in self.storage.storages:
+            storage.add(content, obj_id)
+            self.assertIn(obj_id, self.storage)
+            storage.delete(obj_id)
+            self.assertNotIn(obj_id, self.storage)