diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
--- a/swh/objstorage/__init__.py
+++ b/swh/objstorage/__init__.py
@@ -7,7 +7,9 @@
 from .objstorage_pathslicing import PathSlicingObjStorage
 from .objstorage_in_memory import InMemoryObjStorage
 from .api.client import RemoteObjStorage
-from .multiplexer import MultiplexerObjStorage, StripingObjStorage
+from .multiplexer import (
+    MultiplexerObjStorage, SizeTieringObjStorage, StripingObjStorage,
+)
 from .multiplexer.filter import add_filters
 
 
@@ -84,3 +86,26 @@
 
 
 _STORAGE_CLASSES['striping'] = _construct_striping_objstorage
+
+
+def _construct_size_tiering_objstorage(objstorages):
+    """Build a SizeTieringObjStorage from a list of tier configurations.
+
+    Each configuration is passed to get_objstorage() once its optional
+    'min_size' and 'max_size' keys (the tier's size range) are removed.
+    """
+    storages = []
+    for conf in objstorages:
+        conf = dict(conf)  # copy, so the caller's configuration is untouched
+        min_size = conf.pop('min_size', 0)
+        max_size = conf.pop('max_size', None)
+        storage = get_objstorage(**conf)
+        storages.append({
+            'storage': storage,
+            'min_size': min_size,
+            'max_size': max_size,
+        })
+    return SizeTieringObjStorage(storages)
+
+
+_STORAGE_CLASSES['size-tiering'] = _construct_size_tiering_objstorage
diff --git a/swh/objstorage/multiplexer/__init__.py b/swh/objstorage/multiplexer/__init__.py
--- a/swh/objstorage/multiplexer/__init__.py
+++ b/swh/objstorage/multiplexer/__init__.py
@@ -1,5 +1,7 @@
 from .multiplexer_objstorage import MultiplexerObjStorage
 from .striping_objstorage import StripingObjStorage
