diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import warnings
 
 from . import storage
 
@@ -13,10 +14,11 @@
     pass
 
 
-STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter', 'buffer'}
+STORAGE_IMPLEMENTATION = {
+    'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer'}
 
 
-def get_storage(cls, args):
+def get_storage(cls, **kwargs):
     """Get a storage object of class `storage_class` with arguments
     `storage_args`.
 
@@ -27,7 +29,7 @@
         - args (dict): dictionary with keys
 
     Returns:
-        an instance of swh.storage.Storage (either local or remote)
+        an instance of swh.storage.Storage or compatible class
 
     Raises:
         ValueError if passed an unknown storage class.
@@ -37,6 +39,15 @@
         raise ValueError('Unknown storage class `%s`. Supported: %s' % (
             cls, ', '.join(STORAGE_IMPLEMENTATION)))
 
+    if 'args' in kwargs:
+        warnings.warn(
+            'Explicit "args" key is deprecated, use keys directly instead.',
+            DeprecationWarning)
+        kwargs = kwargs['args']
+
+    if cls == 'pipeline':
+        return get_storage_pipeline(**kwargs)
+
     if cls == 'remote':
         from .api.client import RemoteStorage as Storage
     elif cls == 'local':
@@ -48,4 +59,49 @@
     elif cls == 'buffer':
         from .buffer import BufferingProxyStorage as Storage
 
-    return Storage(**args)
+    return Storage(**kwargs)
+
+
+def get_storage_pipeline(steps):
+    """Recursively get a storage object that may use other storage objects
+    as backends.
+
+    Steps are chained from last to first: the last step is the innermost
+    backend, and every earlier step gets the configuration built so far
+    under its `storage` key. The dicts passed by the caller are not
+    modified.
+
+    Args:
+        steps (List[dict]): List of dicts that may be used as kwargs for
+            `get_storage`.
+
+    Returns:
+        an instance of swh.storage.Storage or compatible class
+
+    Raises:
+        ValueError if `steps` is empty or if passed an unknown storage
+            class.
+    """
+    if not steps:
+        raise ValueError('A storage pipeline needs at least one step.')
+
+    storage_config = None
+    for step in reversed(steps):
+        if 'args' in step:
+            warnings.warn(
+                'Explicit "args" key is deprecated, use keys directly '
+                'instead.',
+                DeprecationWarning)
+            step = {
+                'cls': step['cls'],
+                **step['args'],
+            }
+        else:
+            # Shallow-copy the step so that attaching the backend
+            # configuration below does not modify the caller's dict.
+            step = dict(step)
+        if storage_config:
+            step['storage'] = storage_config
+        storage_config = step
+
+    return get_storage(**storage_config)
diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py
--- a/swh/storage/tests/test_init.py
+++ b/swh/storage/tests/test_init.py
@@ -20,6 +20,33 @@
 def test_get_storage(mock_pool):
     """Instantiating an existing storage should be ok
 
+    """
+    mock_pool.ThreadedConnectionPool.return_value = None
+    for cls, real_class, dummy_args in [
+            ('remote', RemoteStorage, {'url': 'url'}),
+            ('memory', MemoryStorage, {}),
+            ('local', DbStorage, {
+                'db': 'postgresql://db', 'objstorage': {
+                    'cls': 'memory', 'args': {},
+                },
+            }),
+            ('filter', FilteringProxyStorage, {'storage': {
+                'cls': 'memory'}
+            }),
+            ('buffer', BufferingProxyStorage, {'storage': {
+                'cls': 'memory'}
+            }),
+    ]:
+        actual_storage = get_storage(cls, **dummy_args)
+        assert actual_storage is not None
+        assert isinstance(actual_storage, real_class)
+
+
+@patch('swh.storage.storage.psycopg2.pool')
+def test_get_storage_legacy_args(mock_pool):
+    """Instantiating an existing storage should be ok even with the legacy
+    explicit 'args' keys
+
     """
     mock_pool.ThreadedConnectionPool.return_value = None
     for cls, real_class, dummy_args in [
@@ -37,7 +64,8 @@
                 'cls': 'memory', 'args': {}}
             }),
     ]:
-        actual_storage = get_storage(cls, args=dummy_args)
+        with pytest.warns(DeprecationWarning):
+            actual_storage = get_storage(cls, args=dummy_args)
         assert actual_storage is not None
         assert isinstance(actual_storage, real_class)
 
@@ -48,3 +76,69 @@
     """
     with pytest.raises(ValueError, match='Unknown storage class `unknown`'):
         get_storage('unknown', args=[])
+
+
+def test_get_storage_pipeline():
+    config = {
+        'cls': 'pipeline',
+        'steps': [
+            {
+                'cls': 'filter',
+            },
+            {
+                'cls': 'buffer',
+                'min_batch_size': {
+                    'content': 10,
+                },
+            },
+            {
+                'cls': 'memory',
+            }
+        ]
+    }
+
+    storage = get_storage(**config)
+
+    assert isinstance(storage, FilteringProxyStorage)
+    assert isinstance(storage.storage, BufferingProxyStorage)
+    assert isinstance(storage.storage.storage, MemoryStorage)
+
+
+def test_get_storage_pipeline_legacy_args():
+    config = {
+        'cls': 'pipeline',
+        'steps': [
+            {
+                'cls': 'filter',
+            },
+            {
+                'cls': 'buffer',
+                'args': {
+                    'min_batch_size': {
+                        'content': 10,
+                    },
+                }
+            },
+            {
+                'cls': 'memory',
+            }
+        ]
+    }
+
+    with pytest.warns(DeprecationWarning):
+        storage = get_storage(**config)
+
+    assert isinstance(storage, FilteringProxyStorage)
+    assert isinstance(storage.storage, BufferingProxyStorage)
+    assert isinstance(storage.storage.storage, MemoryStorage)
+
+
+def test_get_storage_pipeline_keeps_steps_untouched():
+    steps = [{'cls': 'filter'}, {'cls': 'memory'}]
+    get_storage('pipeline', steps=steps)
+    assert steps == [{'cls': 'filter'}, {'cls': 'memory'}]
+
+
+def test_get_storage_pipeline_without_steps():
+    with pytest.raises(ValueError, match='at least one step'):
+        get_storage('pipeline', steps=[])