def get_storage_pipeline(steps):
    """Recursively get a storage object that may use other storage objects
    as backends.

    The steps are listed outermost first: every step but the last receives
    the configuration of the step that follows it as its `storage` argument,
    and the resulting nested configuration is handed to `get_storage`.

    Args:
        steps (List[Union[str, dict]]): List of dicts that may be used as
            kwargs for `get_storage`. A step may also be a plain string,
            which is shorthand for `{'cls': <string>}`; the `args` key of
            a dict step is optional. The passed-in steps are never
            modified.

    Returns:
        an instance of swh.storage.Storage or compatible class

    Raises:
        ValueError: if `steps` is empty, or if passed an unknown storage
            class.
        KeyError: if a dict step has no `cls` key.

    """
    if not steps:
        raise ValueError('A storage pipeline needs at least one step')

    storage_config = None
    # Build the configuration inside out: the last step is the innermost
    # backend, and each earlier step wraps what has been built so far.
    for step in reversed(steps):
        if isinstance(step, str):
            cls = step
            args = {}
        else:
            cls = step['cls']
            # Copy before injecting `storage` below so the caller's
            # configuration is left untouched; `or {}` also tolerates an
            # explicit `args: None`.
            args = dict(step.get('args') or {})
        if storage_config is not None:
            args['storage'] = storage_config
        storage_config = {
            'cls': cls,
            'args': args,
        }

    return get_storage(**storage_config)