Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123281
D4434.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
23 KB
Subscribers
None
D4434.diff
View Options
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -5,22 +5,9 @@
import logging
import traceback
-from typing import Dict, Iterable, List
from tenacity import retry, stop_after_attempt, wait_random_exponential
-from swh.model.model import (
- Content,
- Directory,
- MetadataAuthority,
- MetadataFetcher,
- OriginVisit,
- RawExtrinsicMetadata,
- Release,
- Revision,
- SkippedContent,
- Snapshot,
-)
from swh.storage import get_storage
from swh.storage.exc import StorageArgumentException
from swh.storage.interface import StorageInterface
@@ -53,7 +40,7 @@
else:
error_name = error.__class__.__name__
logger.warning(
- "Retry adding a batch",
+ "Retrying RPC call",
exc_info=False,
extra={
"swh_type": "storage_retry",
@@ -74,6 +61,14 @@
)
+def retry_function(storage, attribute_name):
+ @swh_retry
+ def newf(*args, **kwargs):
+ return getattr(storage, attribute_name)(*args, **kwargs)
+
+ return newf
+
+
class RetryingProxyStorage:
"""Storage implementation which retries adding objects when it specifically
fails (hash collision, integrity error).
@@ -82,52 +77,11 @@
def __init__(self, storage):
self.storage: StorageInterface = get_storage(**storage)
-
- def __getattr__(self, key):
- if key == "storage":
- raise AttributeError(key)
- return getattr(self.storage, key)
-
- @swh_retry
- def content_add(self, content: List[Content]) -> Dict:
- return self.storage.content_add(content)
-
- @swh_retry
- def content_add_metadata(self, content: List[Content]) -> Dict:
- return self.storage.content_add_metadata(content)
-
- @swh_retry
- def skipped_content_add(self, content: List[SkippedContent]) -> Dict:
- return self.storage.skipped_content_add(content)
-
- @swh_retry
- def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]:
- return self.storage.origin_visit_add(visits)
-
- @swh_retry
- def metadata_fetcher_add(self, fetchers: List[MetadataFetcher],) -> None:
- return self.storage.metadata_fetcher_add(fetchers)
-
- @swh_retry
- def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None:
- return self.storage.metadata_authority_add(authorities)
-
- @swh_retry
- def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None:
- return self.storage.raw_extrinsic_metadata_add(metadata)
-
- @swh_retry
- def directory_add(self, directories: List[Directory]) -> Dict:
- return self.storage.directory_add(directories)
-
- @swh_retry
- def revision_add(self, revisions: List[Revision]) -> Dict:
- return self.storage.revision_add(revisions)
-
- @swh_retry
- def release_add(self, releases: List[Release]) -> Dict:
- return self.storage.release_add(releases)
-
- @swh_retry
- def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
- return self.storage.snapshot_add(snapshots)
+ for attribute_name in dir(StorageInterface):
+ if attribute_name.startswith("_"):
+ continue
+ attribute = getattr(self.storage, attribute_name)
+ if hasattr(attribute, "__call__"):
+ setattr(
+ self, attribute_name, retry_function(self.storage, attribute_name)
+ )
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
--- a/swh/storage/tests/test_retry.py
+++ b/swh/storage/tests/test_retry.py
@@ -9,7 +9,6 @@
import psycopg2
import pytest
-from swh.model.model import MetadataTargetType
from swh.storage.exc import HashCollision, StorageArgumentException
from swh.storage.utils import now
@@ -180,645 +179,6 @@
assert mock_memory.call_count == 1
-def test_retrying_proxy_storage_skipped_content_add(swh_storage, sample_data):
- """Standard skipped_content_add works as before
-
- """
- sample_content = sample_data.skipped_content
- sample_content_dict = sample_content.to_dict()
-
- skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict]))
- assert len(skipped_contents) == 1
-
- s = swh_storage.skipped_content_add([sample_content])
- assert s == {
- "skipped_content:add": 1,
- }
-
- skipped_content = list(swh_storage.skipped_content_missing([sample_content_dict]))
- assert len(skipped_content) == 0
-
-
-def test_retrying_proxy_storage_skipped_content_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.skipped_content_add"
- )
- mock_memory.side_effect = [
- # 1st & 2nd try goes ko
- fake_hash_collision,
- psycopg2.IntegrityError("skipped_content already inserted"),
- # ok then!
- {"skipped_content:add": 1},
- ]
-
- sample_content = sample_data.skipped_content
-
- s = swh_storage.skipped_content_add([sample_content])
- assert s == {"skipped_content:add": 1}
-
- mock_memory.assert_has_calls(
- [call([sample_content]), call([sample_content]), call([sample_content]),]
- )
-
-
-def test_retrying_proxy_swh_storage_skipped_content_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.skipped_content_add"
- )
- mock_memory.side_effect = StorageArgumentException(
- "Refuse to add content_metadata!"
- )
-
- sample_content = sample_data.skipped_content
- sample_content_dict = sample_content.to_dict()
-
- skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict]))
- assert len(skipped_contents) == 1
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.skipped_content_add([sample_content])
-
- skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict]))
- assert len(skipped_contents) == 1
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data):
- """Standard origin_visit_add works as before
-
- """
- origin = sample_data.origin
- visit = sample_data.origin_visit
- assert visit.origin == origin.url
-
- swh_storage.origin_add([origin])
-
- origins = swh_storage.origin_visit_get(origin.url).results
- assert not origins
-
- origin_visit = swh_storage.origin_visit_add([visit])[0]
- assert origin_visit.origin == origin.url
- assert isinstance(origin_visit.visit, int)
-
- actual_visit = swh_storage.origin_visit_get(origin.url).results[0]
- assert actual_visit == visit
-
-
-def test_retrying_proxy_swh_storage_origin_visit_add_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- origin = sample_data.origin
- visit = sample_data.origin_visit
- assert visit.origin == origin.url
-
- swh_storage.origin_add([origin])
-
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_visit_add")
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("origin already inserted"),
- # ok then!
- [visit],
- ]
-
- origins = swh_storage.origin_visit_get(origin.url).results
- assert not origins
-
- r = swh_storage.origin_visit_add([visit])
- assert r == [visit]
-
- mock_memory.assert_has_calls(
- [call([visit]), call([visit]), call([visit]),]
- )
-
-
-def test_retrying_proxy_swh_storage_origin_visit_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_visit_add")
- mock_memory.side_effect = StorageArgumentException("Refuse to add origin always!")
-
- origin = sample_data.origin
- visit = sample_data.origin_visit
- assert visit.origin == origin.url
-
- origins = swh_storage.origin_visit_get(origin.url).results
- assert not origins
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.origin_visit_add([visit])
-
- mock_memory.assert_has_calls(
- [call([visit]),]
- )
-
-
-def test_retrying_proxy_storage_metadata_fetcher_add(swh_storage, sample_data):
- """Standard metadata_fetcher_add works as before
-
- """
- fetcher = sample_data.metadata_fetcher
-
- metadata_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
- assert not metadata_fetcher
-
- swh_storage.metadata_fetcher_add([fetcher])
-
- actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
- assert actual_fetcher == fetcher
-
-
-def test_retrying_proxy_storage_metadata_fetcher_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision,
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- fetcher = sample_data.metadata_fetcher
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.metadata_fetcher_add"
- )
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("metadata_fetcher already inserted"),
- # ok then!
- [fetcher],
- ]
-
- actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
- assert not actual_fetcher
-
- swh_storage.metadata_fetcher_add([fetcher])
-
- mock_memory.assert_has_calls(
- [call([fetcher]), call([fetcher]), call([fetcher]),]
- )
-
-
-def test_retrying_proxy_swh_storage_metadata_fetcher_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.metadata_fetcher_add"
- )
- mock_memory.side_effect = StorageArgumentException(
- "Refuse to add metadata_fetcher always!"
- )
-
- fetcher = sample_data.metadata_fetcher
-
- actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
- assert not actual_fetcher
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.metadata_fetcher_add([fetcher])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_metadata_authority_add(swh_storage, sample_data):
- """Standard metadata_authority_add works as before
-
- """
- authority = sample_data.metadata_authority
-
- assert not swh_storage.metadata_authority_get(authority.type, authority.url)
-
- swh_storage.metadata_authority_add([authority])
-
- actual_authority = swh_storage.metadata_authority_get(authority.type, authority.url)
- assert actual_authority == authority
-
-
-def test_retrying_proxy_storage_metadata_authority_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision,
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- authority = sample_data.metadata_authority
-
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.metadata_authority_add"
- )
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("foo bar"),
- # ok then!
- None,
- ]
-
- assert not swh_storage.metadata_authority_get(authority.type, authority.url)
-
- swh_storage.metadata_authority_add([authority])
-
- mock_memory.assert_has_calls(
- [call([authority]), call([authority]), call([authority])]
- )
-
-
-def test_retrying_proxy_swh_storage_metadata_authority_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.metadata_authority_add"
- )
- mock_memory.side_effect = StorageArgumentException(
- "Refuse to add authority_id always!"
- )
-
- authority = sample_data.metadata_authority
-
- swh_storage.metadata_authority_get(authority.type, authority.url)
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.metadata_authority_add([authority])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_raw_extrinsic_metadata_add(swh_storage, sample_data):
- """Standard raw_extrinsic_metadata_add works as before
-
- """
- origin = sample_data.origin
- ori_meta = sample_data.origin_metadata1
- assert origin.url == ori_meta.target
- swh_storage.origin_add([origin])
- swh_storage.metadata_authority_add([sample_data.metadata_authority])
- swh_storage.metadata_fetcher_add([sample_data.metadata_fetcher])
-
- origin_metadata = swh_storage.raw_extrinsic_metadata_get(
- MetadataTargetType.ORIGIN, ori_meta.target, ori_meta.authority
- )
- assert origin_metadata.next_page_token is None
- assert not origin_metadata.results
-
- swh_storage.raw_extrinsic_metadata_add([ori_meta])
-
- origin_metadata = swh_storage.raw_extrinsic_metadata_get(
- MetadataTargetType.ORIGIN, ori_meta.target, ori_meta.authority
- )
- assert origin_metadata
-
-
-def test_retrying_proxy_storage_raw_extrinsic_metadata_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision,
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- origin = sample_data.origin
- ori_meta = sample_data.origin_metadata1
- assert origin.url == ori_meta.target
- swh_storage.origin_add([origin])
- swh_storage.metadata_authority_add([sample_data.metadata_authority])
- swh_storage.metadata_fetcher_add([sample_data.metadata_fetcher])
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.raw_extrinsic_metadata_add"
- )
-
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("foo bar"),
- # ok then!
- None,
- ]
-
- # No exception raised as insertion finally came through
- swh_storage.raw_extrinsic_metadata_add([ori_meta])
-
- mock_memory.assert_has_calls(
- [ # 3 calls, as long as error raised
- call([ori_meta]),
- call([ori_meta]),
- call([ori_meta]),
- ]
- )
-
-
-def test_retrying_proxy_swh_storage_raw_extrinsic_metadata_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.raw_extrinsic_metadata_add"
- )
- mock_memory.side_effect = StorageArgumentException("Refuse to add always!")
-
- origin = sample_data.origin
- ori_meta = sample_data.origin_metadata1
- assert origin.url == ori_meta.target
- swh_storage.origin_add([origin])
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.raw_extrinsic_metadata_add([ori_meta])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_directory_add(swh_storage, sample_data):
- """Standard directory_add works as before
-
- """
- sample_dir = sample_data.directory
-
- s = swh_storage.directory_add([sample_dir])
- assert s == {
- "directory:add": 1,
- }
-
- directory_id = swh_storage.directory_get_random() # only 1
- assert directory_id == sample_dir.id
-
-
-def test_retrying_proxy_storage_directory_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.directory_add")
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("directory already inserted"),
- # ok then!
- {"directory:add": 1},
- ]
-
- sample_dir = sample_data.directories[1]
-
- s = swh_storage.directory_add([sample_dir])
- assert s == {
- "directory:add": 1,
- }
-
- mock_memory.assert_has_calls(
- [call([sample_dir]), call([sample_dir]), call([sample_dir]),]
- )
-
-
-def test_retrying_proxy_swh_storage_directory_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.directory_add")
- mock_memory.side_effect = StorageArgumentException(
- "Refuse to add directory always!"
- )
-
- sample_dir = sample_data.directory
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.directory_add([sample_dir])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_revision_add(swh_storage, sample_data):
- """Standard revision_add works as before
-
- """
- sample_rev = sample_data.revision
-
- revision = swh_storage.revision_get([sample_rev.id])[0]
- assert revision is None
-
- s = swh_storage.revision_add([sample_rev])
- assert s == {
- "revision:add": 1,
- }
-
- revision = swh_storage.revision_get([sample_rev.id])[0]
- assert revision == sample_rev
-
-
-def test_retrying_proxy_storage_revision_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.revision_add")
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("revision already inserted"),
- # ok then!
- {"revision:add": 1},
- ]
-
- sample_rev = sample_data.revision
-
- revision = swh_storage.revision_get([sample_rev.id])[0]
- assert revision is None
-
- s = swh_storage.revision_add([sample_rev])
- assert s == {
- "revision:add": 1,
- }
-
- mock_memory.assert_has_calls(
- [call([sample_rev]), call([sample_rev]), call([sample_rev]),]
- )
-
-
-def test_retrying_proxy_swh_storage_revision_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.revision_add")
- mock_memory.side_effect = StorageArgumentException("Refuse to add revision always!")
-
- sample_rev = sample_data.revision
-
- revision = swh_storage.revision_get([sample_rev.id])[0]
- assert revision is None
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.revision_add([sample_rev])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_release_add(swh_storage, sample_data):
- """Standard release_add works as before
-
- """
- sample_rel = sample_data.release
-
- release = swh_storage.release_get([sample_rel.id])[0]
- assert release is None
-
- s = swh_storage.release_add([sample_rel])
- assert s == {
- "release:add": 1,
- }
-
- release = swh_storage.release_get([sample_rel.id])[0]
- assert release == sample_rel
-
-
-def test_retrying_proxy_storage_release_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.release_add")
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("release already inserted"),
- # ok then!
- {"release:add": 1},
- ]
-
- sample_rel = sample_data.release
-
- release = swh_storage.release_get([sample_rel.id])[0]
- assert release is None
-
- s = swh_storage.release_add([sample_rel])
- assert s == {
- "release:add": 1,
- }
-
- mock_memory.assert_has_calls(
- [call([sample_rel]), call([sample_rel]), call([sample_rel]),]
- )
-
-
-def test_retrying_proxy_swh_storage_release_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.release_add")
- mock_memory.side_effect = StorageArgumentException("Refuse to add release always!")
-
- sample_rel = sample_data.release
-
- release = swh_storage.release_get([sample_rel.id])[0]
- assert release is None
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.release_add([sample_rel])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_snapshot_add(swh_storage, sample_data):
- """Standard snapshot_add works as before
-
- """
- sample_snap = sample_data.snapshot
-
- snapshot = swh_storage.snapshot_get(sample_snap.id)
- assert not snapshot
-
- s = swh_storage.snapshot_add([sample_snap])
- assert s == {
- "snapshot:add": 1,
- }
-
- snapshot = swh_storage.snapshot_get(sample_snap.id)
- assert snapshot["id"] == sample_snap.id
-
-
-def test_retrying_proxy_storage_snapshot_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.snapshot_add")
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("snapshot already inserted"),
- # ok then!
- {"snapshot:add": 1},
- ]
-
- sample_snap = sample_data.snapshot
-
- snapshot = swh_storage.snapshot_get(sample_snap.id)
- assert not snapshot
-
- s = swh_storage.snapshot_add([sample_snap])
- assert s == {
- "snapshot:add": 1,
- }
-
- mock_memory.assert_has_calls(
- [call([sample_snap]), call([sample_snap]), call([sample_snap]),]
- )
-
-
-def test_retrying_proxy_swh_storage_snapshot_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.snapshot_add")
- mock_memory.side_effect = StorageArgumentException("Refuse to add snapshot always!")
-
- sample_snap = sample_data.snapshot
-
- snapshot = swh_storage.snapshot_get(sample_snap.id)
- assert not snapshot
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.snapshot_add([sample_snap])
-
- assert mock_memory.call_count == 1
-
-
def test_retrying_proxy_swh_storage_keyboardinterrupt(swh_storage, sample_data, mocker):
"""Unfiltered errors are raising without retry
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Dec 18, 11:10 AM (5 d, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3232215
Attached To
D4434: Make the retry proxy work on all functions.
Event Timeline
Log In to Comment