diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -24,6 +24,9 @@
 [mypy-pytest.*]
 ignore_missing_imports = True
 
+[mypy-retrying.*]
+ignore_missing_imports = True
+
 # temporary work-around for landing typing support in spite of the current
 # journal<->storage dependency loop
 [mypy-swh.journal.*]
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,6 @@
 hypothesis >= 3.11.0
 pytest
+pytest-mock
 pytest-postgresql >= 2.1.0
 sqlalchemy-stubs
 # pytz is in fact a dep of swh.model[testing] and should not be necessary, but
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,3 +4,4 @@
 python-dateutil
 vcversioner
 aiohttp
+retrying
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -15,7 +15,8 @@
 
 
 STORAGE_IMPLEMENTATION = {
-    'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer'}
+    'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer', 'retry',
+}
 
 
 def get_storage(cls, **kwargs):
@@ -58,6 +59,8 @@
         from .filter import FilteringProxyStorage as Storage
     elif cls == 'buffer':
         from .buffer import BufferingProxyStorage as Storage
+    elif cls == 'retry':
+        from .retrying import RetryingProxyStorage as Storage
 
     return Storage(**kwargs)
 
diff --git a/swh/storage/retrying.py b/swh/storage/retrying.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/retrying.py
@@ -0,0 +1,78 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import psycopg2
+import traceback
+
+from typing import Dict, List
+
+from retrying import retry
+
+from swh.storage import get_storage, HashCollision
+
+
+logger = logging.getLogger(__name__)
+
+
+def retry_loading(error):
+    """Retry policy when some kind of failures occur (database integrity error,
+       hash collision, etc...)
+
+    Args:
+        error: exception raised by the retried call
+
+    Returns:
+        True (after logging a warning) when the error is an instance of one
+        of the retryable exception classes, False otherwise.
+
+    """
+    exception_classes = (
+        # raised when two parallel insertions insert the same data.
+        psycopg2.IntegrityError,
+        HashCollision,
+    )
+
+    if not isinstance(error, exception_classes):
+        return False
+
+    error_name = error.__module__ + '.' + error.__class__.__name__
+    logger.warning('Retry loading a batch', exc_info=False, extra={
+        'swh_type': 'storage_retry',
+        'swh_exception_type': error_name,
+        'swh_exception': traceback.format_exception(
+            error.__class__,
+            error,
+            error.__traceback__,
+        ),
+    })
+
+    return True
+
+
+class RetryingProxyStorage:
+    """Retrying proxy storage. For the storage's `_add` methods, retry to add
+       data when failing.
+
+    Attributes not defined on this class are looked up on the wrapped
+    storage.
+
+    """
+    def __init__(self, storage):
+        self.storage = get_storage(**storage)
+
+    def __getattr__(self, key):
+        # Only called when regular lookup fails. If 'storage' itself is not
+        # set yet (e.g. instance rebuilt by copy/pickle without __init__),
+        # delegating to self.storage would recurse until RecursionError.
+        if key == 'storage':
+            raise AttributeError(key)
+        return getattr(self.storage, key)
+
+    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
+    def content_add(self, content: List[Dict]) -> Dict:
+        # At most 3 attempts; only errors accepted by retry_loading are
+        # retried, any other exception propagates immediately.
+        return self.storage.content_add(content)
diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py
--- a/swh/storage/tests/test_init.py
+++ b/swh/storage/tests/test_init.py
@@ -14,6 +14,7 @@
 from swh.storage.in_memory import Storage as MemoryStorage
 from swh.storage.buffer import BufferingProxyStorage
 from swh.storage.filter import FilteringProxyStorage
+from swh.storage.retrying import RetryingProxyStorage
 
 
 @patch('swh.storage.storage.psycopg2.pool')
@@ -36,6 +37,9 @@
         ('buffer', BufferingProxyStorage, {'storage': {
             'cls': 'memory'}
         }),
+        ('retry', RetryingProxyStorage, {'storage': {
+            'cls': 'memory'}
+        }),
     ]:
         actual_storage = get_storage(cls, **dummy_args)
         assert actual_storage is not None
diff --git a/swh/storage/tests/test_retrying.py b/swh/storage/tests/test_retrying.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_retrying.py
@@ -0,0 +1,95 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import psycopg2
+import pytest
+
+from swh.storage import HashCollision
+from swh.storage.retrying import RetryingProxyStorage
+
+
+def test_retrying_proxy_storage_content(sample_data):
+    """Standard content_add works as before
+
+    """
+    sample_content = sample_data['content'][0]
+    storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+    content = next(storage.content_get([sample_content['sha1']]))
+    assert not content
+
+    s = storage.content_add([sample_content])
+    assert s == {
+        'content:add': 1,
+        'content:add:bytes': sample_content['length'],
+        'skipped_content:add': 0
+    }
+
+
+count_local_content_add_call = 0
+
+
+def _count_content_add(content):
+    """Mimic content_add with failures
+
+    """
+    global count_local_content_add_call
+    count_local_content_add_call += 1
+    error = {
+        # first try
+        1: HashCollision('Refuse to add content'),
+        # second try
+        2: psycopg2.IntegrityError('Refuse differently to add content'),
+    }
+    exception = error.get(count_local_content_add_call)
+    if exception:  # we raise exceptions
+        raise exception
+    # at last, we accept to add content
+    return {'content:add': 1}
+
+
+def test_retrying_proxy_storage_with_retry(sample_data, mocker):
+    """Multiple retries for hash collision and psycopg2 error
+
+    """
+    mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+    mock_memory.side_effect = _count_content_add
+
+    sample_content = sample_data['content'][0]
+    storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+    content = next(storage.content_get([sample_content['sha1']]))
+    assert not content
+
+    s = storage.content_add([sample_content])
+    assert s == {
+        'content:add': 1,
+    }
+
+
+def test_retrying_proxy_storage_failure_to_add(sample_data, mocker):
+    """Other errors are raising as usual
+
+    """
+    mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+    mock_memory.side_effect = ValueError('Refuse to add')
+
+    sample_content = sample_data['content'][0]
+    storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+    content = next(storage.content_get([sample_content['sha1']]))
+    assert not content
+
+    with pytest.raises(ValueError, match='Refuse to add'):
+        storage.content_add([sample_content])
+
+
+def test_retrying_proxy_storage_uninitialized():
+    """Attribute lookup on a proxy without wrapped storage fails cleanly
+
+    """
+    storage = RetryingProxyStorage.__new__(RetryingProxyStorage)
+
+    assert not hasattr(storage, 'content_get')