Page Menu | Home | Software Heritage

D4434.diff
No One | Temporary

D4434.diff

diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -5,22 +5,9 @@
import logging
import traceback
-from typing import Dict, Iterable, List
from tenacity import retry, stop_after_attempt, wait_random_exponential
-from swh.model.model import (
- Content,
- Directory,
- MetadataAuthority,
- MetadataFetcher,
- OriginVisit,
- RawExtrinsicMetadata,
- Release,
- Revision,
- SkippedContent,
- Snapshot,
-)
from swh.storage import get_storage
from swh.storage.exc import StorageArgumentException
from swh.storage.interface import StorageInterface
@@ -53,7 +40,7 @@
else:
error_name = error.__class__.__name__
logger.warning(
- "Retry adding a batch",
+ "Retrying RPC call",
exc_info=False,
extra={
"swh_type": "storage_retry",
@@ -74,6 +61,14 @@
)
+def retry_function(storage, attribute_name):
+ @swh_retry
+ def newf(*args, **kwargs):
+ return getattr(storage, attribute_name)(*args, **kwargs)
+
+ return newf
+
+
class RetryingProxyStorage:
"""Storage implementation which retries adding objects when it specifically
fails (hash collision, integrity error).
@@ -82,52 +77,11 @@
def __init__(self, storage):
self.storage: StorageInterface = get_storage(**storage)
-
- def __getattr__(self, key):
- if key == "storage":
- raise AttributeError(key)
- return getattr(self.storage, key)
-
- @swh_retry
- def content_add(self, content: List[Content]) -> Dict:
- return self.storage.content_add(content)
-
- @swh_retry
- def content_add_metadata(self, content: List[Content]) -> Dict:
- return self.storage.content_add_metadata(content)
-
- @swh_retry
- def skipped_content_add(self, content: List[SkippedContent]) -> Dict:
- return self.storage.skipped_content_add(content)
-
- @swh_retry
- def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]:
- return self.storage.origin_visit_add(visits)
-
- @swh_retry
- def metadata_fetcher_add(self, fetchers: List[MetadataFetcher],) -> None:
- return self.storage.metadata_fetcher_add(fetchers)
-
- @swh_retry
- def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None:
- return self.storage.metadata_authority_add(authorities)
-
- @swh_retry
- def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None:
- return self.storage.raw_extrinsic_metadata_add(metadata)
-
- @swh_retry
- def directory_add(self, directories: List[Directory]) -> Dict:
- return self.storage.directory_add(directories)
-
- @swh_retry
- def revision_add(self, revisions: List[Revision]) -> Dict:
- return self.storage.revision_add(revisions)
-
- @swh_retry
- def release_add(self, releases: List[Release]) -> Dict:
- return self.storage.release_add(releases)
-
- @swh_retry
- def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
- return self.storage.snapshot_add(snapshots)
+ for attribute_name in dir(StorageInterface):
+ if attribute_name.startswith("_"):
+ continue
+ attribute = getattr(self.storage, attribute_name)
+ if hasattr(attribute, "__call__"):
+ setattr(
+ self, attribute_name, retry_function(self.storage, attribute_name)
+ )
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
--- a/swh/storage/tests/test_retry.py
+++ b/swh/storage/tests/test_retry.py
@@ -9,7 +9,6 @@
import psycopg2
import pytest
-from swh.model.model import MetadataTargetType
from swh.storage.exc import HashCollision, StorageArgumentException
from swh.storage.utils import now
@@ -180,645 +179,6 @@
assert mock_memory.call_count == 1
-def test_retrying_proxy_storage_skipped_content_add(swh_storage, sample_data):
- """Standard skipped_content_add works as before
-
- """
- sample_content = sample_data.skipped_content
- sample_content_dict = sample_content.to_dict()
-
- skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict]))
- assert len(skipped_contents) == 1
-
- s = swh_storage.skipped_content_add([sample_content])
- assert s == {
- "skipped_content:add": 1,
- }
-
- skipped_content = list(swh_storage.skipped_content_missing([sample_content_dict]))
- assert len(skipped_content) == 0
-
-
-def test_retrying_proxy_storage_skipped_content_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.skipped_content_add"
- )
- mock_memory.side_effect = [
- # 1st & 2nd try goes ko
- fake_hash_collision,
- psycopg2.IntegrityError("skipped_content already inserted"),
- # ok then!
- {"skipped_content:add": 1},
- ]
-
- sample_content = sample_data.skipped_content
-
- s = swh_storage.skipped_content_add([sample_content])
- assert s == {"skipped_content:add": 1}
-
- mock_memory.assert_has_calls(
- [call([sample_content]), call([sample_content]), call([sample_content]),]
- )
-
-
-def test_retrying_proxy_swh_storage_skipped_content_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.skipped_content_add"
- )
- mock_memory.side_effect = StorageArgumentException(
- "Refuse to add content_metadata!"
- )
-
- sample_content = sample_data.skipped_content
- sample_content_dict = sample_content.to_dict()
-
- skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict]))
- assert len(skipped_contents) == 1
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.skipped_content_add([sample_content])
-
- skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict]))
- assert len(skipped_contents) == 1
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data):
- """Standard origin_visit_add works as before
-
- """
- origin = sample_data.origin
- visit = sample_data.origin_visit
- assert visit.origin == origin.url
-
- swh_storage.origin_add([origin])
-
- origins = swh_storage.origin_visit_get(origin.url).results
- assert not origins
-
- origin_visit = swh_storage.origin_visit_add([visit])[0]
- assert origin_visit.origin == origin.url
- assert isinstance(origin_visit.visit, int)
-
- actual_visit = swh_storage.origin_visit_get(origin.url).results[0]
- assert actual_visit == visit
-
-
-def test_retrying_proxy_swh_storage_origin_visit_add_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- origin = sample_data.origin
- visit = sample_data.origin_visit
- assert visit.origin == origin.url
-
- swh_storage.origin_add([origin])
-
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_visit_add")
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("origin already inserted"),
- # ok then!
- [visit],
- ]
-
- origins = swh_storage.origin_visit_get(origin.url).results
- assert not origins
-
- r = swh_storage.origin_visit_add([visit])
- assert r == [visit]
-
- mock_memory.assert_has_calls(
- [call([visit]), call([visit]), call([visit]),]
- )
-
-
-def test_retrying_proxy_swh_storage_origin_visit_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_visit_add")
- mock_memory.side_effect = StorageArgumentException("Refuse to add origin always!")
-
- origin = sample_data.origin
- visit = sample_data.origin_visit
- assert visit.origin == origin.url
-
- origins = swh_storage.origin_visit_get(origin.url).results
- assert not origins
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.origin_visit_add([visit])
-
- mock_memory.assert_has_calls(
- [call([visit]),]
- )
-
-
-def test_retrying_proxy_storage_metadata_fetcher_add(swh_storage, sample_data):
- """Standard metadata_fetcher_add works as before
-
- """
- fetcher = sample_data.metadata_fetcher
-
- metadata_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
- assert not metadata_fetcher
-
- swh_storage.metadata_fetcher_add([fetcher])
-
- actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
- assert actual_fetcher == fetcher
-
-
-def test_retrying_proxy_storage_metadata_fetcher_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision,
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- fetcher = sample_data.metadata_fetcher
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.metadata_fetcher_add"
- )
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("metadata_fetcher already inserted"),
- # ok then!
- [fetcher],
- ]
-
- actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
- assert not actual_fetcher
-
- swh_storage.metadata_fetcher_add([fetcher])
-
- mock_memory.assert_has_calls(
- [call([fetcher]), call([fetcher]), call([fetcher]),]
- )
-
-
-def test_retrying_proxy_swh_storage_metadata_fetcher_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.metadata_fetcher_add"
- )
- mock_memory.side_effect = StorageArgumentException(
- "Refuse to add metadata_fetcher always!"
- )
-
- fetcher = sample_data.metadata_fetcher
-
- actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
- assert not actual_fetcher
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.metadata_fetcher_add([fetcher])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_metadata_authority_add(swh_storage, sample_data):
- """Standard metadata_authority_add works as before
-
- """
- authority = sample_data.metadata_authority
-
- assert not swh_storage.metadata_authority_get(authority.type, authority.url)
-
- swh_storage.metadata_authority_add([authority])
-
- actual_authority = swh_storage.metadata_authority_get(authority.type, authority.url)
- assert actual_authority == authority
-
-
-def test_retrying_proxy_storage_metadata_authority_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision,
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- authority = sample_data.metadata_authority
-
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.metadata_authority_add"
- )
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("foo bar"),
- # ok then!
- None,
- ]
-
- assert not swh_storage.metadata_authority_get(authority.type, authority.url)
-
- swh_storage.metadata_authority_add([authority])
-
- mock_memory.assert_has_calls(
- [call([authority]), call([authority]), call([authority])]
- )
-
-
-def test_retrying_proxy_swh_storage_metadata_authority_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.metadata_authority_add"
- )
- mock_memory.side_effect = StorageArgumentException(
- "Refuse to add authority_id always!"
- )
-
- authority = sample_data.metadata_authority
-
- swh_storage.metadata_authority_get(authority.type, authority.url)
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.metadata_authority_add([authority])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_raw_extrinsic_metadata_add(swh_storage, sample_data):
- """Standard raw_extrinsic_metadata_add works as before
-
- """
- origin = sample_data.origin
- ori_meta = sample_data.origin_metadata1
- assert origin.url == ori_meta.target
- swh_storage.origin_add([origin])
- swh_storage.metadata_authority_add([sample_data.metadata_authority])
- swh_storage.metadata_fetcher_add([sample_data.metadata_fetcher])
-
- origin_metadata = swh_storage.raw_extrinsic_metadata_get(
- MetadataTargetType.ORIGIN, ori_meta.target, ori_meta.authority
- )
- assert origin_metadata.next_page_token is None
- assert not origin_metadata.results
-
- swh_storage.raw_extrinsic_metadata_add([ori_meta])
-
- origin_metadata = swh_storage.raw_extrinsic_metadata_get(
- MetadataTargetType.ORIGIN, ori_meta.target, ori_meta.authority
- )
- assert origin_metadata
-
-
-def test_retrying_proxy_storage_raw_extrinsic_metadata_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision,
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- origin = sample_data.origin
- ori_meta = sample_data.origin_metadata1
- assert origin.url == ori_meta.target
- swh_storage.origin_add([origin])
- swh_storage.metadata_authority_add([sample_data.metadata_authority])
- swh_storage.metadata_fetcher_add([sample_data.metadata_fetcher])
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.raw_extrinsic_metadata_add"
- )
-
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("foo bar"),
- # ok then!
- None,
- ]
-
- # No exception raised as insertion finally came through
- swh_storage.raw_extrinsic_metadata_add([ori_meta])
-
- mock_memory.assert_has_calls(
- [ # 3 calls, as long as error raised
- call([ori_meta]),
- call([ori_meta]),
- call([ori_meta]),
- ]
- )
-
-
-def test_retrying_proxy_swh_storage_raw_extrinsic_metadata_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch(
- "swh.storage.in_memory.InMemoryStorage.raw_extrinsic_metadata_add"
- )
- mock_memory.side_effect = StorageArgumentException("Refuse to add always!")
-
- origin = sample_data.origin
- ori_meta = sample_data.origin_metadata1
- assert origin.url == ori_meta.target
- swh_storage.origin_add([origin])
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.raw_extrinsic_metadata_add([ori_meta])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_directory_add(swh_storage, sample_data):
- """Standard directory_add works as before
-
- """
- sample_dir = sample_data.directory
-
- s = swh_storage.directory_add([sample_dir])
- assert s == {
- "directory:add": 1,
- }
-
- directory_id = swh_storage.directory_get_random() # only 1
- assert directory_id == sample_dir.id
-
-
-def test_retrying_proxy_storage_directory_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.directory_add")
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("directory already inserted"),
- # ok then!
- {"directory:add": 1},
- ]
-
- sample_dir = sample_data.directories[1]
-
- s = swh_storage.directory_add([sample_dir])
- assert s == {
- "directory:add": 1,
- }
-
- mock_memory.assert_has_calls(
- [call([sample_dir]), call([sample_dir]), call([sample_dir]),]
- )
-
-
-def test_retrying_proxy_swh_storage_directory_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.directory_add")
- mock_memory.side_effect = StorageArgumentException(
- "Refuse to add directory always!"
- )
-
- sample_dir = sample_data.directory
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.directory_add([sample_dir])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_revision_add(swh_storage, sample_data):
- """Standard revision_add works as before
-
- """
- sample_rev = sample_data.revision
-
- revision = swh_storage.revision_get([sample_rev.id])[0]
- assert revision is None
-
- s = swh_storage.revision_add([sample_rev])
- assert s == {
- "revision:add": 1,
- }
-
- revision = swh_storage.revision_get([sample_rev.id])[0]
- assert revision == sample_rev
-
-
-def test_retrying_proxy_storage_revision_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.revision_add")
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("revision already inserted"),
- # ok then!
- {"revision:add": 1},
- ]
-
- sample_rev = sample_data.revision
-
- revision = swh_storage.revision_get([sample_rev.id])[0]
- assert revision is None
-
- s = swh_storage.revision_add([sample_rev])
- assert s == {
- "revision:add": 1,
- }
-
- mock_memory.assert_has_calls(
- [call([sample_rev]), call([sample_rev]), call([sample_rev]),]
- )
-
-
-def test_retrying_proxy_swh_storage_revision_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.revision_add")
- mock_memory.side_effect = StorageArgumentException("Refuse to add revision always!")
-
- sample_rev = sample_data.revision
-
- revision = swh_storage.revision_get([sample_rev.id])[0]
- assert revision is None
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.revision_add([sample_rev])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_release_add(swh_storage, sample_data):
- """Standard release_add works as before
-
- """
- sample_rel = sample_data.release
-
- release = swh_storage.release_get([sample_rel.id])[0]
- assert release is None
-
- s = swh_storage.release_add([sample_rel])
- assert s == {
- "release:add": 1,
- }
-
- release = swh_storage.release_get([sample_rel.id])[0]
- assert release == sample_rel
-
-
-def test_retrying_proxy_storage_release_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.release_add")
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("release already inserted"),
- # ok then!
- {"release:add": 1},
- ]
-
- sample_rel = sample_data.release
-
- release = swh_storage.release_get([sample_rel.id])[0]
- assert release is None
-
- s = swh_storage.release_add([sample_rel])
- assert s == {
- "release:add": 1,
- }
-
- mock_memory.assert_has_calls(
- [call([sample_rel]), call([sample_rel]), call([sample_rel]),]
- )
-
-
-def test_retrying_proxy_swh_storage_release_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.release_add")
- mock_memory.side_effect = StorageArgumentException("Refuse to add release always!")
-
- sample_rel = sample_data.release
-
- release = swh_storage.release_get([sample_rel.id])[0]
- assert release is None
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.release_add([sample_rel])
-
- assert mock_memory.call_count == 1
-
-
-def test_retrying_proxy_storage_snapshot_add(swh_storage, sample_data):
- """Standard snapshot_add works as before
-
- """
- sample_snap = sample_data.snapshot
-
- snapshot = swh_storage.snapshot_get(sample_snap.id)
- assert not snapshot
-
- s = swh_storage.snapshot_add([sample_snap])
- assert s == {
- "snapshot:add": 1,
- }
-
- snapshot = swh_storage.snapshot_get(sample_snap.id)
- assert snapshot["id"] == sample_snap.id
-
-
-def test_retrying_proxy_storage_snapshot_add_with_retry(
- monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
-):
- """Multiple retries for hash collision and psycopg2 error but finally ok
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.snapshot_add")
- mock_memory.side_effect = [
- # first try goes ko
- fake_hash_collision,
- # second try goes ko
- psycopg2.IntegrityError("snapshot already inserted"),
- # ok then!
- {"snapshot:add": 1},
- ]
-
- sample_snap = sample_data.snapshot
-
- snapshot = swh_storage.snapshot_get(sample_snap.id)
- assert not snapshot
-
- s = swh_storage.snapshot_add([sample_snap])
- assert s == {
- "snapshot:add": 1,
- }
-
- mock_memory.assert_has_calls(
- [call([sample_snap]), call([sample_snap]), call([sample_snap]),]
- )
-
-
-def test_retrying_proxy_swh_storage_snapshot_add_failure(
- swh_storage, sample_data, mocker
-):
- """Unfiltered errors are raising without retry
-
- """
- mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.snapshot_add")
- mock_memory.side_effect = StorageArgumentException("Refuse to add snapshot always!")
-
- sample_snap = sample_data.snapshot
-
- snapshot = swh_storage.snapshot_get(sample_snap.id)
- assert not snapshot
-
- with pytest.raises(StorageArgumentException, match="Refuse to add"):
- swh_storage.snapshot_add([sample_snap])
-
- assert mock_memory.call_count == 1
-
-
def test_retrying_proxy_swh_storage_keyboardinterrupt(swh_storage, sample_data, mocker):
"""Unfiltered errors are raising without retry

File Metadata

Mime Type
text/plain
Expires
Wed, Dec 18, 11:10 AM (1 d, 10 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3232215

Event Timeline