diff --git a/swh/storage/proxies/retry.py b/swh/storage/proxies/retry.py index 31dff2f2..99756ae2 100644 --- a/swh/storage/proxies/retry.py +++ b/swh/storage/proxies/retry.py @@ -1,82 +1,101 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import traceback -from tenacity import retry, stop_after_attempt, wait_random_exponential +from tenacity import RetryCallState, retry, stop_after_attempt, wait_random_exponential +from tenacity.wait import wait_base +from swh.core.api import TransientRemoteException from swh.storage import get_storage from swh.storage.exc import StorageArgumentException from swh.storage.interface import StorageInterface logger = logging.getLogger(__name__) -def should_retry_adding(retry_state) -> bool: +def should_retry_adding(retry_state: RetryCallState) -> bool: """Retry if the error/exception is (probably) not about a caller error""" attempt = retry_state.outcome + assert attempt if attempt.failed: error = attempt.exception() if isinstance(error, StorageArgumentException): # Exception is due to an invalid argument return False elif isinstance(error, KeyboardInterrupt): return False else: # Other exception module = getattr(error, "__module__", None) if module: error_name = error.__module__ + "." + error.__class__.__name__ else: error_name = error.__class__.__name__ logger.warning( "Retrying RPC call", exc_info=False, extra={ "swh_type": "storage_retry", "swh_exception_type": error_name, "swh_exception": traceback.format_exc(), }, ) return True else: # No exception return False +class wait_transient_exceptions(wait_base): + """Wait longer when servers return HTTP 503.""" + + def __init__(self, wait: float) -> None: + self.wait = wait + + def __call__(self, retry_state: RetryCallState) -> float: + attempt = retry_state.outcome + assert attempt + + if attempt.failed and isinstance(attempt.exception(), TransientRemoteException): + return self.wait + else: + return 0.0 + + swh_retry = retry( retry=should_retry_adding, - wait=wait_random_exponential(multiplier=1, max=10), + wait=wait_random_exponential(multiplier=1, max=10) + wait_transient_exceptions(10), stop=stop_after_attempt(3), reraise=True, ) def retry_function(storage, attribute_name): @swh_retry def newf(*args, **kwargs): return getattr(storage, attribute_name)(*args, **kwargs) return newf class RetryingProxyStorage: """Storage implementation which retries adding objects when it specifically fails (hash collision, integrity error). """ def __init__(self, storage): self.storage: StorageInterface = get_storage(**storage) for attribute_name in dir(StorageInterface): if attribute_name.startswith("_"): continue attribute = getattr(self.storage, attribute_name) if hasattr(attribute, "__call__"): setattr( self, attribute_name, retry_function(self.storage, attribute_name) ) diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py index 75f7586e..dde53861 100644 --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -1,219 +1,267 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from unittest.mock import call import attr import psycopg2 import pytest +from swh.core.api import TransientRemoteException from swh.storage.exc import HashCollision, StorageArgumentException from swh.storage.utils import now @pytest.fixture def monkeypatch_sleep(monkeypatch, swh_storage): """In test context, we don't want to wait, make test faster""" from swh.storage.proxies.retry import RetryingProxyStorage for method_name, method in RetryingProxyStorage.__dict__.items(): if "_add" in method_name or "_update" in method_name: monkeypatch.setattr(method.retry, "sleep", lambda x: None) return monkeypatch @pytest.fixture def fake_hash_collision(sample_data): return HashCollision("sha1", "38762cf7f55934b34d179ae6a4c80cadccbb7f0a", []) @pytest.fixture def swh_storage_backend_config(): yield { "cls": "pipeline", "steps": [ {"cls": "retry"}, {"cls": "memory"}, ], } def test_retrying_proxy_storage_content_add(swh_storage, sample_data): """Standard content_add works as before""" sample_content = sample_data.content content = swh_storage.content_get_data(sample_content.sha1) assert content is None s = swh_storage.content_add([sample_content]) assert s == { "content:add": 1, "content:add:bytes": sample_content.length, } content = swh_storage.content_get_data(sample_content.sha1) assert content == sample_content.data def test_retrying_proxy_storage_content_add_with_retry( monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision, ): """Multiple retries for hash collision and psycopg2 error but finally ok""" mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add") mock_memory.side_effect = [ # first try goes ko fake_hash_collision, # second try goes ko psycopg2.IntegrityError("content already inserted"), # ok then! {"content:add": 1}, ] sample_content = sample_data.content + sleep = mocker.patch("time.sleep") content = swh_storage.content_get_data(sample_content.sha1) assert content is None s = swh_storage.content_add([sample_content]) assert s == {"content:add": 1} mock_memory.assert_has_calls( [ call([sample_content]), call([sample_content]), call([sample_content]), ] ) + assert len(sleep.mock_calls) == 2 + (_name, args1, _kwargs) = sleep.mock_calls[0] + (_name, args2, _kwargs) = sleep.mock_calls[1] + assert 0 < args1[0] < 1 + assert 0 < args2[0] < 2 + + +def test_retrying_proxy_storage_content_add_with_retry_of_transient( + monkeypatch_sleep, + swh_storage, + sample_data, + mocker, +): + """Multiple retries for hash collision and psycopg2 error but finally ok + after many attempts""" + mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add") + mock_memory.side_effect = [ + TransientRemoteException("temporary failure"), + TransientRemoteException("temporary failure"), + # ok then! + {"content:add": 1}, + ] + + sample_content = sample_data.content + + content = swh_storage.content_get_data(sample_content.sha1) + assert content is None + + sleep = mocker.patch("time.sleep") + s = swh_storage.content_add([sample_content]) + assert s == {"content:add": 1} + + mock_memory.assert_has_calls( + [ + call([sample_content]), + call([sample_content]), + call([sample_content]), + ] + ) + + assert len(sleep.mock_calls) == 2 + (_name, args1, _kwargs) = sleep.mock_calls[0] + (_name, args2, _kwargs) = sleep.mock_calls[1] + assert 10 < args1[0] < 11 + assert 10 < args2[0] < 12 + def test_retrying_proxy_swh_storage_content_add_failure( swh_storage, sample_data, mocker ): """Unfiltered errors are raising without retry""" mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add") mock_memory.side_effect = StorageArgumentException("Refuse to add content always!") sample_content = sample_data.content content = swh_storage.content_get_data(sample_content.sha1) assert content is None with pytest.raises(StorageArgumentException, match="Refuse to add"): swh_storage.content_add([sample_content]) assert mock_memory.call_count == 1 def test_retrying_proxy_storage_content_add_metadata(swh_storage, sample_data): """Standard content_add_metadata works as before""" sample_content = sample_data.content content = attr.evolve(sample_content, data=None) pk = content.sha1 content_metadata = swh_storage.content_get([pk]) assert content_metadata == [None] s = swh_storage.content_add_metadata([attr.evolve(content, ctime=now())]) assert s == { "content:add": 1, } content_metadata = swh_storage.content_get([pk]) assert len(content_metadata) == 1 assert content_metadata[0].sha1 == pk def test_retrying_proxy_storage_content_add_metadata_with_retry( monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision ): """Multiple retries for hash collision and psycopg2 error but finally ok""" mock_memory = mocker.patch( "swh.storage.in_memory.InMemoryStorage.content_add_metadata" ) mock_memory.side_effect = [ # first try goes ko fake_hash_collision, # second try goes ko psycopg2.IntegrityError("content_metadata already inserted"), # ok then! {"content:add": 1}, ] sample_content = sample_data.content content = attr.evolve(sample_content, data=None) s = swh_storage.content_add_metadata([content]) assert s == {"content:add": 1} mock_memory.assert_has_calls( [ call([content]), call([content]), call([content]), ] ) def test_retrying_proxy_swh_storage_content_add_metadata_failure( swh_storage, sample_data, mocker ): """Unfiltered errors are raising without retry""" mock_memory = mocker.patch( "swh.storage.in_memory.InMemoryStorage.content_add_metadata" ) mock_memory.side_effect = StorageArgumentException( "Refuse to add content_metadata!" ) sample_content = sample_data.content content = attr.evolve(sample_content, data=None) with pytest.raises(StorageArgumentException, match="Refuse to add"): swh_storage.content_add_metadata([content]) assert mock_memory.call_count == 1 def test_retrying_proxy_swh_storage_keyboardinterrupt(swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry""" mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add") mock_memory.side_effect = KeyboardInterrupt() sample_content = sample_data.content content = swh_storage.content_get_data(sample_content.sha1) assert content is None with pytest.raises(KeyboardInterrupt): swh_storage.content_add([sample_content]) assert mock_memory.call_count == 1 def test_retrying_proxy_swh_storage_content_add_metadata_retry_failed( swh_storage, sample_data, mocker ): """When retrying fails every time, the last exception gets re-raised instead of a RetryError""" mock_memory = mocker.patch( "swh.storage.in_memory.InMemoryStorage.content_add_metadata" ) mock_memory.side_effect = [ ValueError(f"This will get raised eventually (attempt {i})") for i in range(1, 10) ] sample_content = sample_data.content content = attr.evolve(sample_content, data=None) with pytest.raises(ValueError, match="(attempt 3)"): swh_storage.content_add_metadata([content]) assert mock_memory.call_count == 3