diff --git a/mypy.ini b/mypy.ini index 43606ce6..4a09c588 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,36 +1,36 @@ [mypy] namespace_packages = True # due to the conditional import logic on swh.journal, in some cases a specific # type: ignore is needed, in other it isn't... warn_unused_ignores = False # support for sqlalchemy magic: see https://github.com/dropbox/sqlalchemy-stubs plugins = sqlmypy # 3rd party libraries without stubs (yet) # only shipped indirectly via hypothesis [mypy-django.*] ignore_missing_imports = True [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-psycopg2.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True -[mypy-retrying.*] +[mypy-tenacity.*] ignore_missing_imports = True # temporary work-around for landing typing support in spite of the current # journal<->storage dependency loop [mypy-swh.journal.*] ignore_missing_imports = True [mypy-pytest_postgresql.*] ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt index 96791c82..daf673a8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ click flask psycopg2 python-dateutil vcversioner aiohttp -retrying +tenacity diff --git a/swh/storage/retry.py b/swh/storage/retry.py index 01ccc51b..67158cd3 100644 --- a/swh/storage/retry.py +++ b/swh/storage/retry.py @@ -1,113 +1,119 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import psycopg2 import traceback from datetime import datetime from typing import Dict, List, Optional, Union -from retrying import retry +from requests.exceptions import ConnectionError +from tenacity import ( + retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type +) from swh.storage import get_storage, HashCollision logger = logging.getLogger(__name__) RETRY_EXCEPTIONS = [ # raised when two parallel insertions insert the same data psycopg2.IntegrityError, HashCollision, + # when the server is restarting + ConnectionError, ] def should_retry_adding(error: Exception) -> bool: - """Retry policy when some kind of failures occur (database integrity error, - hash collision, etc...) + """Retry if the error/exception if one of the RETRY_EXCEPTIONS type. """ - retry = any(isinstance(error, exc) for exc in RETRY_EXCEPTIONS) - if retry: - error_name = error.__module__ + '.' + error.__class__.__name__ - logger.warning('Retry adding a batch', exc_info=False, extra={ - 'swh_type': 'storage_retry', - 'swh_exception_type': error_name, - 'swh_exception': traceback.format_exception( - error.__class__, - error, - error.__traceback__, - ), - }) - return retry + for exc in RETRY_EXCEPTIONS: + if retry_if_exception_type(exc)(error): + error_name = error.__module__ + '.' + error.__class__.__name__ + logger.warning('Retry adding a batch', exc_info=False, extra={ + 'swh_type': 'storage_retry', + 'swh_exception_type': error_name, + 'swh_exception': traceback.format_exc(), + }) + return True + return False + + +swh_retry = retry(retry=should_retry_adding, + wait=wait_random_exponential(multiplier=1, max=10), + stop=stop_after_attempt(3)) class RetryingProxyStorage: """Storage implementation which retries adding objects when it specifically fails (hash collision, integrity error). """ def __init__(self, storage): self.storage = get_storage(**storage) def __getattr__(self, key): return getattr(self.storage, key) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def content_add(self, content: List[Dict]) -> Dict: return self.storage.content_add(content) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def origin_add_one(self, origin: Dict) -> str: return self.storage.origin_add_one(origin) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def origin_visit_add(self, origin: Dict, date: Union[datetime, str], type: str) -> Dict: return self.storage.origin_visit_add(origin, date, type) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def origin_visit_update( self, origin: str, visit_id: int, status: Optional[str] = None, metadata: Optional[Dict] = None, snapshot: Optional[Dict] = None) -> Dict: return self.storage.origin_visit_update( origin, visit_id, status=status, metadata=metadata, snapshot=snapshot) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def tool_add(self, tools: List[Dict]) -> List[Dict]: return self.storage.tool_add(tools) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def metadata_provider_add( self, provider_name: str, provider_type: str, provider_url: str, metadata: Dict) -> Union[str, int]: return self.storage.metadata_provider_add( provider_name, provider_type, provider_url, metadata) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def origin_metadata_add( self, origin_url: str, ts: Union[str, datetime], provider_id: int, tool_id: int, metadata: Dict) -> None: return self.storage.origin_metadata_add( origin_url, ts, provider_id, tool_id, metadata) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def directory_add(self, directories: List[Dict]) -> Dict: return self.storage.directory_add(directories) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def revision_add(self, revisions: List[Dict]) -> Dict: return self.storage.revision_add(revisions) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def release_add(self, releases: List[Dict]) -> Dict: return self.storage.release_add(releases) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def snapshot_add(self, snapshot: List[Dict]) -> Dict: return self.storage.snapshot_add(snapshot) diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py index 0b28d18a..343d54f1 100644 --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -1,833 +1,815 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import psycopg2 import pytest from typing import Dict from unittest.mock import call from swh.storage import HashCollision -from swh.storage.retry import ( - RetryingProxyStorage, should_retry_adding, RETRY_EXCEPTIONS -) - - -def test_should_retry_adding(): - """Specific exceptions should be elected for retrial - - """ - for exc in RETRY_EXCEPTIONS: - assert should_retry_adding(exc('error')) is True - - -def test_should_retry_adding_no_retry(): - """Unspecific exceptions should raise as usual - - """ - for exc in [ValueError, Exception]: - assert should_retry_adding(exc('fail!')) is False +from swh.storage.retry import RetryingProxyStorage @pytest.fixture def swh_storage(): return RetryingProxyStorage(storage={'cls': 'memory'}) def test_retrying_proxy_storage_content_add(swh_storage, sample_data): """Standard content_add works as before """ sample_content = sample_data['content'][0] content = next(swh_storage.content_get([sample_content['sha1']])) assert not content s = swh_storage.content_add([sample_content]) assert s == { 'content:add': 1, 'content:add:bytes': sample_content['length'], 'skipped_content:add': 0 } content = next(swh_storage.content_get([sample_content['sha1']])) assert content['sha1'] == sample_content['sha1'] def test_retrying_proxy_storage_content_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add') mock_memory.side_effect = [ # first try goes ko HashCollision('content hash collision'), # second try goes ko psycopg2.IntegrityError('content already inserted'), # ok then! {'content:add': 1} ] sample_content = sample_data['content'][0] content = next(swh_storage.content_get([sample_content['sha1']])) assert not content s = swh_storage.content_add([sample_content]) assert s == {'content:add': 1} assert mock_memory.has_calls([ call([sample_content]), call([sample_content]), call([sample_content]), ]) def test_retrying_proxy_swh_storage_content_add_failure( swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add') mock_memory.side_effect = ValueError('Refuse to add content always!') sample_content = sample_data['content'][0] content = next(swh_storage.content_get([sample_content['sha1']])) assert not content with pytest.raises(ValueError, match='Refuse to add'): swh_storage.content_add([sample_content]) assert mock_memory.call_count == 1 def test_retrying_proxy_swh_storage_origin_add_one(swh_storage, sample_data): """Standard origin_add_one works as before """ sample_origin = sample_data['origin'][0] origin = swh_storage.origin_get(sample_origin) assert not origin r = swh_storage.origin_add_one(sample_origin) assert r == sample_origin['url'] origin = swh_storage.origin_get(sample_origin) assert origin['url'] == sample_origin['url'] def test_retrying_proxy_swh_storage_origin_add_one_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ sample_origin = sample_data['origin'][1] mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one') mock_memory.side_effect = [ # first try goes ko HashCollision('origin hash collision'), # second try goes ko psycopg2.IntegrityError('origin already inserted'), # ok then! sample_origin['url'] ] origin = swh_storage.origin_get(sample_origin) assert not origin r = swh_storage.origin_add_one(sample_origin) assert r == sample_origin['url'] assert mock_memory.has_calls([ call([sample_origin]), call([sample_origin]), call([sample_origin]), ]) def test_retrying_proxy_swh_storage_origin_add_one_failure( swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one') mock_memory.side_effect = ValueError('Refuse to add origin always!') sample_origin = sample_data['origin'][0] origin = swh_storage.origin_get(sample_origin) assert not origin with pytest.raises(ValueError, match='Refuse to add'): swh_storage.origin_add_one([sample_origin]) assert mock_memory.call_count == 1 def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data): """Standard origin_visit_add works as before """ sample_origin = sample_data['origin'][0] swh_storage.origin_add_one(sample_origin) origin_url = sample_origin['url'] origin = list(swh_storage.origin_visit_get(origin_url)) assert not origin origin_visit = swh_storage.origin_visit_add( origin_url, '2020-01-01', 'hg') assert origin_visit['origin'] == origin_url assert isinstance(origin_visit['visit'], int) origin_visit = next(swh_storage.origin_visit_get(origin_url)) assert origin_visit['origin'] == origin_url assert isinstance(origin_visit['visit'], int) def test_retrying_proxy_swh_storage_origin_visit_add_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ sample_origin = sample_data['origin'][1] swh_storage.origin_add_one(sample_origin) origin_url = sample_origin['url'] mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_visit_add') mock_memory.side_effect = [ # first try goes ko HashCollision('origin hash collision'), # second try goes ko psycopg2.IntegrityError('origin already inserted'), # ok then! {'origin': origin_url, 'visit': 1} ] origin = list(swh_storage.origin_visit_get(origin_url)) assert not origin r = swh_storage.origin_visit_add(sample_origin, '2020-01-01', 'git') assert r == {'origin': origin_url, 'visit': 1} assert mock_memory.has_calls([ call(sample_origin, '2020-01-01', 'git'), call(sample_origin, '2020-01-01', 'git'), call(sample_origin, '2020-01-01', 'git') ]) def test_retrying_proxy_swh_storage_origin_visit_add_failure( swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_visit_add') mock_memory.side_effect = ValueError('Refuse to add origin always!') origin_url = sample_data['origin'][0]['url'] origin = list(swh_storage.origin_visit_get(origin_url)) assert not origin with pytest.raises(ValueError, match='Refuse to add'): swh_storage.origin_visit_add(origin_url, '2020-01-31', 'svn') assert mock_memory.has_calls([ call(origin_url, '2020-01-31', 'svn'), ]) def test_retrying_proxy_storage_tool_add(swh_storage, sample_data): """Standard tool_add works as before """ sample_tool = sample_data['tool'][0] tool = swh_storage.tool_get(sample_tool) assert not tool tools = swh_storage.tool_add([sample_tool]) assert tools tool = tools[0] tool.pop('id') assert tool == sample_tool tool = swh_storage.tool_get(sample_tool) tool.pop('id') assert tool == sample_tool def test_retrying_proxy_storage_tool_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ sample_tool = sample_data['tool'][0] mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add') mock_memory.side_effect = [ # first try goes ko HashCollision('tool hash collision'), # second try goes ko psycopg2.IntegrityError('tool already inserted'), # ok then! [sample_tool] ] tool = swh_storage.tool_get(sample_tool) assert not tool tools = swh_storage.tool_add([sample_tool]) assert tools == [sample_tool] assert mock_memory.has_calls([ call([sample_tool]), call([sample_tool]), call([sample_tool]), ]) def test_retrying_proxy_swh_storage_tool_add_failure( swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add') mock_memory.side_effect = ValueError('Refuse to add tool always!') sample_tool = sample_data['tool'][0] tool = swh_storage.tool_get(sample_tool) assert not tool with pytest.raises(ValueError, match='Refuse to add'): swh_storage.tool_add([sample_tool]) assert mock_memory.call_count == 1 def to_provider(provider: Dict) -> Dict: return { 'provider_name': provider['name'], 'provider_url': provider['url'], 'provider_type': provider['type'], 'metadata': provider['metadata'], } def test_retrying_proxy_storage_metadata_provider_add( swh_storage, sample_data): """Standard metadata_provider_add works as before """ provider = sample_data['provider'][0] provider_get = to_provider(provider) provider = swh_storage.metadata_provider_get_by(provider_get) assert not provider provider_id = swh_storage.metadata_provider_add(**provider_get) assert provider_id actual_provider = swh_storage.metadata_provider_get(provider_id) assert actual_provider actual_provider_id = actual_provider.pop('id') assert actual_provider_id == provider_id assert actual_provider == provider_get def test_retrying_proxy_storage_metadata_provider_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ provider = sample_data['provider'][0] provider_get = to_provider(provider) mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.metadata_provider_add') mock_memory.side_effect = [ # first try goes ko HashCollision('provider_id hash collision'), # second try goes ko psycopg2.IntegrityError('provider_id already inserted'), # ok then! 'provider_id', ] provider = swh_storage.metadata_provider_get_by(provider_get) assert not provider provider_id = swh_storage.metadata_provider_add(**provider_get) assert provider_id == 'provider_id' assert mock_memory.has_calls([ call(**provider_get), call(**provider_get), call(**provider_get), ]) def test_retrying_proxy_swh_storage_metadata_provider_add_failure( swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.metadata_provider_add') mock_memory.side_effect = ValueError('Refuse to add provider_id always!') provider = sample_data['provider'][0] provider_get = to_provider(provider) provider_id = swh_storage.metadata_provider_get_by(provider_get) assert not provider_id with pytest.raises(ValueError, match='Refuse to add'): swh_storage.metadata_provider_add(**provider_get) assert mock_memory.call_count == 1 def test_retrying_proxy_storage_origin_metadata_add( swh_storage, sample_data): """Standard origin_metadata_add works as before """ ori_meta = sample_data['origin_metadata'][0] origin = ori_meta['origin'] swh_storage.origin_add_one(origin) provider_get = to_provider(ori_meta['provider']) provider_id = swh_storage.metadata_provider_add(**provider_get) origin_metadata = swh_storage.origin_metadata_get_by(origin['url']) assert not origin_metadata swh_storage.origin_metadata_add( origin['url'], ori_meta['discovery_date'], provider_id, ori_meta['tool'], ori_meta['metadata']) origin_metadata = swh_storage.origin_metadata_get_by( origin['url']) assert origin_metadata def test_retrying_proxy_storage_origin_metadata_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ ori_meta = sample_data['origin_metadata'][0] origin = ori_meta['origin'] swh_storage.origin_add_one(origin) provider_get = to_provider(ori_meta['provider']) provider_id = swh_storage.metadata_provider_add(**provider_get) mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_metadata_add') mock_memory.side_effect = [ # first try goes ko HashCollision('provider_id hash collision'), # second try goes ko psycopg2.IntegrityError('provider_id already inserted'), # ok then! None ] url = origin['url'] ts = ori_meta['discovery_date'] tool_id = ori_meta['tool'] metadata = ori_meta['metadata'] # No exception raised as insertion finally came through swh_storage.origin_metadata_add(url, ts, provider_id, tool_id, metadata) mock_memory.assert_has_calls([ # 3 calls, as long as error raised call(url, ts, provider_id, tool_id, metadata), call(url, ts, provider_id, tool_id, metadata), call(url, ts, provider_id, tool_id, metadata) ]) def test_retrying_proxy_swh_storage_origin_metadata_add_failure( swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_metadata_add') mock_memory.side_effect = ValueError('Refuse to add always!') ori_meta = sample_data['origin_metadata'][0] origin = ori_meta['origin'] swh_storage.origin_add_one(origin) url = origin['url'] ts = ori_meta['discovery_date'] provider_id = 'provider_id' tool_id = ori_meta['tool'] metadata = ori_meta['metadata'] with pytest.raises(ValueError, match='Refuse to add'): swh_storage.origin_metadata_add(url, ts, provider_id, tool_id, metadata) assert mock_memory.call_count == 1 def test_retrying_proxy_swh_storage_origin_visit_update( swh_storage, sample_data): """Standard origin_visit_update works as before """ sample_origin = sample_data['origin'][0] swh_storage.origin_add_one(sample_origin) origin_url = sample_origin['url'] origin_visit = swh_storage.origin_visit_add( origin_url, '2020-01-01', 'hg') ov = next(swh_storage.origin_visit_get(origin_url)) assert ov['origin'] == origin_url assert ov['visit'] == origin_visit['visit'] assert ov['status'] == 'ongoing' assert ov['snapshot'] is None assert ov['metadata'] is None swh_storage.origin_visit_update(origin_url, ov['visit'], status='full') ov = next(swh_storage.origin_visit_get(origin_url)) assert ov['origin'] == origin_url assert ov['visit'] == origin_visit['visit'] assert ov['status'] == 'full' assert ov['snapshot'] is None assert ov['metadata'] is None def test_retrying_proxy_swh_storage_origin_visit_update_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ sample_origin = sample_data['origin'][1] origin_url = sample_origin['url'] mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_visit_update') mock_memory.side_effect = [ # first try goes ko HashCollision('origin hash collision'), # second try goes ko psycopg2.IntegrityError('origin already inserted'), # ok then! {'origin': origin_url, 'visit': 1} ] visit_id = 1 swh_storage.origin_visit_update(origin_url, visit_id, status='full') assert mock_memory.has_calls([ call(origin_url, visit_id, status='full'), call(origin_url, visit_id, status='full'), call(origin_url, visit_id, status='full'), ]) def test_retrying_proxy_swh_storage_origin_visit_update_failure( swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_visit_update') mock_memory.side_effect = ValueError('Refuse to add origin always!') origin_url = sample_data['origin'][0]['url'] visit_id = 9 with pytest.raises(ValueError, match='Refuse to add'): swh_storage.origin_visit_update(origin_url, visit_id, 'partial') assert mock_memory.call_count == 1 def test_retrying_proxy_storage_directory_add(swh_storage, sample_data): """Standard directory_add works as before """ sample_dir = sample_data['directory'][0] directory = swh_storage.directory_get_random() # no directory assert not directory s = swh_storage.directory_add([sample_dir]) assert s == { 'directory:add': 1, } directory_id = swh_storage.directory_get_random() # only 1 assert directory_id == sample_dir['id'] def test_retrying_proxy_storage_directory_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.directory_add') mock_memory.side_effect = [ # first try goes ko HashCollision('directory hash collision'), # second try goes ko psycopg2.IntegrityError('directory already inserted'), # ok then! {'directory:add': 1} ] sample_dir = sample_data['directory'][1] directory_id = swh_storage.directory_get_random() # no directory assert not directory_id s = swh_storage.directory_add([sample_dir]) assert s == { 'directory:add': 1, } assert mock_memory.has_calls([ call([sample_dir]), call([sample_dir]), call([sample_dir]), ]) def test_retrying_proxy_swh_storage_directory_add_failure( swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.directory_add') mock_memory.side_effect = ValueError('Refuse to add directory always!') sample_dir = sample_data['directory'][0] directory_id = swh_storage.directory_get_random() # no directory assert not directory_id with pytest.raises(ValueError, match='Refuse to add'): swh_storage.directory_add([sample_dir]) assert mock_memory.call_count == 1 def test_retrying_proxy_storage_revision_add(swh_storage, sample_data): """Standard revision_add works as before """ sample_rev = sample_data['revision'][0] revision = next(swh_storage.revision_get([sample_rev['id']])) assert not revision s = swh_storage.revision_add([sample_rev]) assert s == { 'revision:add': 1, } revision = next(swh_storage.revision_get([sample_rev['id']])) assert revision['id'] == sample_rev['id'] def test_retrying_proxy_storage_revision_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.revision_add') mock_memory.side_effect = [ # first try goes ko HashCollision('revision hash collision'), # second try goes ko psycopg2.IntegrityError('revision already inserted'), # ok then! {'revision:add': 1} ] sample_rev = sample_data['revision'][0] revision = next(swh_storage.revision_get([sample_rev['id']])) assert not revision s = swh_storage.revision_add([sample_rev]) assert s == { 'revision:add': 1, } assert mock_memory.has_calls([ call([sample_rev]), call([sample_rev]), call([sample_rev]), ]) def test_retrying_proxy_swh_storage_revision_add_failure( swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.revision_add') mock_memory.side_effect = ValueError('Refuse to add revision always!') sample_rev = sample_data['revision'][0] revision = next(swh_storage.revision_get([sample_rev['id']])) assert not revision with pytest.raises(ValueError, match='Refuse to add'): swh_storage.revision_add([sample_rev]) assert mock_memory.call_count == 1 def test_retrying_proxy_storage_release_add(swh_storage, sample_data): """Standard release_add works as before """ sample_rel = sample_data['release'][0] release = next(swh_storage.release_get([sample_rel['id']])) assert not release s = swh_storage.release_add([sample_rel]) assert s == { 'release:add': 1, } release = next(swh_storage.release_get([sample_rel['id']])) assert release['id'] == sample_rel['id'] def test_retrying_proxy_storage_release_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.release_add') mock_memory.side_effect = [ # first try goes ko HashCollision('release hash collision'), # second try goes ko psycopg2.IntegrityError('release already inserted'), # ok then! {'release:add': 1} ] sample_rel = sample_data['release'][0] release = next(swh_storage.release_get([sample_rel['id']])) assert not release s = swh_storage.release_add([sample_rel]) assert s == { 'release:add': 1, } assert mock_memory.has_calls([ call([sample_rel]), call([sample_rel]), call([sample_rel]), ]) def test_retrying_proxy_swh_storage_release_add_failure( swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.release_add') mock_memory.side_effect = ValueError('Refuse to add release always!') sample_rel = sample_data['release'][0] release = next(swh_storage.release_get([sample_rel['id']])) assert not release with pytest.raises(ValueError, match='Refuse to add'): swh_storage.release_add([sample_rel]) assert mock_memory.call_count == 1 def test_retrying_proxy_storage_snapshot_add(swh_storage, sample_data): """Standard snapshot_add works as before """ sample_snap = sample_data['snapshot'][0] snapshot = swh_storage.snapshot_get(sample_snap['id']) assert not snapshot s = swh_storage.snapshot_add([sample_snap]) assert s == { 'snapshot:add': 1, } snapshot = swh_storage.snapshot_get(sample_snap['id']) assert snapshot['id'] == sample_snap['id'] def test_retrying_proxy_storage_snapshot_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.snapshot_add') mock_memory.side_effect = [ # first try goes ko HashCollision('snapshot hash collision'), # second try goes ko psycopg2.IntegrityError('snapshot already inserted'), # ok then! {'snapshot:add': 1} ] sample_snap = sample_data['snapshot'][0] snapshot = swh_storage.snapshot_get(sample_snap['id']) assert not snapshot s = swh_storage.snapshot_add([sample_snap]) assert s == { 'snapshot:add': 1, } assert mock_memory.has_calls([ call([sample_snap]), call([sample_snap]), call([sample_snap]), ]) def test_retrying_proxy_swh_storage_snapshot_add_failure( swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.snapshot_add') mock_memory.side_effect = ValueError('Refuse to add snapshot always!') sample_snap = sample_data['snapshot'][0] snapshot = swh_storage.snapshot_get(sample_snap['id']) assert not snapshot with pytest.raises(ValueError, match='Refuse to add'): swh_storage.snapshot_add([sample_snap]) assert mock_memory.call_count == 1