diff --git a/swh/storage/retry.py b/swh/storage/retry.py index 40dd7c76..f96f7a92 100644 --- a/swh/storage/retry.py +++ b/swh/storage/retry.py @@ -1,88 +1,97 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import psycopg2 import traceback from datetime import datetime -from typing import Dict, List, Union +from typing import Dict, List, Optional, Union from retrying import retry from swh.storage import get_storage, HashCollision logger = logging.getLogger(__name__) RETRY_EXCEPTIONS = [ # raised when two parallel insertions insert the same data psycopg2.IntegrityError, HashCollision, ] def should_retry_adding(error: Exception) -> bool: """Retry policy when some kind of failures occur (database integrity error, hash collision, etc...) """ retry = any(isinstance(error, exc) for exc in RETRY_EXCEPTIONS) if retry: error_name = error.__module__ + '.' + error.__class__.__name__ logger.warning('Retry adding a batch', exc_info=False, extra={ 'swh_type': 'storage_retry', 'swh_exception_type': error_name, 'swh_exception': traceback.format_exception( error.__class__, error, error.__traceback__, ), }) return retry class RetryingProxyStorage: """Storage implementation which retries adding objects when it specifically fails (hash collision, integrity error). """ def __init__(self, storage): self.storage = get_storage(**storage) def __getattr__(self, key): return getattr(self.storage, key) @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) def content_add(self, content: List[Dict]) -> Dict: return self.storage.content_add(content) @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) def origin_add_one(self, origin: Dict) -> str: return self.storage.origin_add_one(origin) @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) def origin_visit_add(self, origin: Dict, date: Union[datetime, str], type: str) -> Dict: return self.storage.origin_visit_add(origin, date, type) + @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + def origin_visit_update( + self, origin: str, visit_id: int, status: Optional[str] = None, + metadata: Optional[Dict] = None, + snapshot: Optional[Dict] = None) -> Dict: + return self.storage.origin_visit_update( + origin, visit_id, status=status, + metadata=metadata, snapshot=snapshot) + @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) def tool_add(self, tools: List[Dict]) -> List[Dict]: return self.storage.tool_add(tools) @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) def metadata_provider_add( self, provider_name: str, provider_type: str, provider_url: str, metadata: Dict) -> Union[str, int]: return self.storage.metadata_provider_add( provider_name, provider_type, provider_url, metadata) @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) def origin_metadata_add( self, origin_url: str, ts: Union[str, datetime], provider_id: int, tool_id: int, metadata: Dict) -> None: return self.storage.origin_metadata_add( origin_url, ts, provider_id, tool_id, metadata) diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py index db4d1c18..5472abad 100644 --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -1,442 +1,520 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import psycopg2 import pytest from typing import Dict from unittest.mock import call from swh.storage import HashCollision from swh.storage.retry import ( RetryingProxyStorage, should_retry_adding, RETRY_EXCEPTIONS ) def test_should_retry_adding(): """Specific exceptions should be elected for retrial """ for exc in RETRY_EXCEPTIONS: assert should_retry_adding(exc('error')) is True def test_should_retry_adding_no_retry(): """Unspecific exceptions should raise as usual """ for exc in [ValueError, Exception]: assert should_retry_adding(exc('fail!')) is False @pytest.fixture def swh_storage(): return RetryingProxyStorage(storage={'cls': 'memory'}) def test_retrying_proxy_storage_content_add(swh_storage, sample_data): """Standard content_add works as before """ sample_content = sample_data['content'][0] content = next(swh_storage.content_get([sample_content['sha1']])) assert not content s = swh_storage.content_add([sample_content]) assert s == { 'content:add': 1, 'content:add:bytes': sample_content['length'], 'skipped_content:add': 0 } def test_retrying_proxy_storage_content_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add') mock_memory.side_effect = [ # first try goes ko HashCollision('content hash collision'), # second try goes ko psycopg2.IntegrityError('content already inserted'), # ok then! {'content:add': 1} ] sample_content = sample_data['content'][0] content = next(swh_storage.content_get([sample_content['sha1']])) assert not content s = swh_storage.content_add([sample_content]) assert s == { 'content:add': 1, } def test_retrying_proxy_swh_storage_content_add_failure( swh_storage, sample_data, mocker): """Other errors are raising as usual """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add') mock_memory.side_effect = ValueError('Refuse to add content always!') sample_content = sample_data['content'][0] content = next(swh_storage.content_get([sample_content['sha1']])) assert not content with pytest.raises(ValueError, match='Refuse to add'): swh_storage.content_add([sample_content]) content = next(swh_storage.content_get([sample_content['sha1']])) assert not content def test_retrying_proxy_swh_storage_origin_add_one(swh_storage, sample_data): - """Standard content_add works as before + """Standard origin_add_one works as before """ sample_origin = sample_data['origin'][0] origin = swh_storage.origin_get(sample_origin) assert not origin r = swh_storage.origin_add_one(sample_origin) assert r == sample_origin['url'] origin = swh_storage.origin_get(sample_origin) assert origin['url'] == sample_origin['url'] def test_retrying_proxy_swh_storage_origin_add_one_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ sample_origin = sample_data['origin'][1] mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one') mock_memory.side_effect = [ # first try goes ko HashCollision('origin hash collision'), # second try goes ko psycopg2.IntegrityError('origin already inserted'), # ok then! sample_origin['url'] ] origin = swh_storage.origin_get(sample_origin) assert not origin r = swh_storage.origin_add_one(sample_origin) assert r == sample_origin['url'] def test_retrying_proxy_swh_storage_origin_add_one_failure( swh_storage, sample_data, mocker): """Other errors are raising as usual """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one') mock_memory.side_effect = ValueError('Refuse to add origin always!') sample_origin = sample_data['origin'][0] origin = swh_storage.origin_get(sample_origin) assert not origin with pytest.raises(ValueError, match='Refuse to add'): swh_storage.origin_add_one([sample_origin]) origin = swh_storage.origin_get(sample_origin) assert not origin def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data): - """Standard content_add works as before + """Standard origin_visit_add works as before """ sample_origin = sample_data['origin'][0] swh_storage.origin_add_one(sample_origin) origin_url = sample_origin['url'] origin = list(swh_storage.origin_visit_get(origin_url)) assert not origin origin_visit = swh_storage.origin_visit_add( origin_url, '2020-01-01', 'hg') assert origin_visit['origin'] == origin_url assert isinstance(origin_visit['visit'], int) origin_visit = next(swh_storage.origin_visit_get(origin_url)) assert origin_visit['origin'] == origin_url assert isinstance(origin_visit['visit'], int) def test_retrying_proxy_swh_storage_origin_visit_add_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ sample_origin = sample_data['origin'][1] swh_storage.origin_add_one(sample_origin) origin_url = sample_origin['url'] mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_visit_add') mock_memory.side_effect = [ # first try goes ko HashCollision('origin hash collision'), # second try goes ko psycopg2.IntegrityError('origin already inserted'), # ok then! {'origin': origin_url, 'visit': 1} ] origin = list(swh_storage.origin_visit_get(origin_url)) assert not origin r = swh_storage.origin_visit_add(sample_origin, '2020-01-01', 'git') assert r == {'origin': origin_url, 'visit': 1} def test_retrying_proxy_swh_storage_origin_visit_add_failure( swh_storage, sample_data, mocker): """Other errors are raising as usual """ mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_visit_add') mock_memory.side_effect = ValueError('Refuse to add origin always!') origin_url = sample_data['origin'][0]['url'] origin = list(swh_storage.origin_visit_get(origin_url)) assert not origin with pytest.raises(ValueError, match='Refuse to add'): swh_storage.origin_visit_add(origin_url, '2020-01-01', 'svn') def test_retrying_proxy_storage_tool_add(swh_storage, sample_data): """Standard tool_add works as before """ sample_tool = sample_data['tool'][0] tool = swh_storage.tool_get(sample_tool) assert not tool tools = swh_storage.tool_add([sample_tool]) assert tools tool = tools[0] tool.pop('id') assert tool == sample_tool tool = swh_storage.tool_get(sample_tool) tool.pop('id') assert tool == sample_tool def test_retrying_proxy_storage_tool_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ sample_tool = sample_data['tool'][0] mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add') mock_memory.side_effect = [ # first try goes ko HashCollision('tool hash collision'), # second try goes ko psycopg2.IntegrityError('tool already inserted'), # ok then! [sample_tool] ] tool = swh_storage.tool_get(sample_tool) assert not tool tools = swh_storage.tool_add([sample_tool]) assert tools == [sample_tool] def test_retrying_proxy_swh_storage_tool_add_failure( swh_storage, sample_data, mocker): """Other errors are raising as usual """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add') mock_memory.side_effect = ValueError('Refuse to add tool always!') sample_tool = sample_data['tool'][0] tool = swh_storage.tool_get(sample_tool) assert not tool with pytest.raises(ValueError, match='Refuse to add'): swh_storage.tool_add([sample_tool]) tool = swh_storage.tool_get(sample_tool) assert not tool def to_provider(provider: Dict) -> Dict: return { 'provider_name': provider['name'], 'provider_url': provider['url'], 'provider_type': provider['type'], 'metadata': provider['metadata'], } def test_retrying_proxy_storage_metadata_provider_add( swh_storage, sample_data): """Standard metadata_provider_add works as before """ provider = sample_data['provider'][0] provider_get = to_provider(provider) provider = swh_storage.metadata_provider_get_by(provider_get) assert not provider provider_id = swh_storage.metadata_provider_add(**provider_get) assert provider_id actual_provider = swh_storage.metadata_provider_get(provider_id) assert actual_provider actual_provider_id = actual_provider.pop('id') assert actual_provider_id == provider_id assert actual_provider == provider_get def test_retrying_proxy_storage_metadata_provider_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ provider = sample_data['provider'][0] provider_get = to_provider(provider) mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.metadata_provider_add') mock_memory.side_effect = [ # first try goes ko HashCollision('provider_id hash collision'), # second try goes ko psycopg2.IntegrityError('provider_id already inserted'), # ok then! 'provider_id', ] provider = swh_storage.metadata_provider_get_by(provider_get) assert not provider provider_id = swh_storage.metadata_provider_add(**provider_get) assert provider_id == 'provider_id' def test_retrying_proxy_swh_storage_metadata_provider_add_failure( swh_storage, sample_data, mocker): """Other errors are raising as usual """ mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.metadata_provider_add') mock_memory.side_effect = ValueError('Refuse to add provider_id always!') provider = sample_data['provider'][0] provider_get = to_provider(provider) provider_id = swh_storage.metadata_provider_get_by(provider_get) assert not provider_id with pytest.raises(ValueError, match='Refuse to add'): swh_storage.metadata_provider_add(**provider_get) def test_retrying_proxy_storage_origin_metadata_add( swh_storage, sample_data): """Standard origin_metadata_add works as before """ ori_meta = sample_data['origin_metadata'][0] origin = ori_meta['origin'] swh_storage.origin_add_one(origin) provider_get = to_provider(ori_meta['provider']) provider_id = swh_storage.metadata_provider_add(**provider_get) origin_metadata = swh_storage.origin_metadata_get_by(origin['url']) assert not origin_metadata swh_storage.origin_metadata_add( origin['url'], ori_meta['discovery_date'], provider_id, ori_meta['tool'], ori_meta['metadata']) origin_metadata = swh_storage.origin_metadata_get_by( origin['url']) assert origin_metadata def test_retrying_proxy_storage_origin_metadata_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ ori_meta = sample_data['origin_metadata'][0] origin = ori_meta['origin'] swh_storage.origin_add_one(origin) provider_get = to_provider(ori_meta['provider']) provider_id = swh_storage.metadata_provider_add(**provider_get) mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_metadata_add') mock_memory.side_effect = [ # first try goes ko HashCollision('provider_id hash collision'), # second try goes ko psycopg2.IntegrityError('provider_id already inserted'), # ok then! None ] url = origin['url'] ts = ori_meta['discovery_date'] tool_id = ori_meta['tool'] metadata = ori_meta['metadata'] # No exception raised as insertion finally came through swh_storage.origin_metadata_add(url, ts, provider_id, tool_id, metadata) mock_memory.assert_has_calls([ # 3 calls, as long as error raised call(url, ts, provider_id, tool_id, metadata), call(url, ts, provider_id, tool_id, metadata), call(url, ts, provider_id, tool_id, metadata) ]) def test_retrying_proxy_swh_storage_origin_metadata_add_failure( swh_storage, sample_data, mocker): """Other errors are raising as usual """ mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_metadata_add') mock_memory.side_effect = ValueError('Refuse to add always!') ori_meta = sample_data['origin_metadata'][0] origin = ori_meta['origin'] swh_storage.origin_add_one(origin) with pytest.raises(ValueError, match='Refuse to add'): swh_storage.origin_metadata_add( origin['url'], ori_meta['discovery_date'], 'provider_id', ori_meta['tool'], ori_meta['metadata']) + + +def test_retrying_proxy_swh_storage_origin_visit_update( + swh_storage, sample_data): + """Standard origin_visit_update works as before + + """ + sample_origin = sample_data['origin'][0] + swh_storage.origin_add_one(sample_origin) + origin_url = sample_origin['url'] + origin_visit = swh_storage.origin_visit_add( + origin_url, '2020-01-01', 'hg') + + ov = next(swh_storage.origin_visit_get(origin_url)) + assert ov['origin'] == origin_url + assert ov['visit'] == origin_visit['visit'] + assert ov['status'] == 'ongoing' + assert ov['snapshot'] is None + assert ov['metadata'] is None + + swh_storage.origin_visit_update(origin_url, ov['visit'], status='full') + + ov = next(swh_storage.origin_visit_get(origin_url)) + assert ov['origin'] == origin_url + assert ov['visit'] == origin_visit['visit'] + assert ov['status'] == 'full' + assert ov['snapshot'] is None + assert ov['metadata'] is None + + +def test_retrying_proxy_swh_storage_origin_visit_update_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + sample_origin = sample_data['origin'][1] + origin_url = sample_origin['url'] + + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.origin_visit_update') + mock_memory.side_effect = [ + # first try goes ko + HashCollision('origin hash collision'), + # second try goes ko + psycopg2.IntegrityError('origin already inserted'), + # ok then! + {'origin': origin_url, 'visit': 1} + ] + + visit_id = 1 + swh_storage.origin_visit_update(origin_url, visit_id, status='full') + + assert mock_memory.has_calls([ + call(origin_url, visit_id, status='full'), + call(origin_url, visit_id, status='full'), + call(origin_url, visit_id, status='full'), + ]) + + +def test_retrying_proxy_swh_storage_origin_visit_update_failure( + swh_storage, sample_data, mocker): + """Other errors are raising as usual + + """ + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.origin_visit_update') + mock_memory.side_effect = ValueError('Refuse to add origin always!') + origin_url = sample_data['origin'][0]['url'] + visit_id = 9 + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.origin_visit_update(origin_url, visit_id, 'partial') + + assert mock_memory.has_calls([ + call(origin_url, visit_id, 'partial'), + call(origin_url, visit_id, 'partial'), + call(origin_url, visit_id, 'partial'), + ])