Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123017
D2511.id8929.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Subscribers
None
D2511.id8929.diff
View Options
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -24,6 +24,9 @@
[mypy-pytest.*]
ignore_missing_imports = True
+[mypy-retrying.*]
+ignore_missing_imports = True
+
# temporary work-around for landing typing support in spite of the current
# journal<->storage dependency loop
[mypy-swh.journal.*]
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,6 @@
hypothesis >= 3.11.0
pytest
+pytest-mock
pytest-postgresql >= 2.1.0
sqlalchemy-stubs
# pytz is in fact a dep of swh.model[testing] and should not be necessary, but
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,3 +4,4 @@
python-dateutil
vcversioner
aiohttp
+retrying
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
+# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -15,7 +15,8 @@
STORAGE_IMPLEMENTATION = {
- 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer'}
+ 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer', 'retry',
+}
def get_storage(cls, **kwargs):
@@ -58,6 +59,8 @@
from .filter import FilteringProxyStorage as Storage
elif cls == 'buffer':
from .buffer import BufferingProxyStorage as Storage
+ elif cls == 'retry':
+ from .retry import RetryingProxyStorage as Storage
return Storage(**kwargs)
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/retry.py
@@ -0,0 +1,60 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import psycopg2
+import traceback
+
+from typing import Dict, List
+
+from retrying import retry
+
+from swh.storage import get_storage, HashCollision
+
+
+logger = logging.getLogger(__name__)
+
+
+RETRY_EXCEPTIONS = [
+ # raised when two parallel insertions insert the same data
+ psycopg2.IntegrityError,
+ HashCollision,
+]
+
+
+def should_retry_adding(error: Exception) -> bool:
+ """Retry policy applied when a known kind of failure occurs (database
+ integrity error, hash collision, etc.).
+
+ """
+ retry = any(isinstance(error, exc) for exc in RETRY_EXCEPTIONS)
+ if retry:
+ error_name = error.__module__ + '.' + error.__class__.__name__
+ logger.warning('Retry adding a batch', exc_info=False, extra={
+ 'swh_type': 'storage_retry',
+ 'swh_exception_type': error_name,
+ 'swh_exception': traceback.format_exception(
+ error.__class__,
+ error,
+ error.__traceback__,
+ ),
+ })
+ return retry
+
+
+class RetryingProxyStorage:
+ """Storage implementation which retries adding objects when the addition
+ fails with a specific error (hash collision, integrity error).
+
+ """
+ def __init__(self, storage):
+ self.storage = get_storage(**storage)
+
+ def __getattr__(self, key):
+ return getattr(self.storage, key)
+
+ @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
+ def content_add(self, content: List[Dict]) -> Dict:
+ return self.storage.content_add(content)
diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py
--- a/swh/storage/tests/test_init.py
+++ b/swh/storage/tests/test_init.py
@@ -14,6 +14,7 @@
from swh.storage.in_memory import Storage as MemoryStorage
from swh.storage.buffer import BufferingProxyStorage
from swh.storage.filter import FilteringProxyStorage
+from swh.storage.retry import RetryingProxyStorage
@patch('swh.storage.storage.psycopg2.pool')
@@ -36,6 +37,9 @@
('buffer', BufferingProxyStorage, {'storage': {
'cls': 'memory'}
}),
+ ('retry', RetryingProxyStorage, {'storage': {
+ 'cls': 'memory'}
+ }),
]:
actual_storage = get_storage(cls, **dummy_args)
assert actual_storage is not None
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_retry.py
@@ -0,0 +1,89 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import psycopg2
+import pytest
+
+from swh.storage import HashCollision
+from swh.storage.retry import (
+ RetryingProxyStorage, should_retry_adding, RETRY_EXCEPTIONS
+)
+
+
+def test_should_retry_adding():
+ """Specific exceptions should be selected for retry
+
+ """
+ for exc in RETRY_EXCEPTIONS:
+ assert should_retry_adding(exc('error')) is True
+
+
+def test_should_retry_adding_no_retry():
+ """Other exceptions should not be selected for retry
+
+ """
+ for exc in [ValueError, Exception]:
+ assert should_retry_adding(exc('fail!')) is False
+
+
+def test_retrying_proxy_storage_content(sample_data):
+ """Standard content_add works as before
+
+ """
+ sample_content = sample_data['content'][0]
+ storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+ content = next(storage.content_get([sample_content['sha1']]))
+ assert not content
+
+ s = storage.content_add([sample_content])
+ assert s == {
+ 'content:add': 1,
+ 'content:add:bytes': sample_content['length'],
+ 'skipped_content:add': 0
+ }
+
+
+def test_retrying_proxy_storage_with_retry(sample_data, mocker):
+ """Retries on hash collision and psycopg2 error, then finally succeeds
+
+ """
+ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+ mock_memory.side_effect = [
+ # first try goes ko
+ HashCollision('content hash collision'),
+ # second try goes ko
+ psycopg2.IntegrityError('content already inserted'),
+ # ok then!
+ {'content:add': 1}
+ ]
+
+ sample_content = sample_data['content'][0]
+ storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+ content = next(storage.content_get([sample_content['sha1']]))
+ assert not content
+
+ s = storage.content_add([sample_content])
+ assert s == {
+ 'content:add': 1,
+ }
+
+
+def test_retrying_proxy_storage_failure_to_add(sample_data, mocker):
+ """Other errors are raised as usual
+
+ """
+ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+ mock_memory.side_effect = ValueError('Refuse to add content always!')
+
+ sample_content = sample_data['content'][0]
+ storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+ content = next(storage.content_get([sample_content['sha1']]))
+ assert not content
+
+ with pytest.raises(ValueError, match='Refuse to add'):
+ storage.content_add([sample_content])
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 17, 4:00 PM (2 d, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3221962
Attached To
D2511: storage: Add proxy storage with retry policy
Event Timeline
Log In to Comment