diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py index 50354eab..189e99cd 100644 --- a/swh/storage/__init__.py +++ b/swh/storage/__init__.py @@ -1,51 +1,94 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import warnings from . import storage Storage = storage.Storage class HashCollision(Exception): pass -STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter', 'buffer'} +STORAGE_IMPLEMENTATION = { + 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer'} -def get_storage(cls, args): +def get_storage(cls, **kwargs): """Get a storage object of class `storage_class` with arguments `storage_args`. Args: storage (dict): dictionary with keys: - cls (str): storage's class, either local, remote, memory, filter, buffer - args (dict): dictionary with keys Returns: - an instance of swh.storage.Storage (either local or remote) + an instance of swh.storage.Storage or compatible class Raises: ValueError if passed an unknown storage class. """ if cls not in STORAGE_IMPLEMENTATION: raise ValueError('Unknown storage class `%s`. Supported: %s' % ( cls, ', '.join(STORAGE_IMPLEMENTATION))) + if 'args' in kwargs: + warnings.warn( + 'Explicit "args" key is deprecated, use keys directly instead.', + DeprecationWarning) + kwargs = kwargs['args'] + + if cls == 'pipeline': + return get_storage_pipeline(**kwargs) + if cls == 'remote': from .api.client import RemoteStorage as Storage elif cls == 'local': from .storage import Storage elif cls == 'memory': from .in_memory import Storage elif cls == 'filter': from .filter import FilteringProxyStorage as Storage elif cls == 'buffer': from .buffer import BufferingProxyStorage as Storage - return Storage(**args) + return Storage(**kwargs) + + +def get_storage_pipeline(steps): + """Recursively get a storage object that may use other storage objects + as backends. + + Args: + steps (List[dict]): List of dicts that may be used as kwargs for + `get_storage`. + + Returns: + an instance of swh.storage.Storage or compatible class + + Raises: + ValueError if passed an unknown storage class. + """ + storage_config = None + for step in reversed(steps): + if 'args' in step: + warnings.warn( + 'Explicit "args" key is deprecated, use keys directly ' + 'instead.', + DeprecationWarning) + step = { + 'cls': step['cls'], + **step['args'], + } + if storage_config: + step['storage'] = storage_config + storage_config = step + + return get_storage(**storage_config) diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py index 73092802..2c1d493b 100644 --- a/swh/storage/tests/test_init.py +++ b/swh/storage/tests/test_init.py @@ -1,50 +1,133 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from unittest.mock import patch from swh.storage import get_storage from swh.storage.api.client import RemoteStorage from swh.storage.storage import Storage as DbStorage from swh.storage.in_memory import Storage as MemoryStorage from swh.storage.buffer import BufferingProxyStorage from swh.storage.filter import FilteringProxyStorage @patch('swh.storage.storage.psycopg2.pool') def test_get_storage(mock_pool): """Instantiating an existing storage should be ok + """ + mock_pool.ThreadedConnectionPool.return_value = None + for cls, real_class, dummy_args in [ + ('remote', RemoteStorage, {'url': 'url'}), + ('memory', MemoryStorage, {}), + ('local', DbStorage, { + 'db': 'postgresql://db', 'objstorage': { + 'cls': 'memory', 'args': {}, + }, + }), + ('filter', FilteringProxyStorage, {'storage': { + 'cls': 'memory'} + }), + ('buffer', BufferingProxyStorage, {'storage': { + 'cls': 'memory'} + }), + ]: + actual_storage = get_storage(cls, **dummy_args) + assert actual_storage is not None + assert isinstance(actual_storage, real_class) + + +@patch('swh.storage.storage.psycopg2.pool') +def test_get_storage_legacy_args(mock_pool): + """Instantiating an existing storage should be ok even with the legacy + explicit 'args' keys + """ mock_pool.ThreadedConnectionPool.return_value = None for cls, real_class, dummy_args in [ ('remote', RemoteStorage, {'url': 'url'}), ('memory', MemoryStorage, {}), ('local', DbStorage, { 'db': 'postgresql://db', 'objstorage': { 'cls': 'memory', 'args': {}, }, }), ('filter', FilteringProxyStorage, {'storage': { 'cls': 'memory', 'args': {}} }), ('buffer', BufferingProxyStorage, {'storage': { 'cls': 'memory', 'args': {}} }), ]: - actual_storage = get_storage(cls, args=dummy_args) + with pytest.warns(DeprecationWarning): + actual_storage = get_storage(cls, args=dummy_args) assert actual_storage is not None assert isinstance(actual_storage, real_class) def test_get_storage_failure(): """Instantiating an unknown storage should raise """ with pytest.raises(ValueError, match='Unknown storage class `unknown`'): get_storage('unknown', args=[]) + + +def test_get_storage_pipeline(): + config = { + 'cls': 'pipeline', + 'steps': [ + { + 'cls': 'filter', + }, + { + 'cls': 'buffer', + 'min_batch_size': { + 'content': 10, + }, + }, + { + 'cls': 'memory', + } + ] + } + + storage = get_storage(**config) + + assert isinstance(storage, FilteringProxyStorage) + assert isinstance(storage.storage, BufferingProxyStorage) + assert isinstance(storage.storage.storage, MemoryStorage) + + +def test_get_storage_pipeline_legacy_args(): + config = { + 'cls': 'pipeline', + 'steps': [ + { + 'cls': 'filter', + }, + { + 'cls': 'buffer', + 'args': { + 'min_batch_size': { + 'content': 10, + }, + } + }, + { + 'cls': 'memory', + } + ] + } + + with pytest.warns(DeprecationWarning): + storage = get_storage(**config) + + assert isinstance(storage, FilteringProxyStorage) + assert isinstance(storage.storage, BufferingProxyStorage) + assert isinstance(storage.storage.storage, MemoryStorage)