Page Menu · Home · Software Heritage

D2511.id8929.diff
No One · Temporary

D2511.id8929.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -24,6 +24,9 @@
[mypy-pytest.*]
ignore_missing_imports = True
+[mypy-retrying.*]
+ignore_missing_imports = True
+
# temporary work-around for landing typing support in spite of the current
# journal<->storage dependency loop
[mypy-swh.journal.*]
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,6 @@
hypothesis >= 3.11.0
pytest
+pytest-mock
pytest-postgresql >= 2.1.0
sqlalchemy-stubs
# pytz is in fact a dep of swh.model[testing] and should not be necessary, but
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,3 +4,4 @@
python-dateutil
vcversioner
aiohttp
+retrying
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
+# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -15,7 +15,8 @@
STORAGE_IMPLEMENTATION = {
- 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer'}
+ 'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer', 'retry',
+}
def get_storage(cls, **kwargs):
@@ -58,6 +59,8 @@
from .filter import FilteringProxyStorage as Storage
elif cls == 'buffer':
from .buffer import BufferingProxyStorage as Storage
+ elif cls == 'retry':
+ from .retry import RetryingProxyStorage as Storage
return Storage(**kwargs)
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/retry.py
@@ -0,0 +1,60 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import psycopg2
+import traceback
+
+from typing import Dict, List
+
+from retrying import retry
+
+from swh.storage import get_storage, HashCollision
+
+
+logger = logging.getLogger(__name__)
+
+
+RETRY_EXCEPTIONS = [
+ # raised when two parallel insertions insert the same data
+ psycopg2.IntegrityError,
+ HashCollision,
+]
+
+
+def should_retry_adding(error: Exception) -> bool:
+ """Retry policy applied when some kinds of failure occur (database
+ integrity error, hash collision, etc.)
+
+ """
+ retry = any(isinstance(error, exc) for exc in RETRY_EXCEPTIONS)
+ if retry:
+ error_name = error.__module__ + '.' + error.__class__.__name__
+ logger.warning('Retry adding a batch', exc_info=False, extra={
+ 'swh_type': 'storage_retry',
+ 'swh_exception_type': error_name,
+ 'swh_exception': traceback.format_exception(
+ error.__class__,
+ error,
+ error.__traceback__,
+ ),
+ })
+ return retry
+
+
+class RetryingProxyStorage:
+ """Storage proxy which retries adding objects when the addition fails with
+ a specific, retryable error (hash collision, integrity error).
+
+ """
+ def __init__(self, storage):
+ self.storage = get_storage(**storage)
+
+ def __getattr__(self, key):
+ return getattr(self.storage, key)
+
+ @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
+ def content_add(self, content: List[Dict]) -> Dict:
+ return self.storage.content_add(content)
diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py
--- a/swh/storage/tests/test_init.py
+++ b/swh/storage/tests/test_init.py
@@ -14,6 +14,7 @@
from swh.storage.in_memory import Storage as MemoryStorage
from swh.storage.buffer import BufferingProxyStorage
from swh.storage.filter import FilteringProxyStorage
+from swh.storage.retry import RetryingProxyStorage
@patch('swh.storage.storage.psycopg2.pool')
@@ -36,6 +37,9 @@
('buffer', BufferingProxyStorage, {'storage': {
'cls': 'memory'}
}),
+ ('retry', RetryingProxyStorage, {'storage': {
+ 'cls': 'memory'}
+ }),
]:
actual_storage = get_storage(cls, **dummy_args)
assert actual_storage is not None
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_retry.py
@@ -0,0 +1,89 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import psycopg2
+import pytest
+
+from swh.storage import HashCollision
+from swh.storage.retry import (
+ RetryingProxyStorage, should_retry_adding, RETRY_EXCEPTIONS
+)
+
+
+def test_should_retry_adding():
+ """Exceptions listed in RETRY_EXCEPTIONS should be selected for retry
+
+ """
+ for exc in RETRY_EXCEPTIONS:
+ assert should_retry_adding(exc('error')) is True
+
+
+def test_should_retry_adding_no_retry():
+ """Exceptions not listed in RETRY_EXCEPTIONS should not be retried
+
+ """
+ for exc in [ValueError, Exception]:
+ assert should_retry_adding(exc('fail!')) is False
+
+
+def test_retrying_proxy_storage_content(sample_data):
+ """Standard content_add works as before
+
+ """
+ sample_content = sample_data['content'][0]
+ storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+ content = next(storage.content_get([sample_content['sha1']]))
+ assert not content
+
+ s = storage.content_add([sample_content])
+ assert s == {
+ 'content:add': 1,
+ 'content:add:bytes': sample_content['length'],
+ 'skipped_content:add': 0
+ }
+
+
+def test_retrying_proxy_storage_with_retry(sample_data, mocker):
+ """Retries on hash collision and psycopg2 integrity error, then succeeds
+
+ """
+ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+ mock_memory.side_effect = [
+ # first attempt fails
+ HashCollision('content hash collision'),
+ # second attempt fails
+ psycopg2.IntegrityError('content already inserted'),
+ # third attempt succeeds
+ {'content:add': 1}
+ ]
+
+ sample_content = sample_data['content'][0]
+ storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+ content = next(storage.content_get([sample_content['sha1']]))
+ assert not content
+
+ s = storage.content_add([sample_content])
+ assert s == {
+ 'content:add': 1,
+ }
+
+
+def test_retrying_proxy_storage_failure_to_add(sample_data, mocker):
+ """Other errors are raised as usual
+
+ """
+ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+ mock_memory.side_effect = ValueError('Refuse to add content always!')
+
+ sample_content = sample_data['content'][0]
+ storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+ content = next(storage.content_get([sample_content['sha1']]))
+ assert not content
+
+ with pytest.raises(ValueError, match='Refuse to add'):
+ storage.content_add([sample_content])

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 17, 4:00 PM (3 d, 2 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3221962

Event Timeline