diff --git a/mypy.ini b/mypy.ini index 9da07308..43606ce6 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,33 +1,36 @@ [mypy] namespace_packages = True # due to the conditional import logic on swh.journal, in some cases a specific # type: ignore is needed, in other it isn't... warn_unused_ignores = False # support for sqlalchemy magic: see https://github.com/dropbox/sqlalchemy-stubs plugins = sqlmypy # 3rd party libraries without stubs (yet) # only shipped indirectly via hypothesis [mypy-django.*] ignore_missing_imports = True [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-psycopg2.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True +[mypy-retrying.*] +ignore_missing_imports = True + # temporary work-around for landing typing support in spite of the current # journal<->storage dependency loop [mypy-swh.journal.*] ignore_missing_imports = True [mypy-pytest_postgresql.*] ignore_missing_imports = True diff --git a/requirements-test.txt b/requirements-test.txt index 6c90bf5f..a61e13d0 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,9 +1,10 @@ hypothesis >= 3.11.0 pytest +pytest-mock pytest-postgresql >= 2.1.0 sqlalchemy-stubs # pytz is in fact a dep of swh.model[testing] and should not be necessary, but # the dep on swh.model in the main requirements-swh.txt file shadows this one # adding the [testing] extra. swh.model[testing] >= 0.0.50 pytz diff --git a/requirements.txt b/requirements.txt index 2cbc284d..96791c82 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ click flask psycopg2 python-dateutil vcversioner aiohttp +retrying diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py index 189e99cd..9741394a 100644 --- a/swh/storage/__init__.py +++ b/swh/storage/__init__.py @@ -1,94 +1,97 @@ -# Copyright (C) 2015-2016 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import warnings from . import storage Storage = storage.Storage class HashCollision(Exception): pass STORAGE_IMPLEMENTATION = { - 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer'} + 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer', 'retry', +} def get_storage(cls, **kwargs): """Get a storage object of class `storage_class` with arguments `storage_args`. Args: storage (dict): dictionary with keys: - cls (str): storage's class, either local, remote, memory, filter, buffer - args (dict): dictionary with keys Returns: an instance of swh.storage.Storage or compatible class Raises: ValueError if passed an unknown storage class. """ if cls not in STORAGE_IMPLEMENTATION: raise ValueError('Unknown storage class `%s`. Supported: %s' % ( cls, ', '.join(STORAGE_IMPLEMENTATION))) if 'args' in kwargs: warnings.warn( 'Explicit "args" key is deprecated, use keys directly instead.', DeprecationWarning) kwargs = kwargs['args'] if cls == 'pipeline': return get_storage_pipeline(**kwargs) if cls == 'remote': from .api.client import RemoteStorage as Storage elif cls == 'local': from .storage import Storage elif cls == 'memory': from .in_memory import Storage elif cls == 'filter': from .filter import FilteringProxyStorage as Storage elif cls == 'buffer': from .buffer import BufferingProxyStorage as Storage + elif cls == 'retry': + from .retry import RetryingProxyStorage as Storage return Storage(**kwargs) def get_storage_pipeline(steps): """Recursively get a storage object that may use other storage objects as backends. Args: steps (List[dict]): List of dicts that may be used as kwargs for `get_storage`. Returns: an instance of swh.storage.Storage or compatible class Raises: ValueError if passed an unknown storage class. """ storage_config = None for step in reversed(steps): if 'args' in step: warnings.warn( 'Explicit "args" key is deprecated, use keys directly ' 'instead.', DeprecationWarning) step = { 'cls': step['cls'], **step['args'], } if storage_config: step['storage'] = storage_config storage_config = step return get_storage(**storage_config) diff --git a/swh/storage/retry.py b/swh/storage/retry.py new file mode 100644 index 00000000..756ec927 --- /dev/null +++ b/swh/storage/retry.py @@ -0,0 +1,60 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import psycopg2 +import traceback + +from typing import Dict, List + +from retrying import retry + +from swh.storage import get_storage, HashCollision + + +logger = logging.getLogger(__name__) + + +RETRY_EXCEPTIONS = [ + # raised when two parallel insertions insert the same data + psycopg2.IntegrityError, + HashCollision, +] + + +def should_retry_adding(error: Exception) -> bool: + """Retry policy when some kind of failures occur (database integrity error, + hash collision, etc...) + + """ + retry = any(isinstance(error, exc) for exc in RETRY_EXCEPTIONS) + if retry: + error_name = error.__module__ + '.' + error.__class__.__name__ + logger.warning('Retry adding a batch', exc_info=False, extra={ + 'swh_type': 'storage_retry', + 'swh_exception_type': error_name, + 'swh_exception': traceback.format_exception( + error.__class__, + error, + error.__traceback__, + ), + }) + return retry + + +class RetryingProxyStorage: + """Storage implementation which retries adding objects when it specifically + fails (hash collision, integrity error). + + """ + def __init__(self, storage): + self.storage = get_storage(**storage) + + def __getattr__(self, key): + return getattr(self.storage, key) + + @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + def content_add(self, content: List[Dict]) -> Dict: + return self.storage.content_add(content) diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py index 2c1d493b..e95d4ea9 100644 --- a/swh/storage/tests/test_init.py +++ b/swh/storage/tests/test_init.py @@ -1,133 +1,137 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from unittest.mock import patch from swh.storage import get_storage from swh.storage.api.client import RemoteStorage from swh.storage.storage import Storage as DbStorage from swh.storage.in_memory import Storage as MemoryStorage from swh.storage.buffer import BufferingProxyStorage from swh.storage.filter import FilteringProxyStorage +from swh.storage.retry import RetryingProxyStorage @patch('swh.storage.storage.psycopg2.pool') def test_get_storage(mock_pool): """Instantiating an existing storage should be ok """ mock_pool.ThreadedConnectionPool.return_value = None for cls, real_class, dummy_args in [ ('remote', RemoteStorage, {'url': 'url'}), ('memory', MemoryStorage, {}), ('local', DbStorage, { 'db': 'postgresql://db', 'objstorage': { 'cls': 'memory', 'args': {}, }, }), ('filter', FilteringProxyStorage, {'storage': { 'cls': 'memory'} }), ('buffer', BufferingProxyStorage, {'storage': { 'cls': 'memory'} }), + ('retry', RetryingProxyStorage, {'storage': { + 'cls': 'memory'} + }), ]: actual_storage = get_storage(cls, **dummy_args) assert actual_storage is not None assert isinstance(actual_storage, real_class) @patch('swh.storage.storage.psycopg2.pool') def test_get_storage_legacy_args(mock_pool): """Instantiating an existing storage should be ok even with the legacy explicit 'args' keys """ mock_pool.ThreadedConnectionPool.return_value = None for cls, real_class, dummy_args in [ ('remote', RemoteStorage, {'url': 'url'}), ('memory', MemoryStorage, {}), ('local', DbStorage, { 'db': 'postgresql://db', 'objstorage': { 'cls': 'memory', 'args': {}, }, }), ('filter', FilteringProxyStorage, {'storage': { 'cls': 'memory', 'args': {}} }), ('buffer', BufferingProxyStorage, {'storage': { 'cls': 'memory', 'args': {}} }), ]: with pytest.warns(DeprecationWarning): actual_storage = get_storage(cls, args=dummy_args) assert actual_storage is not None assert isinstance(actual_storage, real_class) def test_get_storage_failure(): """Instantiating an unknown storage should raise """ with pytest.raises(ValueError, match='Unknown storage class `unknown`'): get_storage('unknown', args=[]) def test_get_storage_pipeline(): config = { 'cls': 'pipeline', 'steps': [ { 'cls': 'filter', }, { 'cls': 'buffer', 'min_batch_size': { 'content': 10, }, }, { 'cls': 'memory', } ] } storage = get_storage(**config) assert isinstance(storage, FilteringProxyStorage) assert isinstance(storage.storage, BufferingProxyStorage) assert isinstance(storage.storage.storage, MemoryStorage) def test_get_storage_pipeline_legacy_args(): config = { 'cls': 'pipeline', 'steps': [ { 'cls': 'filter', }, { 'cls': 'buffer', 'args': { 'min_batch_size': { 'content': 10, }, } }, { 'cls': 'memory', } ] } with pytest.warns(DeprecationWarning): storage = get_storage(**config) assert isinstance(storage, FilteringProxyStorage) assert isinstance(storage.storage, BufferingProxyStorage) assert isinstance(storage.storage.storage, MemoryStorage) diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py new file mode 100644 index 00000000..70af6356 --- /dev/null +++ b/swh/storage/tests/test_retry.py @@ -0,0 +1,89 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import psycopg2 +import pytest + +from swh.storage import HashCollision +from swh.storage.retry import ( + RetryingProxyStorage, should_retry_adding, RETRY_EXCEPTIONS +) + + +def test_should_retry_adding(): + """Specific exceptions should be elected for retrial + + """ + for exc in RETRY_EXCEPTIONS: + assert should_retry_adding(exc('error')) is True + + +def test_should_retry_adding_no_retry(): + """Unspecific exceptions should raise as usual + + """ + for exc in [ValueError, Exception]: + assert should_retry_adding(exc('fail!')) is False + + +def test_retrying_proxy_storage_content(sample_data): + """Standard content_add works as before + + """ + sample_content = sample_data['content'][0] + storage = RetryingProxyStorage(storage={'cls': 'memory'}) + + content = next(storage.content_get([sample_content['sha1']])) + assert not content + + s = storage.content_add([sample_content]) + assert s == { + 'content:add': 1, + 'content:add:bytes': sample_content['length'], + 'skipped_content:add': 0 + } + + +def test_retrying_proxy_storage_with_retry(sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('content hash collision'), + # second try goes ko + psycopg2.IntegrityError('content already inserted'), + # ok then! + {'content:add': 1} + ] + + sample_content = sample_data['content'][0] + storage = RetryingProxyStorage(storage={'cls': 'memory'}) + + content = next(storage.content_get([sample_content['sha1']])) + assert not content + + s = storage.content_add([sample_content]) + assert s == { + 'content:add': 1, + } + + +def test_retrying_proxy_storage_failure_to_add(sample_data, mocker): + """Other errors are raising as usual + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add') + mock_memory.side_effect = ValueError('Refuse to add content always!') + + sample_content = sample_data['content'][0] + storage = RetryingProxyStorage(storage={'cls': 'memory'}) + + content = next(storage.content_get([sample_content['sha1']])) + assert not content + + with pytest.raises(ValueError, match='Refuse to add'): + storage.content_add([sample_content])