Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7122904
D2511.id8923.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
D2511.id8923.diff
View Options
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -24,6 +24,9 @@
[mypy-pytest.*]
ignore_missing_imports = True
+[mypy-retrying.*]
+ignore_missing_imports = True
+
# temporary work-around for landing typing support in spite of the current
# journal<->storage dependency loop
[mypy-swh.journal.*]
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,6 @@
hypothesis >= 3.11.0
pytest
+pytest-mock
pytest-postgresql >= 2.1.0
sqlalchemy-stubs
# pytz is in fact a dep of swh.model[testing] and should not be necessary, but
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,3 +4,4 @@
python-dateutil
vcversioner
aiohttp
+retrying
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -15,7 +15,8 @@
STORAGE_IMPLEMENTATION = {
- 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer'}
+ 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer', 'retry',
+}
def get_storage(cls, **kwargs):
@@ -58,6 +59,8 @@
from .filter import FilteringProxyStorage as Storage
elif cls == 'buffer':
from .buffer import BufferingProxyStorage as Storage
+ elif cls == 'retry':
+ from .retrying import RetryingProxyStorage as Storage
return Storage(**kwargs)
diff --git a/swh/storage/retrying.py b/swh/storage/retrying.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/retrying.py
@@ -0,0 +1,61 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import psycopg2
+import traceback
+
+from typing import Dict, List
+
+from retrying import retry
+
+from swh.storage import get_storage, HashCollision
+
+
+logger = logging.getLogger(__name__)
+
+
+def retry_loading(error):
+    """Tell the ``retry`` decorator whether a failed call should be retried.
+
+    Only two kinds of failure are considered transient:
+    ``psycopg2.IntegrityError`` (raised when two parallel insertions insert
+    the same data) and ``HashCollision``. For those, a structured warning
+    (exception name and formatted traceback in ``extra``) is logged.
+
+    Args:
+        error: the exception raised by the wrapped storage call.
+
+    Returns:
+        True when the call should be attempted again, False otherwise.
+
+    """
+    retryable = (
+        # raised when two parallel insertions insert the same data.
+        psycopg2.IntegrityError,
+        HashCollision,
+    )
+    if not isinstance(error, retryable):
+        return False
+
+    error_type = error.__class__
+    qualified_name = error.__module__ + '.' + error_type.__name__
+    formatted_tb = traceback.format_exception(
+        error_type, error, error.__traceback__)
+    logger.warning('Retry loading a batch', exc_info=False, extra={
+        'swh_type': 'storage_retry',
+        'swh_exception_type': qualified_name,
+        'swh_exception': formatted_tb,
+    })
+    return True
+
+
+class RetryingProxyStorage:
+    """Retrying proxy storage: retries failed write calls on the proxied
+    storage.
+
+    Currently only ``content_add`` is wrapped. It is attempted at most 3
+    times in total, and a new attempt is made only when ``retry_loading``
+    accepts the raised exception. Attributes not defined on the proxy are
+    looked up on the proxied storage.
+
+    Args:
+        storage: keyword arguments forwarded to ``get_storage`` to build the
+            proxied storage (e.g. ``{'cls': 'memory'}``).
+
+    """
+    def __init__(self, storage):
+        self.storage = get_storage(**storage)
+
+    def __getattr__(self, key):
+        # __getattr__ only runs when normal lookup fails. If ``storage`` is
+        # itself missing (instance created without __init__, as copy/pickle
+        # do, or __init__ raised), ``self.storage`` below would call back into
+        # __getattr__ forever; fail with a plain AttributeError instead.
+        if key == 'storage':
+            raise AttributeError(key)
+        return getattr(self.storage, key)
+
+    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
+    def content_add(self, content: List[Dict]) -> Dict:
+        """Add ``content`` to the proxied storage, retrying (up to 3 attempts
+        in total) on the failures accepted by ``retry_loading``.
+
+        """
+        return self.storage.content_add(content)
diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py
--- a/swh/storage/tests/test_init.py
+++ b/swh/storage/tests/test_init.py
@@ -14,6 +14,7 @@
from swh.storage.in_memory import Storage as MemoryStorage
from swh.storage.buffer import BufferingProxyStorage
from swh.storage.filter import FilteringProxyStorage
+from swh.storage.retrying import RetryingProxyStorage
@patch('swh.storage.storage.psycopg2.pool')
@@ -36,6 +37,9 @@
('buffer', BufferingProxyStorage, {'storage': {
'cls': 'memory'}
}),
+ ('retry', RetryingProxyStorage, {'storage': {
+ 'cls': 'memory'}
+ }),
]:
actual_storage = get_storage(cls, **dummy_args)
assert actual_storage is not None
diff --git a/swh/storage/tests/test_retrying.py b/swh/storage/tests/test_retrying.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_retrying.py
@@ -0,0 +1,86 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import psycopg2
+import pytest
+
+from swh.storage import HashCollision
+from swh.storage.retrying import RetryingProxyStorage
+
+
+def test_retrying_proxy_storage_content(sample_data):
+    """Without any failure, content_add returns the proxied storage's summary
+
+    """
+    sample_content = sample_data['content'][0]
+    proxy = RetryingProxyStorage(storage={'cls': 'memory'})
+
+    # the content is not stored yet
+    assert not next(proxy.content_get([sample_content['sha1']]))
+
+    summary = proxy.content_add([sample_content])
+    assert summary == {
+        'content:add': 1,
+        'content:add:bytes': sample_content['length'],
+        'skipped_content:add': 0
+    }
+
+
+# Number of calls made so far to _count_content_add (drives its failures).
+count_local_content_add_call = 0
+
+
+def _count_content_add(content):
+    """Stand-in for content_add that fails on its first two calls
+
+    The 1st call raises HashCollision, the 2nd raises
+    psycopg2.IntegrityError, and any later call reports one added content.
+
+    """
+    global count_local_content_add_call
+    count_local_content_add_call += 1
+    attempt = count_local_content_add_call
+    if attempt == 1:  # first try
+        raise HashCollision('Refuse to add content')
+    if attempt == 2:  # second try
+        raise psycopg2.IntegrityError('Refuse differently to add content')
+    # at last, we accept to add content
+    return {'content:add': 1}
+
+
+def test_retrying_proxy_storage_with_retry(sample_data, mocker):
+    """Multiple retries for hash collision and psycopg2 error
+
+    The mocked content_add fails twice (HashCollision, then
+    psycopg2.IntegrityError) before succeeding, so the proxy must call it
+    exactly 3 times and return the final result.
+
+    """
+    # _count_content_add's failure sequence is driven by a module-level
+    # counter; reset it so the test does not depend on previous runs.
+    global count_local_content_add_call
+    count_local_content_add_call = 0
+
+    mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+    mock_memory.side_effect = _count_content_add
+
+    sample_content = sample_data['content'][0]
+    storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+    content = next(storage.content_get([sample_content['sha1']]))
+    assert not content
+
+    s = storage.content_add([sample_content])
+    assert s == {
+        'content:add': 1,
+    }
+    # two failed attempts, then the successful one
+    assert mock_memory.call_count == 3
+
+
+def test_retrying_proxy_storage_failure_to_add(sample_data, mocker):
+    """Other errors are raised as usual, without being retried
+
+    """
+    mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+    mock_memory.side_effect = ValueError('Refuse to add')
+
+    sample_content = sample_data['content'][0]
+    storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+    content = next(storage.content_get([sample_content['sha1']]))
+    assert not content
+
+    with pytest.raises(ValueError, match='Refuse to add'):
+        storage.content_add([sample_content])
+
+    # ValueError is not accepted by retry_loading: a single attempt only
+    assert mock_memory.call_count == 1
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 17, 10:54 AM (2 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3232900
Attached To
D2511: storage: Add proxy storage with retry policy
Event Timeline
Log In to Comment