diff --git a/swh/storage/retry.py b/swh/storage/retry.py --- a/swh/storage/retry.py +++ b/swh/storage/retry.py @@ -5,22 +5,9 @@ import logging import traceback -from typing import Dict, Iterable, List from tenacity import retry, stop_after_attempt, wait_random_exponential -from swh.model.model import ( - Content, - Directory, - MetadataAuthority, - MetadataFetcher, - OriginVisit, - RawExtrinsicMetadata, - Release, - Revision, - SkippedContent, - Snapshot, -) from swh.storage import get_storage from swh.storage.exc import StorageArgumentException from swh.storage.interface import StorageInterface @@ -53,7 +40,7 @@ else: error_name = error.__class__.__name__ logger.warning( - "Retry adding a batch", + "Retrying RPC call", exc_info=False, extra={ "swh_type": "storage_retry", @@ -74,6 +61,14 @@ ) +def retry_function(storage, attribute_name): + @swh_retry + def newf(*args, **kwargs): + return getattr(storage, attribute_name)(*args, **kwargs) + + return newf + + class RetryingProxyStorage: """Storage implementation which retries adding objects when it specifically fails (hash collision, integrity error). @@ -82,52 +77,11 @@ def __init__(self, storage): self.storage: StorageInterface = get_storage(**storage) - - def __getattr__(self, key): - if key == "storage": - raise AttributeError(key) - return getattr(self.storage, key) - - @swh_retry - def content_add(self, content: List[Content]) -> Dict: - return self.storage.content_add(content) - - @swh_retry - def content_add_metadata(self, content: List[Content]) -> Dict: - return self.storage.content_add_metadata(content) - - @swh_retry - def skipped_content_add(self, content: List[SkippedContent]) -> Dict: - return self.storage.skipped_content_add(content) - - @swh_retry - def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: - return self.storage.origin_visit_add(visits) - - @swh_retry - def metadata_fetcher_add(self, fetchers: List[MetadataFetcher],) -> None: - return self.storage.metadata_fetcher_add(fetchers) - - @swh_retry - def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None: - return self.storage.metadata_authority_add(authorities) - - @swh_retry - def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None: - return self.storage.raw_extrinsic_metadata_add(metadata) - - @swh_retry - def directory_add(self, directories: List[Directory]) -> Dict: - return self.storage.directory_add(directories) - - @swh_retry - def revision_add(self, revisions: List[Revision]) -> Dict: - return self.storage.revision_add(revisions) - - @swh_retry - def release_add(self, releases: List[Release]) -> Dict: - return self.storage.release_add(releases) - - @swh_retry - def snapshot_add(self, snapshots: List[Snapshot]) -> Dict: - return self.storage.snapshot_add(snapshots) + for attribute_name in dir(StorageInterface): + if attribute_name.startswith("_"): + continue + attribute = getattr(self.storage, attribute_name) + if hasattr(attribute, "__call__"): + setattr( + self, attribute_name, retry_function(self.storage, attribute_name) + ) diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -9,7 +9,6 @@ import psycopg2 import pytest -from swh.model.model import MetadataTargetType from swh.storage.exc import HashCollision, StorageArgumentException from swh.storage.utils import now @@ -180,645 +179,6 @@ assert mock_memory.call_count == 1 -def test_retrying_proxy_storage_skipped_content_add(swh_storage, sample_data): - """Standard skipped_content_add works as before - - """ - sample_content = sample_data.skipped_content - sample_content_dict = sample_content.to_dict() - - skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict])) - assert len(skipped_contents) == 1 - - s = swh_storage.skipped_content_add([sample_content]) - assert s == { - "skipped_content:add": 1, - } - - skipped_content = list(swh_storage.skipped_content_missing([sample_content_dict])) - assert len(skipped_content) == 0 - - -def test_retrying_proxy_storage_skipped_content_add_with_retry( - monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision -): - """Multiple retries for hash collision and psycopg2 error but finally ok - - """ - mock_memory = mocker.patch( - "swh.storage.in_memory.InMemoryStorage.skipped_content_add" - ) - mock_memory.side_effect = [ - # 1st & 2nd try goes ko - fake_hash_collision, - psycopg2.IntegrityError("skipped_content already inserted"), - # ok then! - {"skipped_content:add": 1}, - ] - - sample_content = sample_data.skipped_content - - s = swh_storage.skipped_content_add([sample_content]) - assert s == {"skipped_content:add": 1} - - mock_memory.assert_has_calls( - [call([sample_content]), call([sample_content]), call([sample_content]),] - ) - - -def test_retrying_proxy_swh_storage_skipped_content_add_failure( - swh_storage, sample_data, mocker -): - """Unfiltered errors are raising without retry - - """ - mock_memory = mocker.patch( - "swh.storage.in_memory.InMemoryStorage.skipped_content_add" - ) - mock_memory.side_effect = StorageArgumentException( - "Refuse to add content_metadata!" - ) - - sample_content = sample_data.skipped_content - sample_content_dict = sample_content.to_dict() - - skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict])) - assert len(skipped_contents) == 1 - - with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.skipped_content_add([sample_content]) - - skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict])) - assert len(skipped_contents) == 1 - - assert mock_memory.call_count == 1 - - -def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data): - """Standard origin_visit_add works as before - - """ - origin = sample_data.origin - visit = sample_data.origin_visit - assert visit.origin == origin.url - - swh_storage.origin_add([origin]) - - origins = swh_storage.origin_visit_get(origin.url).results - assert not origins - - origin_visit = swh_storage.origin_visit_add([visit])[0] - assert origin_visit.origin == origin.url - assert isinstance(origin_visit.visit, int) - - actual_visit = swh_storage.origin_visit_get(origin.url).results[0] - assert actual_visit == visit - - -def test_retrying_proxy_swh_storage_origin_visit_add_retry( - monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision -): - """Multiple retries for hash collision and psycopg2 error but finally ok - - """ - origin = sample_data.origin - visit = sample_data.origin_visit - assert visit.origin == origin.url - - swh_storage.origin_add([origin]) - - mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_visit_add") - mock_memory.side_effect = [ - # first try goes ko - fake_hash_collision, - # second try goes ko - psycopg2.IntegrityError("origin already inserted"), - # ok then! - [visit], - ] - - origins = swh_storage.origin_visit_get(origin.url).results - assert not origins - - r = swh_storage.origin_visit_add([visit]) - assert r == [visit] - - mock_memory.assert_has_calls( - [call([visit]), call([visit]), call([visit]),] - ) - - -def test_retrying_proxy_swh_storage_origin_visit_add_failure( - swh_storage, sample_data, mocker -): - """Unfiltered errors are raising without retry - - """ - mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_visit_add") - mock_memory.side_effect = StorageArgumentException("Refuse to add origin always!") - - origin = sample_data.origin - visit = sample_data.origin_visit - assert visit.origin == origin.url - - origins = swh_storage.origin_visit_get(origin.url).results - assert not origins - - with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.origin_visit_add([visit]) - - mock_memory.assert_has_calls( - [call([visit]),] - ) - - -def test_retrying_proxy_storage_metadata_fetcher_add(swh_storage, sample_data): - """Standard metadata_fetcher_add works as before - - """ - fetcher = sample_data.metadata_fetcher - - metadata_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) - assert not metadata_fetcher - - swh_storage.metadata_fetcher_add([fetcher]) - - actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) - assert actual_fetcher == fetcher - - -def test_retrying_proxy_storage_metadata_fetcher_add_with_retry( - monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision, -): - """Multiple retries for hash collision and psycopg2 error but finally ok - - """ - fetcher = sample_data.metadata_fetcher - mock_memory = mocker.patch( - "swh.storage.in_memory.InMemoryStorage.metadata_fetcher_add" - ) - mock_memory.side_effect = [ - # first try goes ko - fake_hash_collision, - # second try goes ko - psycopg2.IntegrityError("metadata_fetcher already inserted"), - # ok then! - [fetcher], - ] - - actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) - assert not actual_fetcher - - swh_storage.metadata_fetcher_add([fetcher]) - - mock_memory.assert_has_calls( - [call([fetcher]), call([fetcher]), call([fetcher]),] - ) - - -def test_retrying_proxy_swh_storage_metadata_fetcher_add_failure( - swh_storage, sample_data, mocker -): - """Unfiltered errors are raising without retry - - """ - mock_memory = mocker.patch( - "swh.storage.in_memory.InMemoryStorage.metadata_fetcher_add" - ) - mock_memory.side_effect = StorageArgumentException( - "Refuse to add metadata_fetcher always!" - ) - - fetcher = sample_data.metadata_fetcher - - actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) - assert not actual_fetcher - - with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.metadata_fetcher_add([fetcher]) - - assert mock_memory.call_count == 1 - - -def test_retrying_proxy_storage_metadata_authority_add(swh_storage, sample_data): - """Standard metadata_authority_add works as before - - """ - authority = sample_data.metadata_authority - - assert not swh_storage.metadata_authority_get(authority.type, authority.url) - - swh_storage.metadata_authority_add([authority]) - - actual_authority = swh_storage.metadata_authority_get(authority.type, authority.url) - assert actual_authority == authority - - -def test_retrying_proxy_storage_metadata_authority_add_with_retry( - monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision, -): - """Multiple retries for hash collision and psycopg2 error but finally ok - - """ - authority = sample_data.metadata_authority - - mock_memory = mocker.patch( - "swh.storage.in_memory.InMemoryStorage.metadata_authority_add" - ) - mock_memory.side_effect = [ - # first try goes ko - fake_hash_collision, - # second try goes ko - psycopg2.IntegrityError("foo bar"), - # ok then! - None, - ] - - assert not swh_storage.metadata_authority_get(authority.type, authority.url) - - swh_storage.metadata_authority_add([authority]) - - mock_memory.assert_has_calls( - [call([authority]), call([authority]), call([authority])] - ) - - -def test_retrying_proxy_swh_storage_metadata_authority_add_failure( - swh_storage, sample_data, mocker -): - """Unfiltered errors are raising without retry - - """ - mock_memory = mocker.patch( - "swh.storage.in_memory.InMemoryStorage.metadata_authority_add" - ) - mock_memory.side_effect = StorageArgumentException( - "Refuse to add authority_id always!" - ) - - authority = sample_data.metadata_authority - - swh_storage.metadata_authority_get(authority.type, authority.url) - - with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.metadata_authority_add([authority]) - - assert mock_memory.call_count == 1 - - -def test_retrying_proxy_storage_raw_extrinsic_metadata_add(swh_storage, sample_data): - """Standard raw_extrinsic_metadata_add works as before - - """ - origin = sample_data.origin - ori_meta = sample_data.origin_metadata1 - assert origin.url == ori_meta.target - swh_storage.origin_add([origin]) - swh_storage.metadata_authority_add([sample_data.metadata_authority]) - swh_storage.metadata_fetcher_add([sample_data.metadata_fetcher]) - - origin_metadata = swh_storage.raw_extrinsic_metadata_get( - MetadataTargetType.ORIGIN, ori_meta.target, ori_meta.authority - ) - assert origin_metadata.next_page_token is None - assert not origin_metadata.results - - swh_storage.raw_extrinsic_metadata_add([ori_meta]) - - origin_metadata = swh_storage.raw_extrinsic_metadata_get( - MetadataTargetType.ORIGIN, ori_meta.target, ori_meta.authority - ) - assert origin_metadata - - -def test_retrying_proxy_storage_raw_extrinsic_metadata_add_with_retry( - monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision, -): - """Multiple retries for hash collision and psycopg2 error but finally ok - - """ - origin = sample_data.origin - ori_meta = sample_data.origin_metadata1 - assert origin.url == ori_meta.target - swh_storage.origin_add([origin]) - swh_storage.metadata_authority_add([sample_data.metadata_authority]) - swh_storage.metadata_fetcher_add([sample_data.metadata_fetcher]) - mock_memory = mocker.patch( - "swh.storage.in_memory.InMemoryStorage.raw_extrinsic_metadata_add" - ) - - mock_memory.side_effect = [ - # first try goes ko - fake_hash_collision, - # second try goes ko - psycopg2.IntegrityError("foo bar"), - # ok then! - None, - ] - - # No exception raised as insertion finally came through - swh_storage.raw_extrinsic_metadata_add([ori_meta]) - - mock_memory.assert_has_calls( - [ # 3 calls, as long as error raised - call([ori_meta]), - call([ori_meta]), - call([ori_meta]), - ] - ) - - -def test_retrying_proxy_swh_storage_raw_extrinsic_metadata_add_failure( - swh_storage, sample_data, mocker -): - """Unfiltered errors are raising without retry - - """ - mock_memory = mocker.patch( - "swh.storage.in_memory.InMemoryStorage.raw_extrinsic_metadata_add" - ) - mock_memory.side_effect = StorageArgumentException("Refuse to add always!") - - origin = sample_data.origin - ori_meta = sample_data.origin_metadata1 - assert origin.url == ori_meta.target - swh_storage.origin_add([origin]) - - with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.raw_extrinsic_metadata_add([ori_meta]) - - assert mock_memory.call_count == 1 - - -def test_retrying_proxy_storage_directory_add(swh_storage, sample_data): - """Standard directory_add works as before - - """ - sample_dir = sample_data.directory - - s = swh_storage.directory_add([sample_dir]) - assert s == { - "directory:add": 1, - } - - directory_id = swh_storage.directory_get_random() # only 1 - assert directory_id == sample_dir.id - - -def test_retrying_proxy_storage_directory_add_with_retry( - monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision -): - """Multiple retries for hash collision and psycopg2 error but finally ok - - """ - mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.directory_add") - mock_memory.side_effect = [ - # first try goes ko - fake_hash_collision, - # second try goes ko - psycopg2.IntegrityError("directory already inserted"), - # ok then! - {"directory:add": 1}, - ] - - sample_dir = sample_data.directories[1] - - s = swh_storage.directory_add([sample_dir]) - assert s == { - "directory:add": 1, - } - - mock_memory.assert_has_calls( - [call([sample_dir]), call([sample_dir]), call([sample_dir]),] - ) - - -def test_retrying_proxy_swh_storage_directory_add_failure( - swh_storage, sample_data, mocker -): - """Unfiltered errors are raising without retry - - """ - mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.directory_add") - mock_memory.side_effect = StorageArgumentException( - "Refuse to add directory always!" - ) - - sample_dir = sample_data.directory - - with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.directory_add([sample_dir]) - - assert mock_memory.call_count == 1 - - -def test_retrying_proxy_storage_revision_add(swh_storage, sample_data): - """Standard revision_add works as before - - """ - sample_rev = sample_data.revision - - revision = swh_storage.revision_get([sample_rev.id])[0] - assert revision is None - - s = swh_storage.revision_add([sample_rev]) - assert s == { - "revision:add": 1, - } - - revision = swh_storage.revision_get([sample_rev.id])[0] - assert revision == sample_rev - - -def test_retrying_proxy_storage_revision_add_with_retry( - monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision -): - """Multiple retries for hash collision and psycopg2 error but finally ok - - """ - mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.revision_add") - mock_memory.side_effect = [ - # first try goes ko - fake_hash_collision, - # second try goes ko - psycopg2.IntegrityError("revision already inserted"), - # ok then! - {"revision:add": 1}, - ] - - sample_rev = sample_data.revision - - revision = swh_storage.revision_get([sample_rev.id])[0] - assert revision is None - - s = swh_storage.revision_add([sample_rev]) - assert s == { - "revision:add": 1, - } - - mock_memory.assert_has_calls( - [call([sample_rev]), call([sample_rev]), call([sample_rev]),] - ) - - -def test_retrying_proxy_swh_storage_revision_add_failure( - swh_storage, sample_data, mocker -): - """Unfiltered errors are raising without retry - - """ - mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.revision_add") - mock_memory.side_effect = StorageArgumentException("Refuse to add revision always!") - - sample_rev = sample_data.revision - - revision = swh_storage.revision_get([sample_rev.id])[0] - assert revision is None - - with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.revision_add([sample_rev]) - - assert mock_memory.call_count == 1 - - -def test_retrying_proxy_storage_release_add(swh_storage, sample_data): - """Standard release_add works as before - - """ - sample_rel = sample_data.release - - release = swh_storage.release_get([sample_rel.id])[0] - assert release is None - - s = swh_storage.release_add([sample_rel]) - assert s == { - "release:add": 1, - } - - release = swh_storage.release_get([sample_rel.id])[0] - assert release == sample_rel - - -def test_retrying_proxy_storage_release_add_with_retry( - monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision -): - """Multiple retries for hash collision and psycopg2 error but finally ok - - """ - mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.release_add") - mock_memory.side_effect = [ - # first try goes ko - fake_hash_collision, - # second try goes ko - psycopg2.IntegrityError("release already inserted"), - # ok then! - {"release:add": 1}, - ] - - sample_rel = sample_data.release - - release = swh_storage.release_get([sample_rel.id])[0] - assert release is None - - s = swh_storage.release_add([sample_rel]) - assert s == { - "release:add": 1, - } - - mock_memory.assert_has_calls( - [call([sample_rel]), call([sample_rel]), call([sample_rel]),] - ) - - -def test_retrying_proxy_swh_storage_release_add_failure( - swh_storage, sample_data, mocker -): - """Unfiltered errors are raising without retry - - """ - mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.release_add") - mock_memory.side_effect = StorageArgumentException("Refuse to add release always!") - - sample_rel = sample_data.release - - release = swh_storage.release_get([sample_rel.id])[0] - assert release is None - - with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.release_add([sample_rel]) - - assert mock_memory.call_count == 1 - - -def test_retrying_proxy_storage_snapshot_add(swh_storage, sample_data): - """Standard snapshot_add works as before - - """ - sample_snap = sample_data.snapshot - - snapshot = swh_storage.snapshot_get(sample_snap.id) - assert not snapshot - - s = swh_storage.snapshot_add([sample_snap]) - assert s == { - "snapshot:add": 1, - } - - snapshot = swh_storage.snapshot_get(sample_snap.id) - assert snapshot["id"] == sample_snap.id - - -def test_retrying_proxy_storage_snapshot_add_with_retry( - monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision -): - """Multiple retries for hash collision and psycopg2 error but finally ok - - """ - mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.snapshot_add") - mock_memory.side_effect = [ - # first try goes ko - fake_hash_collision, - # second try goes ko - psycopg2.IntegrityError("snapshot already inserted"), - # ok then! - {"snapshot:add": 1}, - ] - - sample_snap = sample_data.snapshot - - snapshot = swh_storage.snapshot_get(sample_snap.id) - assert not snapshot - - s = swh_storage.snapshot_add([sample_snap]) - assert s == { - "snapshot:add": 1, - } - - mock_memory.assert_has_calls( - [call([sample_snap]), call([sample_snap]), call([sample_snap]),] - ) - - -def test_retrying_proxy_swh_storage_snapshot_add_failure( - swh_storage, sample_data, mocker -): - """Unfiltered errors are raising without retry - - """ - mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.snapshot_add") - mock_memory.side_effect = StorageArgumentException("Refuse to add snapshot always!") - - sample_snap = sample_data.snapshot - - snapshot = swh_storage.snapshot_get(sample_snap.id) - assert not snapshot - - with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.snapshot_add([sample_snap]) - - assert mock_memory.call_count == 1 - - def test_retrying_proxy_swh_storage_keyboardinterrupt(swh_storage, sample_data, mocker): """Unfiltered errors are raising without retry