diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -24,6 +24,9 @@ [mypy-pytest.*] ignore_missing_imports = True +[mypy-retrying.*] +ignore_missing_imports = True + # temporary work-around for landing typing support in spite of the current # journal<->storage dependency loop [mypy-swh.journal.*] diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,5 +1,6 @@ hypothesis >= 3.11.0 pytest +pytest-mock pytest-postgresql >= 2.1.0 sqlalchemy-stubs # pytz is in fact a dep of swh.model[testing] and should not be necessary, but diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ python-dateutil vcversioner aiohttp +retrying diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py --- a/swh/storage/__init__.py +++ b/swh/storage/__init__.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2016 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -15,7 +15,8 @@ STORAGE_IMPLEMENTATION = { - 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer'} + 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer', 'retry', +} def get_storage(cls, **kwargs): @@ -58,6 +59,8 @@ from .filter import FilteringProxyStorage as Storage elif cls == 'buffer': from .buffer import BufferingProxyStorage as Storage + elif cls == 'retry': + from .retry import RetryingProxyStorage as Storage return Storage(**kwargs) diff --git a/swh/storage/retry.py b/swh/storage/retry.py new file mode 100644 --- /dev/null +++ b/swh/storage/retry.py @@ -0,0 +1,60 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import psycopg2 +import traceback + +from typing import Dict, List + +from retrying import retry + +from swh.storage import get_storage, HashCollision + + +logger = logging.getLogger(__name__) + + +RETRY_EXCEPTIONS = [ + # raised when two parallel insertions insert the same data + psycopg2.IntegrityError, + HashCollision, +] + + +def should_retry_adding(error: Exception) -> bool: + """Retry policy when some kind of failures occur (database integrity error, + hash collision, etc...) + + """ + retry = any(isinstance(error, exc) for exc in RETRY_EXCEPTIONS) + if retry: + error_name = error.__module__ + '.' + error.__class__.__name__ + logger.warning('Retry adding a batch', exc_info=False, extra={ + 'swh_type': 'storage_retry', + 'swh_exception_type': error_name, + 'swh_exception': traceback.format_exception( + error.__class__, + error, + error.__traceback__, + ), + }) + return retry + + +class RetryingProxyStorage: + """Storage implementation which retries adding objects when it specifically + fails (hash collision, integrity error). + + """ + def __init__(self, storage): + self.storage = get_storage(**storage) + + def __getattr__(self, key): + return getattr(self.storage, key) + + @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + def content_add(self, content: List[Dict]) -> Dict: + return self.storage.content_add(content) diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py --- a/swh/storage/tests/test_init.py +++ b/swh/storage/tests/test_init.py @@ -14,6 +14,7 @@ from swh.storage.in_memory import Storage as MemoryStorage from swh.storage.buffer import BufferingProxyStorage from swh.storage.filter import FilteringProxyStorage +from swh.storage.retry import RetryingProxyStorage @patch('swh.storage.storage.psycopg2.pool') @@ -36,6 +37,9 @@ ('buffer', BufferingProxyStorage, {'storage': { 'cls': 'memory'} }), + ('retry', RetryingProxyStorage, {'storage': { + 'cls': 'memory'} + }), ]: actual_storage = get_storage(cls, **dummy_args) assert actual_storage is not None diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py new file mode 100644 --- /dev/null +++ b/swh/storage/tests/test_retry.py @@ -0,0 +1,89 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import psycopg2 +import pytest + +from swh.storage import HashCollision +from swh.storage.retry import ( + RetryingProxyStorage, should_retry_adding, RETRY_EXCEPTIONS +) + + +def test_should_retry_adding(): + """Specific exceptions should be elected for retrial + + """ + for exc in RETRY_EXCEPTIONS: + assert should_retry_adding(exc('error')) is True + + +def test_should_retry_adding_no_retry(): + """Unspecific exceptions should raise as usual + + """ + for exc in [ValueError, Exception]: + assert should_retry_adding(exc('fail!')) is False + + +def test_retrying_proxy_storage_content(sample_data): + """Standard content_add works as before + + """ + sample_content = sample_data['content'][0] + storage = RetryingProxyStorage(storage={'cls': 'memory'}) + + content = next(storage.content_get([sample_content['sha1']])) + assert not content + + s = storage.content_add([sample_content]) + assert s == { + 'content:add': 1, + 'content:add:bytes': sample_content['length'], + 'skipped_content:add': 0 + } + + +def test_retrying_proxy_storage_with_retry(sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('content hash collision'), + # second try goes ko + psycopg2.IntegrityError('content already inserted'), + # ok then! + {'content:add': 1} + ] + + sample_content = sample_data['content'][0] + storage = RetryingProxyStorage(storage={'cls': 'memory'}) + + content = next(storage.content_get([sample_content['sha1']])) + assert not content + + s = storage.content_add([sample_content]) + assert s == { + 'content:add': 1, + } + + +def test_retrying_proxy_storage_failure_to_add(sample_data, mocker): + """Other errors are raising as usual + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add') + mock_memory.side_effect = ValueError('Refuse to add content always!') + + sample_content = sample_data['content'][0] + storage = RetryingProxyStorage(storage={'cls': 'memory'}) + + content = next(storage.content_get([sample_content['sha1']])) + assert not content + + with pytest.raises(ValueError, match='Refuse to add'): + storage.content_add([sample_content])