+from .size_tiering_objstorage import SizeTieringObjStorage
 
 
-__all__ = ['MultiplexerObjStorage', 'StripingObjStorage']
+__all__ = ['MultiplexerObjStorage', 'StripingObjStorage',
+           'SizeTieringObjStorage']
diff --git a/swh/objstorage/multiplexer/size_tiering_objstorage.py b/swh/objstorage/multiplexer/size_tiering_objstorage.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/multiplexer/size_tiering_objstorage.py
@@ -0,0 +1,155 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import queue
+
+from ..exc import ObjNotFoundError
+from ..objstorage import ObjStorage
+from .multiplexer_objstorage import MultiplexerObjStorage, ObjStorageThread
+
+
+class SizeTieringObjStorage(MultiplexerObjStorage):
+    """An ObjStorage dispatcher that distributes objects across objstorages
+    according to their size.
+
+    Each tier is a dict with a 'storage' entry plus optional 'min_size'
+    (default 0) and 'max_size' (default None, meaning unbounded) entries.
+    Both bounds are inclusive, and the tiers must cover every size from 0
+    upwards without gap or overlap.
+
+    Example::
+        SizeTieringObjStorage([
+            {
+                'storage': PathSlicingObjStorage('/small', '0:1/1:5'),
+                'min_size': 0,
+                'max_size': 20 * 1024 - 1,
+            }, {
+                'storage': PathSlicingObjStorage('/large', '0:4'),
+                'min_size': 20 * 1024,
+            }
+        ])
+    """
+    def __init__(self, storage_configs, **kwargs):
+        """Validate the tiers and initialize the underlying multiplexer.
+
+        Args:
+            storage_configs: iterable of tier dicts (see class docstring)
+            **kwargs: forwarded to MultiplexerObjStorage
+
+        Raises:
+            ValueError: if no tier is given, the first tier does not start
+                at 0, the last tier is bounded, or two neighbouring tiers
+                are not contiguous
+        """
+        tiers = sorted(storage_configs,
+                       key=lambda config: config.get('min_size', 0))
+        if not tiers:
+            raise ValueError(
+                'SizeTieringObjStorage needs at least one storage'
+            )
+
+        storages = [tier['storage'] for tier in tiers]
+        bounds = [(tier.get('min_size', 0), tier.get('max_size'))
+                  for tier in tiers]
+
+        if bounds[0][0] != 0:
+            raise ValueError(
+                'Minimum size must be 0 for SizeTieringObjStorage, found %s' %
+                bounds[0][0]
+            )
+
+        if bounds[-1][1] is not None:
+            raise ValueError(
+                'Maximum size must be None for SizeTieringObjStorage'
+                ', found %s' % bounds[-1][1]
+            )
+
+        for (_, left_bound), (right_bound, _) in zip(bounds, bounds[1:]):
+            # Only the last tier may be unbounded (None); checking for it
+            # here also avoids a TypeError on `None + 1`.
+            if left_bound is None or right_bound != left_bound + 1:
+                raise ValueError(
+                    'SizeTieringObjStorage ranges must be contiguous, '
+                    'found issue between %s and %s' % (left_bound, right_bound)
+                )
+
+        super().__init__(storages, **kwargs)
+        self.upper_bounds = [upper for _, upper in bounds]
+
+    def get_storage_for_length(self, length):
+        """Return the storage thread of the tier that holds objects of the
+        given length."""
+        for i, upper_bound in enumerate(self.upper_bounds):
+            # Compare with None explicitly: 0 is a valid upper bound.
+            if upper_bound is None or length <= upper_bound:
+                return self.storage_threads[i]
+        return self.storage_threads[-1]
+
+    def add(self, content, obj_id=None, check_presence=True):
+        """ Add a new object to the object storage.
+
+        Only adds the content to the single storage which stores objects of
+        the given size.
+        """
+        storage = self.get_storage_for_length(len(content))
+        return storage.add(content, obj_id=obj_id,
+                           check_presence=check_presence)
+
+    def restore(self, content, obj_id=None):
+        """Restore the content in the single storage matching its size."""
+        storage = self.get_storage_for_length(len(content))
+        return storage.restore(content, obj_id=obj_id)
+
+    def add_batch(self, contents, check_presence=True):
+        """Queue the addition of each content on the storage matching its
+        size, and return the number of results collected.
+
+        Args:
+            contents: mapping from obj_id to content
+            check_presence: passed along with each queued 'add' command
+        """
+        ctr = 0
+        mailbox = queue.Queue()
+        for obj_id, content in contents.items():
+            ctr += 1
+            storage = self.get_storage_for_length(len(content))
+            storage.queue_command('add', content, obj_id=obj_id,
+                                  mailbox=mailbox,
+                                  check_presence=check_presence)
+
+        return len(ObjStorageThread.collect_results(mailbox, ctr))
+
+    def delete(self, obj_id):
+        """Delete the object from the storages given by get_write_threads.
+
+        Returns:
+            True if at least one storage deleted the object
+
+        Raises:
+            ObjNotFoundError: if no storage deleted the object
+        """
+        ObjStorage.delete(self, obj_id)  # check delete permission
+
+        ctr = 0
+        mailbox = queue.Queue()
+        for storage in self.get_write_threads(obj_id):
+            ctr += 1
+            storage.queue_command('delete', obj_id, mailbox=mailbox)
+
+        deleted = 0
+        while ctr:
+            ret = mailbox.get()
+            ctr -= 1
+            if ret['type'] == 'exception':
+                if isinstance(ret['result'], ObjNotFoundError):
+                    continue
+                raise ret['result'] from None
+
+            deleted += 1
+
+        if not deleted:
+            raise ObjNotFoundError(obj_id)
+
+        return True
diff --git a/swh/objstorage/tests/test_objstorage_size_tiering.py b/swh/objstorage/tests/test_objstorage_size_tiering.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/tests/test_objstorage_size_tiering.py
@@ -0,0 +1,68 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+import shutil
+import tempfile
+import unittest
+
+from nose.tools import istest
+
+from swh.objstorage import get_objstorage
+from .objstorage_testing import ObjStorageTestFixture
+
+
+class TestSizeTieringObjStorage(ObjStorageTestFixture, unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+        self.base_dir = tempfile.mkdtemp()
+        self.size_threshold = 512
+        os.mkdir(os.path.join(self.base_dir, 'root1'))
+        os.mkdir(os.path.join(self.base_dir, 'root2'))
+        storage_config = {
+            'cls': 'size-tiering',
+            'args': {
+                'objstorages': [
+                    {
+                        'cls': 'pathslicing',
+                        'args': {
+                            'root': os.path.join(self.base_dir, 'root1'),
+                            'slicing': '0:2',
+                            'allow_delete': True,
+                        },
+                        'min_size': 0,
+                        'max_size': self.size_threshold - 1,
+                    },
+                    {
+                        'cls': 'pathslicing',
+                        'args': {
+                            'root': os.path.join(self.base_dir, 'root2'),
+                            'slicing': '0:2',
+                            'allow_delete': True,
+                        },
+                        'min_size': self.size_threshold,
+                        'max_size': None,
+                    },
+                ]
+            }
+        }
+        self.storage = get_objstorage(**storage_config)
+
+    def tearDown(self):
+        shutil.rmtree(self.base_dir)
+
+    @istest
+    def add_tiering_behavior(self):
+        sizes = [self.size_threshold + i for i in [-5, 0, 5]]
+        contents = [b'\x00' * size for size in sizes]
+
+        for content in contents:
+            obj_id = self.storage.add(content)
+            if len(content) < self.size_threshold:
+                self.assertIn(obj_id, self.storage.storages[0])
+                self.assertNotIn(obj_id, self.storage.storages[1])
+            else:
+                self.assertNotIn(obj_id, self.storage.storages[0])
+                self.assertIn(obj_id, self.storage.storages[1])