Page MenuHomeSoftware Heritage

D2511.id8923.diff
No OneTemporary

D2511.id8923.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -24,6 +24,9 @@
[mypy-pytest.*]
ignore_missing_imports = True
+[mypy-retrying.*]
+ignore_missing_imports = True
+
# temporary work-around for landing typing support in spite of the current
# journal<->storage dependency loop
[mypy-swh.journal.*]
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,6 @@
hypothesis >= 3.11.0
pytest
+pytest-mock
pytest-postgresql >= 2.1.0
sqlalchemy-stubs
# pytz is in fact a dep of swh.model[testing] and should not be necessary, but
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,3 +4,4 @@
python-dateutil
vcversioner
aiohttp
+retrying
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -15,7 +15,8 @@
STORAGE_IMPLEMENTATION = {
- 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer'}
+ 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer', 'retry',
+}
def get_storage(cls, **kwargs):
@@ -58,6 +59,8 @@
from .filter import FilteringProxyStorage as Storage
elif cls == 'buffer':
from .buffer import BufferingProxyStorage as Storage
+ elif cls == 'retry':
+ from .retrying import RetryingProxyStorage as Storage
return Storage(**kwargs)
diff --git a/swh/storage/retrying.py b/swh/storage/retrying.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/retrying.py
@@ -0,0 +1,61 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import psycopg2
+import traceback
+
+from typing import Dict, List
+
+from retrying import retry
+
+from swh.storage import get_storage, HashCollision
+
+
+logger = logging.getLogger(__name__)
+
+
+def retry_loading(error):
+ """Retry policy: retry when the error is a database integrity error or a
+ hash collision; any other error is not retried.
+
+ """
+ exception_classes = [
+ # raised when two parallel insertions insert the same data.
+ psycopg2.IntegrityError,
+ HashCollision,
+ ]
+
+ if not any(isinstance(error, exc) for exc in exception_classes):
+ return False
+
+ error_name = error.__module__ + '.' + error.__class__.__name__
+ logger.warning('Retry loading a batch', exc_info=False, extra={
+ 'swh_type': 'storage_retry',
+ 'swh_exception_type': error_name,
+ 'swh_exception': traceback.format_exception(
+ error.__class__,
+ error,
+ error.__traceback__,
+ ),
+ })
+
+ return True
+
+
+class RetryingProxyStorage:
+ """Retrying proxy storage. For the storage's `_add` methods, retry adding
+ data on failure.
+
+ """
+ def __init__(self, storage):
+ self.storage = get_storage(**storage)
+
+ def __getattr__(self, key):
+ return getattr(self.storage, key)
+
+ @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
+ def content_add(self, content: List[Dict]) -> Dict:
+ return self.storage.content_add(content)
diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py
--- a/swh/storage/tests/test_init.py
+++ b/swh/storage/tests/test_init.py
@@ -14,6 +14,7 @@
from swh.storage.in_memory import Storage as MemoryStorage
from swh.storage.buffer import BufferingProxyStorage
from swh.storage.filter import FilteringProxyStorage
+from swh.storage.retrying import RetryingProxyStorage
@patch('swh.storage.storage.psycopg2.pool')
@@ -36,6 +37,9 @@
('buffer', BufferingProxyStorage, {'storage': {
'cls': 'memory'}
}),
+ ('retry', RetryingProxyStorage, {'storage': {
+ 'cls': 'memory'}
+ }),
]:
actual_storage = get_storage(cls, **dummy_args)
assert actual_storage is not None
diff --git a/swh/storage/tests/test_retrying.py b/swh/storage/tests/test_retrying.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_retrying.py
@@ -0,0 +1,86 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import psycopg2
+import pytest
+
+from swh.storage import HashCollision
+from swh.storage.retrying import RetryingProxyStorage
+
+
+def test_retrying_proxy_storage_content(sample_data):
+ """Standard content_add works as before
+
+ """
+ sample_content = sample_data['content'][0]
+ storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+ content = next(storage.content_get([sample_content['sha1']]))
+ assert not content
+
+ s = storage.content_add([sample_content])
+ assert s == {
+ 'content:add': 1,
+ 'content:add:bytes': sample_content['length'],
+ 'skipped_content:add': 0
+ }
+
+
+count_local_content_add_call = 0
+
+
+def _count_content_add(content):
+ """Mimic content_add with failures
+
+ """
+ global count_local_content_add_call
+ count_local_content_add_call += 1
+ error = {
+ # first try
+ 1: HashCollision('Refuse to add content'),
+ # second try
+ 2: psycopg2.IntegrityError('Refuse differently to add content'),
+ }
+ exception = error.get(count_local_content_add_call)
+ if exception: # we raise exceptions
+ raise exception
+ # no failure left to raise: accept the content
+ return {'content:add': 1}
+
+
+def test_retrying_proxy_storage_with_retry(sample_data, mocker):
+ """Multiple retries for hash collision and psycopg2 error
+
+ """
+ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+ mock_memory.side_effect = _count_content_add
+
+ sample_content = sample_data['content'][0]
+ storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+ content = next(storage.content_get([sample_content['sha1']]))
+ assert not content
+
+ s = storage.content_add([sample_content])
+ assert s == {
+ 'content:add': 1,
+ }
+
+
+def test_retrying_proxy_storage_failure_to_add(sample_data, mocker):
+ """Other errors are raised as usual
+
+ """
+ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+ mock_memory.side_effect = ValueError('Refuse to add')
+
+ sample_content = sample_data['content'][0]
+ storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+ content = next(storage.content_get([sample_content['sha1']]))
+ assert not content
+
+ with pytest.raises(ValueError, match='Refuse to add'):
+ storage.content_add([sample_content])

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 17, 10:54 AM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3232900

Event Timeline