diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -24,7 +24,7 @@
 [mypy-pytest.*]
 ignore_missing_imports = True
 
-[mypy-retrying.*]
+[mypy-tenacity.*]
 ignore_missing_imports = True
 
 # temporary work-around for landing typing support in spite of the current
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,4 +4,4 @@
 python-dateutil
 vcversioner
 aiohttp
-retrying
+tenacity
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -581,8 +581,11 @@
         """Finds a random directory id.
 
         Returns:
-            a sha1_git
+            a sha1_git if any
+
         """
+        if not self._directories:
+            return None
         return random.choice(list(self._directories))
 
     def _directory_entry_get_by_path(self, directory, paths, prefix):
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -7,9 +7,13 @@
 import psycopg2
 import traceback
 
-from typing import Dict, List
+from datetime import datetime
+from typing import Dict, List, Optional, Union
 
-from retrying import retry
+from requests.exceptions import ConnectionError
+from tenacity import (
+    retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type
+)
 
 from swh.storage import get_storage, HashCollision
 
@@ -21,27 +25,30 @@
     # raised when two parallel insertions insert the same data
     psycopg2.IntegrityError,
     HashCollision,
+    # when the server is restarting
+    ConnectionError,
 ]
 
 
 def should_retry_adding(error: Exception) -> bool:
-    """Retry policy when some kind of failures occur (database integrity error,
-    hash collision, etc...)
+    """Retry if the error/exception is one of the RETRY_EXCEPTIONS types.
 
     """
-    retry = any(isinstance(error, exc) for exc in RETRY_EXCEPTIONS)
-    if retry:
-        error_name = error.__module__ + '.' + error.__class__.__name__
-        logger.warning('Retry adding a batch', exc_info=False, extra={
-            'swh_type': 'storage_retry',
-            'swh_exception_type': error_name,
-            'swh_exception': traceback.format_exception(
-                error.__class__,
-                error,
-                error.__traceback__,
-            ),
-        })
-    return retry
+    for exc in RETRY_EXCEPTIONS:
+        if retry_if_exception_type(exc)(error):
+            error_name = error.__module__ + '.' 
+ error.__class__.__name__ + logger.warning('Retry adding a batch', exc_info=False, extra={ + 'swh_type': 'storage_retry', + 'swh_exception_type': error_name, + 'swh_exception': traceback.format_exc(), + }) + return True + return False + + +swh_retry = retry(retry=should_retry_adding, + wait=wait_random_exponential(multiplier=1, max=10), + stop=stop_after_attempt(3)) class RetryingProxyStorage: @@ -55,6 +62,62 @@ def __getattr__(self, key): return getattr(self.storage, key) - @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + @swh_retry def content_add(self, content: List[Dict]) -> Dict: return self.storage.content_add(content) + + @swh_retry + def content_add_metadata(self, content: List[Dict]) -> Dict: + return self.storage.content_add_metadata(content) + + @swh_retry + def origin_add_one(self, origin: Dict) -> str: + return self.storage.origin_add_one(origin) + + @swh_retry + def origin_visit_add(self, origin: Dict, + date: Union[datetime, str], type: str) -> Dict: + return self.storage.origin_visit_add(origin, date, type) + + @swh_retry + def origin_visit_update( + self, origin: str, visit_id: int, status: Optional[str] = None, + metadata: Optional[Dict] = None, + snapshot: Optional[Dict] = None) -> Dict: + return self.storage.origin_visit_update( + origin, visit_id, status=status, + metadata=metadata, snapshot=snapshot) + + @swh_retry + def tool_add(self, tools: List[Dict]) -> List[Dict]: + return self.storage.tool_add(tools) + + @swh_retry + def metadata_provider_add( + self, provider_name: str, provider_type: str, provider_url: str, + metadata: Dict) -> Union[str, int]: + return self.storage.metadata_provider_add( + provider_name, provider_type, provider_url, metadata) + + @swh_retry + def origin_metadata_add( + self, origin_url: str, ts: Union[str, datetime], + provider_id: int, tool_id: int, metadata: Dict) -> None: + return self.storage.origin_metadata_add( + origin_url, ts, provider_id, tool_id, metadata) + + @swh_retry + def directory_add(self, directories: List[Dict]) -> Dict: + return self.storage.directory_add(directories) + + @swh_retry + def revision_add(self, revisions: List[Dict]) -> Dict: + return self.storage.revision_add(revisions) + + @swh_retry + def release_add(self, releases: List[Dict]) -> Dict: + return self.storage.release_add(releases) + + @swh_retry + def snapshot_add(self, snapshot: List[Dict]) -> Dict: + return self.storage.snapshot_add(snapshot) diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py --- a/swh/storage/tests/conftest.py +++ b/swh/storage/tests/conftest.py @@ -209,15 +209,22 @@ """Pre-defined sample storage object data to manipulate Returns: - Dict of data (keys: content, directory, revision, release, person) + Dict of data (keys: content, directory, revision, release, person, + origin) """ from .storage_data import data return { 'content': [data.cont, data.cont2], + 'content_metadata': [data.cont3], 'person': [data.person], - 'directory': [data.dir2], + 'directory': [data.dir2, data.dir], 'revision': [data.revision], 'release': [data.release, data.release2, data.release3], + 'snapshot': [data.snapshot], + 'origin': [data.origin, data.origin2], + 'tool': [data.metadata_tool], + 'provider': [data.provider], + 'origin_metadata': [data.origin_metadata, data.origin_metadata2], } diff --git a/swh/storage/tests/storage_data.py b/swh/storage/tests/storage_data.py --- a/swh/storage/tests/storage_data.py +++ b/swh/storage/tests/storage_data.py @@ -62,6 +62,7 @@ 'blake2s256': hash_to_bytes( 
'76d0346f44e5a27f6bafdd9c2befd304aff83780f93121d801ab6a1d4769db11'), 'status': 'visible', + 'ctime': '2019-12-01', } contents = (cont, cont2, cont3) diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -6,47 +6,41 @@ import psycopg2 import pytest -from swh.storage import HashCollision -from swh.storage.retry import ( - RetryingProxyStorage, should_retry_adding, RETRY_EXCEPTIONS -) - - -def test_should_retry_adding(): - """Specific exceptions should be elected for retrial +from typing import Dict - """ - for exc in RETRY_EXCEPTIONS: - assert should_retry_adding(exc('error')) is True +from unittest.mock import call +from swh.storage import HashCollision +from swh.storage.retry import RetryingProxyStorage -def test_should_retry_adding_no_retry(): - """Unspecific exceptions should raise as usual - """ - for exc in [ValueError, Exception]: - assert should_retry_adding(exc('fail!')) is False +@pytest.fixture +def swh_storage(): + return RetryingProxyStorage(storage={'cls': 'memory'}) -def test_retrying_proxy_storage_content(sample_data): +def test_retrying_proxy_storage_content_add(swh_storage, sample_data): """Standard content_add works as before """ sample_content = sample_data['content'][0] - storage = RetryingProxyStorage(storage={'cls': 'memory'}) - content = next(storage.content_get([sample_content['sha1']])) + content = next(swh_storage.content_get([sample_content['sha1']])) assert not content - s = storage.content_add([sample_content]) + s = swh_storage.content_add([sample_content]) assert s == { 'content:add': 1, 'content:add:bytes': sample_content['length'], 'skipped_content:add': 0 } + content = next(swh_storage.content_get([sample_content['sha1']])) + assert content['sha1'] == sample_content['sha1'] -def test_retrying_proxy_storage_with_retry(sample_data, mocker): + +def test_retrying_proxy_storage_content_add_with_retry( + swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ @@ -61,29 +55,831 @@ ] sample_content = sample_data['content'][0] - storage = RetryingProxyStorage(storage={'cls': 'memory'}) - content = next(storage.content_get([sample_content['sha1']])) + content = next(swh_storage.content_get([sample_content['sha1']])) assert not content - s = storage.content_add([sample_content]) - assert s == { - 'content:add': 1, - } + s = swh_storage.content_add([sample_content]) + assert s == {'content:add': 1} + + assert mock_memory.has_calls([ + call([sample_content]), + call([sample_content]), + call([sample_content]), + ]) -def test_retrying_proxy_storage_failure_to_add(sample_data, mocker): - """Other errors are raising as usual +def test_retrying_proxy_swh_storage_content_add_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add') mock_memory.side_effect = ValueError('Refuse to add content always!') sample_content = sample_data['content'][0] - storage = RetryingProxyStorage(storage={'cls': 'memory'}) - content = next(storage.content_get([sample_content['sha1']])) + content = next(swh_storage.content_get([sample_content['sha1']])) assert not content with pytest.raises(ValueError, match='Refuse to add'): - storage.content_add([sample_content]) + swh_storage.content_add([sample_content]) + + assert mock_memory.call_count == 1 + + +def test_retrying_proxy_storage_content_add_metadata(swh_storage, sample_data): 
+ """Standard content_add_metadata works as before + + """ + sample_content = sample_data['content_metadata'][0] + + pk = sample_content['sha1'] + content_metadata = swh_storage.content_get_metadata([pk]) + assert not content_metadata[pk] + + s = swh_storage.content_add_metadata([sample_content]) + assert s == { + 'content:add': 1, + 'skipped_content:add': 0 + } + + content_metadata = swh_storage.content_get_metadata([pk]) + assert len(content_metadata[pk]) == 1 + assert content_metadata[pk][0]['sha1'] == pk + + +def test_retrying_proxy_storage_content_add_metadata_with_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.content_add_metadata') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('content_metadata hash collision'), + # second try goes ko + psycopg2.IntegrityError('content_metadata already inserted'), + # ok then! + {'content:add': 1} + ] + + sample_content = sample_data['content_metadata'][0] + + s = swh_storage.content_add_metadata([sample_content]) + assert s == {'content:add': 1} + + assert mock_memory.has_calls([ + call([sample_content]), + call([sample_content]), + call([sample_content]), + ]) + + +def test_retrying_proxy_swh_storage_content_add_metadata_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry + + """ + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.content_add_metadata') + mock_memory.side_effect = ValueError('Refuse to add content_metadata!') + + sample_content = sample_data['content_metadata'][0] + pk = sample_content['sha1'] + + content_metadata = swh_storage.content_get_metadata([pk]) + assert not content_metadata[pk] + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.content_add_metadata([sample_content]) + + assert mock_memory.call_count == 1 + + +def test_retrying_proxy_swh_storage_origin_add_one(swh_storage, sample_data): + """Standard origin_add_one works as before + + """ + sample_origin = sample_data['origin'][0] + + origin = swh_storage.origin_get(sample_origin) + assert not origin + + r = swh_storage.origin_add_one(sample_origin) + assert r == sample_origin['url'] + + origin = swh_storage.origin_get(sample_origin) + assert origin['url'] == sample_origin['url'] + + +def test_retrying_proxy_swh_storage_origin_add_one_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + sample_origin = sample_data['origin'][1] + mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('origin hash collision'), + # second try goes ko + psycopg2.IntegrityError('origin already inserted'), + # ok then! 
+ sample_origin['url'] + ] + + origin = swh_storage.origin_get(sample_origin) + assert not origin + + r = swh_storage.origin_add_one(sample_origin) + assert r == sample_origin['url'] + + assert mock_memory.has_calls([ + call([sample_origin]), + call([sample_origin]), + call([sample_origin]), + ]) + + +def test_retrying_proxy_swh_storage_origin_add_one_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one') + mock_memory.side_effect = ValueError('Refuse to add origin always!') + + sample_origin = sample_data['origin'][0] + + origin = swh_storage.origin_get(sample_origin) + assert not origin + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.origin_add_one([sample_origin]) + + assert mock_memory.call_count == 1 + + +def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data): + """Standard origin_visit_add works as before + + """ + sample_origin = sample_data['origin'][0] + swh_storage.origin_add_one(sample_origin) + origin_url = sample_origin['url'] + + origin = list(swh_storage.origin_visit_get(origin_url)) + assert not origin + + origin_visit = swh_storage.origin_visit_add( + origin_url, '2020-01-01', 'hg') + assert origin_visit['origin'] == origin_url + assert isinstance(origin_visit['visit'], int) + + origin_visit = next(swh_storage.origin_visit_get(origin_url)) + assert origin_visit['origin'] == origin_url + assert isinstance(origin_visit['visit'], int) + + +def test_retrying_proxy_swh_storage_origin_visit_add_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + sample_origin = sample_data['origin'][1] + swh_storage.origin_add_one(sample_origin) + origin_url = sample_origin['url'] + + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.origin_visit_add') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('origin hash collision'), + # second try goes ko + psycopg2.IntegrityError('origin already inserted'), + # ok then! 
+ {'origin': origin_url, 'visit': 1} + ] + + origin = list(swh_storage.origin_visit_get(origin_url)) + assert not origin + + r = swh_storage.origin_visit_add(sample_origin, '2020-01-01', 'git') + assert r == {'origin': origin_url, 'visit': 1} + + assert mock_memory.has_calls([ + call(sample_origin, '2020-01-01', 'git'), + call(sample_origin, '2020-01-01', 'git'), + call(sample_origin, '2020-01-01', 'git') + ]) + + +def test_retrying_proxy_swh_storage_origin_visit_add_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry + + """ + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.origin_visit_add') + mock_memory.side_effect = ValueError('Refuse to add origin always!') + + origin_url = sample_data['origin'][0]['url'] + + origin = list(swh_storage.origin_visit_get(origin_url)) + assert not origin + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.origin_visit_add(origin_url, '2020-01-31', 'svn') + + assert mock_memory.has_calls([ + call(origin_url, '2020-01-31', 'svn'), + ]) + + +def test_retrying_proxy_storage_tool_add(swh_storage, sample_data): + """Standard tool_add works as before + + """ + sample_tool = sample_data['tool'][0] + + tool = swh_storage.tool_get(sample_tool) + assert not tool + + tools = swh_storage.tool_add([sample_tool]) + assert tools + tool = tools[0] + tool.pop('id') + assert tool == sample_tool + + tool = swh_storage.tool_get(sample_tool) + tool.pop('id') + assert tool == sample_tool + + +def test_retrying_proxy_storage_tool_add_with_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + sample_tool = sample_data['tool'][0] + mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('tool hash collision'), + # second try goes ko + psycopg2.IntegrityError('tool already inserted'), + # ok then! 
+ [sample_tool] + ] + + tool = swh_storage.tool_get(sample_tool) + assert not tool + + tools = swh_storage.tool_add([sample_tool]) + assert tools == [sample_tool] + + assert mock_memory.has_calls([ + call([sample_tool]), + call([sample_tool]), + call([sample_tool]), + ]) + + +def test_retrying_proxy_swh_storage_tool_add_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add') + mock_memory.side_effect = ValueError('Refuse to add tool always!') + + sample_tool = sample_data['tool'][0] + + tool = swh_storage.tool_get(sample_tool) + assert not tool + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.tool_add([sample_tool]) + + assert mock_memory.call_count == 1 + + +def to_provider(provider: Dict) -> Dict: + return { + 'provider_name': provider['name'], + 'provider_url': provider['url'], + 'provider_type': provider['type'], + 'metadata': provider['metadata'], + } + + +def test_retrying_proxy_storage_metadata_provider_add( + swh_storage, sample_data): + """Standard metadata_provider_add works as before + + """ + provider = sample_data['provider'][0] + provider_get = to_provider(provider) + + provider = swh_storage.metadata_provider_get_by(provider_get) + assert not provider + + provider_id = swh_storage.metadata_provider_add(**provider_get) + assert provider_id + + actual_provider = swh_storage.metadata_provider_get(provider_id) + assert actual_provider + actual_provider_id = actual_provider.pop('id') + assert actual_provider_id == provider_id + assert actual_provider == provider_get + + +def test_retrying_proxy_storage_metadata_provider_add_with_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + provider = sample_data['provider'][0] + provider_get = to_provider(provider) + + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.metadata_provider_add') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('provider_id hash collision'), + # second try goes ko + psycopg2.IntegrityError('provider_id already inserted'), + # ok then! 
+ 'provider_id', + ] + + provider = swh_storage.metadata_provider_get_by(provider_get) + assert not provider + + provider_id = swh_storage.metadata_provider_add(**provider_get) + assert provider_id == 'provider_id' + + assert mock_memory.has_calls([ + call(**provider_get), + call(**provider_get), + call(**provider_get), + ]) + + +def test_retrying_proxy_swh_storage_metadata_provider_add_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry + + """ + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.metadata_provider_add') + mock_memory.side_effect = ValueError('Refuse to add provider_id always!') + + provider = sample_data['provider'][0] + provider_get = to_provider(provider) + + provider_id = swh_storage.metadata_provider_get_by(provider_get) + assert not provider_id + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.metadata_provider_add(**provider_get) + + assert mock_memory.call_count == 1 + + +def test_retrying_proxy_storage_origin_metadata_add( + swh_storage, sample_data): + """Standard origin_metadata_add works as before + + """ + ori_meta = sample_data['origin_metadata'][0] + origin = ori_meta['origin'] + swh_storage.origin_add_one(origin) + provider_get = to_provider(ori_meta['provider']) + provider_id = swh_storage.metadata_provider_add(**provider_get) + + origin_metadata = swh_storage.origin_metadata_get_by(origin['url']) + assert not origin_metadata + + swh_storage.origin_metadata_add( + origin['url'], ori_meta['discovery_date'], + provider_id, ori_meta['tool'], ori_meta['metadata']) + + origin_metadata = swh_storage.origin_metadata_get_by( + origin['url']) + assert origin_metadata + + +def test_retrying_proxy_storage_origin_metadata_add_with_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + ori_meta = sample_data['origin_metadata'][0] + origin = ori_meta['origin'] + swh_storage.origin_add_one(origin) + provider_get = to_provider(ori_meta['provider']) + provider_id = swh_storage.metadata_provider_add(**provider_get) + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.origin_metadata_add') + + mock_memory.side_effect = [ + # first try goes ko + HashCollision('provider_id hash collision'), + # second try goes ko + psycopg2.IntegrityError('provider_id already inserted'), + # ok then! 
+ None + ] + + url = origin['url'] + ts = ori_meta['discovery_date'] + tool_id = ori_meta['tool'] + metadata = ori_meta['metadata'] + + # No exception raised as insertion finally came through + swh_storage.origin_metadata_add(url, ts, provider_id, tool_id, metadata) + + mock_memory.assert_has_calls([ # 3 calls, as long as error raised + call(url, ts, provider_id, tool_id, metadata), + call(url, ts, provider_id, tool_id, metadata), + call(url, ts, provider_id, tool_id, metadata) + ]) + + +def test_retrying_proxy_swh_storage_origin_metadata_add_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry + + """ + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.origin_metadata_add') + mock_memory.side_effect = ValueError('Refuse to add always!') + + ori_meta = sample_data['origin_metadata'][0] + origin = ori_meta['origin'] + swh_storage.origin_add_one(origin) + + url = origin['url'] + ts = ori_meta['discovery_date'] + provider_id = 'provider_id' + tool_id = ori_meta['tool'] + metadata = ori_meta['metadata'] + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.origin_metadata_add(url, ts, provider_id, tool_id, + metadata) + + assert mock_memory.call_count == 1 + + +def test_retrying_proxy_swh_storage_origin_visit_update( + swh_storage, sample_data): + """Standard origin_visit_update works as before + + """ + sample_origin = sample_data['origin'][0] + swh_storage.origin_add_one(sample_origin) + origin_url = sample_origin['url'] + origin_visit = swh_storage.origin_visit_add( + origin_url, '2020-01-01', 'hg') + + ov = next(swh_storage.origin_visit_get(origin_url)) + assert ov['origin'] == origin_url + assert ov['visit'] == origin_visit['visit'] + assert ov['status'] == 'ongoing' + assert ov['snapshot'] is None + assert ov['metadata'] is None + + swh_storage.origin_visit_update(origin_url, ov['visit'], status='full') + + ov = next(swh_storage.origin_visit_get(origin_url)) + assert ov['origin'] == origin_url + assert ov['visit'] == origin_visit['visit'] + assert ov['status'] == 'full' + assert ov['snapshot'] is None + assert ov['metadata'] is None + + +def test_retrying_proxy_swh_storage_origin_visit_update_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + sample_origin = sample_data['origin'][1] + origin_url = sample_origin['url'] + + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.origin_visit_update') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('origin hash collision'), + # second try goes ko + psycopg2.IntegrityError('origin already inserted'), + # ok then! 
+ {'origin': origin_url, 'visit': 1} + ] + + visit_id = 1 + swh_storage.origin_visit_update(origin_url, visit_id, status='full') + + assert mock_memory.has_calls([ + call(origin_url, visit_id, status='full'), + call(origin_url, visit_id, status='full'), + call(origin_url, visit_id, status='full'), + ]) + + +def test_retrying_proxy_swh_storage_origin_visit_update_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry + + """ + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.origin_visit_update') + mock_memory.side_effect = ValueError('Refuse to add origin always!') + origin_url = sample_data['origin'][0]['url'] + visit_id = 9 + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.origin_visit_update(origin_url, visit_id, 'partial') + + assert mock_memory.call_count == 1 + + +def test_retrying_proxy_storage_directory_add(swh_storage, sample_data): + """Standard directory_add works as before + + """ + sample_dir = sample_data['directory'][0] + + directory = swh_storage.directory_get_random() # no directory + assert not directory + + s = swh_storage.directory_add([sample_dir]) + assert s == { + 'directory:add': 1, + } + + directory_id = swh_storage.directory_get_random() # only 1 + assert directory_id == sample_dir['id'] + + +def test_retrying_proxy_storage_directory_add_with_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.directory_add') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('directory hash collision'), + # second try goes ko + psycopg2.IntegrityError('directory already inserted'), + # ok then! + {'directory:add': 1} + ] + + sample_dir = sample_data['directory'][1] + + directory_id = swh_storage.directory_get_random() # no directory + assert not directory_id + + s = swh_storage.directory_add([sample_dir]) + assert s == { + 'directory:add': 1, + } + + assert mock_memory.has_calls([ + call([sample_dir]), + call([sample_dir]), + call([sample_dir]), + ]) + + +def test_retrying_proxy_swh_storage_directory_add_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.directory_add') + mock_memory.side_effect = ValueError('Refuse to add directory always!') + + sample_dir = sample_data['directory'][0] + + directory_id = swh_storage.directory_get_random() # no directory + assert not directory_id + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.directory_add([sample_dir]) + + assert mock_memory.call_count == 1 + + +def test_retrying_proxy_storage_revision_add(swh_storage, sample_data): + """Standard revision_add works as before + + """ + sample_rev = sample_data['revision'][0] + + revision = next(swh_storage.revision_get([sample_rev['id']])) + assert not revision + + s = swh_storage.revision_add([sample_rev]) + assert s == { + 'revision:add': 1, + } + + revision = next(swh_storage.revision_get([sample_rev['id']])) + assert revision['id'] == sample_rev['id'] + + +def test_retrying_proxy_storage_revision_add_with_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.revision_add') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('revision hash collision'), + # second try goes ko + 
psycopg2.IntegrityError('revision already inserted'), + # ok then! + {'revision:add': 1} + ] + + sample_rev = sample_data['revision'][0] + + revision = next(swh_storage.revision_get([sample_rev['id']])) + assert not revision + + s = swh_storage.revision_add([sample_rev]) + assert s == { + 'revision:add': 1, + } + + assert mock_memory.has_calls([ + call([sample_rev]), + call([sample_rev]), + call([sample_rev]), + ]) + + +def test_retrying_proxy_swh_storage_revision_add_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.revision_add') + mock_memory.side_effect = ValueError('Refuse to add revision always!') + + sample_rev = sample_data['revision'][0] + + revision = next(swh_storage.revision_get([sample_rev['id']])) + assert not revision + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.revision_add([sample_rev]) + + assert mock_memory.call_count == 1 + + +def test_retrying_proxy_storage_release_add(swh_storage, sample_data): + """Standard release_add works as before + + """ + sample_rel = sample_data['release'][0] + + release = next(swh_storage.release_get([sample_rel['id']])) + assert not release + + s = swh_storage.release_add([sample_rel]) + assert s == { + 'release:add': 1, + } + + release = next(swh_storage.release_get([sample_rel['id']])) + assert release['id'] == sample_rel['id'] + + +def test_retrying_proxy_storage_release_add_with_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.release_add') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('release hash collision'), + # second try goes ko + psycopg2.IntegrityError('release already inserted'), + # ok then! 
+ {'release:add': 1} + ] + + sample_rel = sample_data['release'][0] + + release = next(swh_storage.release_get([sample_rel['id']])) + assert not release + + s = swh_storage.release_add([sample_rel]) + assert s == { + 'release:add': 1, + } + + assert mock_memory.has_calls([ + call([sample_rel]), + call([sample_rel]), + call([sample_rel]), + ]) + + +def test_retrying_proxy_swh_storage_release_add_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.release_add') + mock_memory.side_effect = ValueError('Refuse to add release always!') + + sample_rel = sample_data['release'][0] + + release = next(swh_storage.release_get([sample_rel['id']])) + assert not release + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.release_add([sample_rel]) + + assert mock_memory.call_count == 1 + + +def test_retrying_proxy_storage_snapshot_add(swh_storage, sample_data): + """Standard snapshot_add works as before + + """ + sample_snap = sample_data['snapshot'][0] + + snapshot = swh_storage.snapshot_get(sample_snap['id']) + assert not snapshot + + s = swh_storage.snapshot_add([sample_snap]) + assert s == { + 'snapshot:add': 1, + } + + snapshot = swh_storage.snapshot_get(sample_snap['id']) + assert snapshot['id'] == sample_snap['id'] + + +def test_retrying_proxy_storage_snapshot_add_with_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.snapshot_add') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('snapshot hash collision'), + # second try goes ko + psycopg2.IntegrityError('snapshot already inserted'), + # ok then! + {'snapshot:add': 1} + ] + + sample_snap = sample_data['snapshot'][0] + + snapshot = swh_storage.snapshot_get(sample_snap['id']) + assert not snapshot + + s = swh_storage.snapshot_add([sample_snap]) + assert s == { + 'snapshot:add': 1, + } + + assert mock_memory.has_calls([ + call([sample_snap]), + call([sample_snap]), + call([sample_snap]), + ]) + + +def test_retrying_proxy_swh_storage_snapshot_add_failure( + swh_storage, sample_data, mocker): + """Unfiltered errors are raising without retry + + """ + mock_memory = mocker.patch('swh.storage.in_memory.Storage.snapshot_add') + mock_memory.side_effect = ValueError('Refuse to add snapshot always!') + + sample_snap = sample_data['snapshot'][0] + + snapshot = swh_storage.snapshot_get(sample_snap['id']) + assert not snapshot + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.snapshot_add([sample_snap]) + + assert mock_memory.call_count == 1
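
As a reference for reviewers, here is a minimal, self-contained sketch of the tenacity composition that swh_retry relies on. The flaky_add function and its ValueError are illustrative stand-ins, not part of the patch; the real decorator filters exceptions through should_retry_adding / RETRY_EXCEPTIONS.

from tenacity import (
    retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential
)

attempts = {'count': 0}


@retry(retry=retry_if_exception_type(ValueError),           # retry only on this type
       wait=wait_random_exponential(multiplier=1, max=10),  # randomized backoff
       stop=stop_after_attempt(3))                          # at most 3 attempts
def flaky_add(batch):
    """Stand-in for a storage *_add endpoint that fails transiently twice."""
    attempts['count'] += 1
    if attempts['count'] < 3:
        raise ValueError('transient failure')
    return {'content:add': len(batch)}


assert flaky_add(['obj']) == {'content:add': 1}
assert attempts['count'] == 3  # two failed attempts, then success

As in the tests above, the third attempt succeeds and the caller never sees the two transient failures; a non-matching exception (e.g. the ValueError raised by the "failure" tests) would propagate on the first call.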