diff --git a/mypy.ini b/mypy.ini
index 9da07308..43606ce6 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,33 +1,36 @@
 [mypy]
 namespace_packages = True
 
 # due to the conditional import logic on swh.journal, in some cases a specific
 # type: ignore is needed, in other it isn't...
 warn_unused_ignores = False
 
 # support for sqlalchemy magic: see https://github.com/dropbox/sqlalchemy-stubs
 plugins = sqlmypy
 
 
 # 3rd party libraries without stubs (yet)
 
 # only shipped indirectly via hypothesis
 [mypy-django.*]
 ignore_missing_imports = True
 
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
 [mypy-psycopg2.*]
 ignore_missing_imports = True
 
 [mypy-pytest.*]
 ignore_missing_imports = True
 
+[mypy-retrying.*]
+ignore_missing_imports = True
+
 # temporary work-around for landing typing support in spite of the current
 # journal<->storage dependency loop
 [mypy-swh.journal.*]
 ignore_missing_imports = True
 
 [mypy-pytest_postgresql.*]
 ignore_missing_imports = True
diff --git a/requirements-test.txt b/requirements-test.txt
index 6c90bf5f..a61e13d0 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,9 +1,10 @@
 hypothesis >= 3.11.0
 pytest
+pytest-mock
 pytest-postgresql >= 2.1.0
 sqlalchemy-stubs
 # pytz is in fact a dep of swh.model[testing] and should not be necessary, but
 # the dep on swh.model in the main requirements-swh.txt file shadows this one
 # adding the [testing] extra.
 swh.model[testing] >= 0.0.50
 pytz
diff --git a/requirements.txt b/requirements.txt
index 2cbc284d..96791c82 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,7 @@
 click
 flask
 psycopg2
 python-dateutil
 vcversioner
 aiohttp
+retrying
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
index 189e99cd..9741394a 100644
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -1,94 +1,97 @@
-# Copyright (C) 2015-2016  The Software Heritage developers
+# Copyright (C) 2015-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import warnings
 
 from . import storage
 
 Storage = storage.Storage
 
 
 class HashCollision(Exception):
     pass
 
 
 STORAGE_IMPLEMENTATION = {
-    'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer'}
+    'pipeline', 'local', 'remote', 'memory', 'filter', 'buffer', 'retry',
+}
 
 
 def get_storage(cls, **kwargs):
     """Get a storage object of class `storage_class` with arguments
     `storage_args`.
 
     Args:
         storage (dict): dictionary with keys:
         - cls (str): storage's class, either local, remote, memory, filter,
             buffer
         - args (dict): dictionary with keys
 
     Returns:
         an instance of swh.storage.Storage or compatible class
 
     Raises:
         ValueError if passed an unknown storage class.
 
     """
     if cls not in STORAGE_IMPLEMENTATION:
         raise ValueError('Unknown storage class `%s`. Supported: %s' % (
             cls, ', '.join(STORAGE_IMPLEMENTATION)))
 
     if 'args' in kwargs:
         warnings.warn(
             'Explicit "args" key is deprecated, use keys directly instead.',
             DeprecationWarning)
         kwargs = kwargs['args']
 
     if cls == 'pipeline':
         return get_storage_pipeline(**kwargs)
 
     if cls == 'remote':
         from .api.client import RemoteStorage as Storage
     elif cls == 'local':
         from .storage import Storage
     elif cls == 'memory':
         from .in_memory import Storage
     elif cls == 'filter':
         from .filter import FilteringProxyStorage as Storage
     elif cls == 'buffer':
         from .buffer import BufferingProxyStorage as Storage
+    elif cls == 'retry':
+        from .retry import RetryingProxyStorage as Storage
 
     return Storage(**kwargs)
 
 
 def get_storage_pipeline(steps):
     """Recursively get a storage object that may use other storage objects
     as backends.
 
     Args:
         steps (List[dict]): List of dicts that may be used as kwargs for
             `get_storage`.
 
     Returns:
         an instance of swh.storage.Storage or compatible class
 
     Raises:
         ValueError if passed an unknown storage class.
     """
     storage_config = None
     for step in reversed(steps):
         if 'args' in step:
             warnings.warn(
                 'Explicit "args" key is deprecated, use keys directly '
                 'instead.',
                 DeprecationWarning)
             step = {
                 'cls': step['cls'],
                 **step['args'],
             }
         if storage_config:
             step['storage'] = storage_config
         storage_config = step
 
     return get_storage(**storage_config)
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
new file mode 100644
index 00000000..756ec927
--- /dev/null
+++ b/swh/storage/retry.py
@@ -0,0 +1,60 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import psycopg2
+import traceback
+
+from typing import Dict, List
+
+from retrying import retry
+
+from swh.storage import get_storage, HashCollision
+
+
+logger = logging.getLogger(__name__)
+
+
+RETRY_EXCEPTIONS = [
+    # raised when two parallel insertions insert the same data
+    psycopg2.IntegrityError,
+    HashCollision,
+]
+
+
+def should_retry_adding(error: Exception) -> bool:
+    """Retry policy when some kind of failures occur (database integrity error,
+       hash collision, etc...)
+
+    """
+    retry = any(isinstance(error, exc) for exc in RETRY_EXCEPTIONS)
+    if retry:
+        error_name = error.__module__ + '.' + error.__class__.__name__
+        logger.warning('Retry adding a batch', exc_info=False, extra={
+            'swh_type': 'storage_retry',
+            'swh_exception_type': error_name,
+            'swh_exception': traceback.format_exception(
+                error.__class__,
+                error,
+                error.__traceback__,
+            ),
+        })
+    return retry
+
+
+class RetryingProxyStorage:
+    """Storage implementation which retries adding objects when it specifically
+       fails (hash collision, integrity error).
+
+    """
+    def __init__(self, storage):
+        self.storage = get_storage(**storage)
+
+    def __getattr__(self, key):
+        return getattr(self.storage, key)
+
+    @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
+    def content_add(self, content: List[Dict]) -> Dict:
+        return self.storage.content_add(content)
diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py
index 2c1d493b..e95d4ea9 100644
--- a/swh/storage/tests/test_init.py
+++ b/swh/storage/tests/test_init.py
@@ -1,133 +1,137 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from unittest.mock import patch
 
 from swh.storage import get_storage
 
 from swh.storage.api.client import RemoteStorage
 from swh.storage.storage import Storage as DbStorage
 from swh.storage.in_memory import Storage as MemoryStorage
 from swh.storage.buffer import BufferingProxyStorage
 from swh.storage.filter import FilteringProxyStorage
+from swh.storage.retry import RetryingProxyStorage
 
 
 @patch('swh.storage.storage.psycopg2.pool')
 def test_get_storage(mock_pool):
     """Instantiating an existing storage should be ok
 
     """
     mock_pool.ThreadedConnectionPool.return_value = None
     for cls, real_class, dummy_args in [
             ('remote', RemoteStorage, {'url': 'url'}),
             ('memory', MemoryStorage, {}),
             ('local', DbStorage, {
                 'db': 'postgresql://db', 'objstorage': {
                     'cls': 'memory', 'args': {},
                 },
             }),
             ('filter', FilteringProxyStorage, {'storage': {
                 'cls': 'memory'}
             }),
             ('buffer', BufferingProxyStorage, {'storage': {
                 'cls': 'memory'}
             }),
+            ('retry', RetryingProxyStorage, {'storage': {
+                'cls': 'memory'}
+            }),
     ]:
         actual_storage = get_storage(cls, **dummy_args)
         assert actual_storage is not None
         assert isinstance(actual_storage, real_class)
 
 
 @patch('swh.storage.storage.psycopg2.pool')
 def test_get_storage_legacy_args(mock_pool):
     """Instantiating an existing storage should be ok even with the legacy
     explicit 'args' keys
 
     """
     mock_pool.ThreadedConnectionPool.return_value = None
     for cls, real_class, dummy_args in [
             ('remote', RemoteStorage, {'url': 'url'}),
             ('memory', MemoryStorage, {}),
             ('local', DbStorage, {
                 'db': 'postgresql://db', 'objstorage': {
                     'cls': 'memory', 'args': {},
                 },
             }),
             ('filter', FilteringProxyStorage, {'storage': {
                 'cls': 'memory', 'args': {}}
             }),
             ('buffer', BufferingProxyStorage, {'storage': {
                 'cls': 'memory', 'args': {}}
             }),
     ]:
         with pytest.warns(DeprecationWarning):
             actual_storage = get_storage(cls, args=dummy_args)
         assert actual_storage is not None
         assert isinstance(actual_storage, real_class)
 
 
 def test_get_storage_failure():
     """Instantiating an unknown storage should raise
 
     """
     with pytest.raises(ValueError, match='Unknown storage class `unknown`'):
         get_storage('unknown', args=[])
 
 
 def test_get_storage_pipeline():
     config = {
         'cls': 'pipeline',
         'steps': [
             {
                 'cls': 'filter',
             },
             {
                 'cls': 'buffer',
                 'min_batch_size': {
                     'content': 10,
                 },
             },
             {
                 'cls': 'memory',
             }
         ]
     }
 
     storage = get_storage(**config)
 
     assert isinstance(storage, FilteringProxyStorage)
     assert isinstance(storage.storage, BufferingProxyStorage)
     assert isinstance(storage.storage.storage, MemoryStorage)
 
 
 def test_get_storage_pipeline_legacy_args():
     config = {
         'cls': 'pipeline',
         'steps': [
             {
                 'cls': 'filter',
             },
             {
                 'cls': 'buffer',
                 'args': {
                     'min_batch_size': {
                         'content': 10,
                     },
                 }
             },
             {
                 'cls': 'memory',
             }
         ]
     }
 
     with pytest.warns(DeprecationWarning):
         storage = get_storage(**config)
 
     assert isinstance(storage, FilteringProxyStorage)
     assert isinstance(storage.storage, BufferingProxyStorage)
     assert isinstance(storage.storage.storage, MemoryStorage)
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
new file mode 100644
index 00000000..70af6356
--- /dev/null
+++ b/swh/storage/tests/test_retry.py
@@ -0,0 +1,89 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import psycopg2
+import pytest
+
+from swh.storage import HashCollision
+from swh.storage.retry import (
+    RetryingProxyStorage, should_retry_adding, RETRY_EXCEPTIONS
+)
+
+
+def test_should_retry_adding():
+    """Specific exceptions should be elected for retrial
+
+    """
+    for exc in RETRY_EXCEPTIONS:
+        assert should_retry_adding(exc('error')) is True
+
+
+def test_should_retry_adding_no_retry():
+    """Unspecific exceptions should raise as usual
+
+    """
+    for exc in [ValueError, Exception]:
+        assert should_retry_adding(exc('fail!')) is False
+
+
+def test_retrying_proxy_storage_content(sample_data):
+    """Standard content_add works as before
+
+    """
+    sample_content = sample_data['content'][0]
+    storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+    content = next(storage.content_get([sample_content['sha1']]))
+    assert not content
+
+    s = storage.content_add([sample_content])
+    assert s == {
+        'content:add': 1,
+        'content:add:bytes': sample_content['length'],
+        'skipped_content:add': 0
+    }
+
+
+def test_retrying_proxy_storage_with_retry(sample_data, mocker):
+    """Multiple retries for hash collision and psycopg2 error but finally ok
+
+    """
+    mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+    mock_memory.side_effect = [
+        # first try goes ko
+        HashCollision('content hash collision'),
+        # second try goes ko
+        psycopg2.IntegrityError('content already inserted'),
+        # ok then!
+        {'content:add': 1}
+    ]
+
+    sample_content = sample_data['content'][0]
+    storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+    content = next(storage.content_get([sample_content['sha1']]))
+    assert not content
+
+    s = storage.content_add([sample_content])
+    assert s == {
+        'content:add': 1,
+    }
+
+
+def test_retrying_proxy_storage_failure_to_add(sample_data, mocker):
+    """Other errors are raising as usual
+
+    """
+    mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
+    mock_memory.side_effect = ValueError('Refuse to add content always!')
+
+    sample_content = sample_data['content'][0]
+    storage = RetryingProxyStorage(storage={'cls': 'memory'})
+
+    content = next(storage.content_get([sample_content['sha1']]))
+    assert not content
+
+    with pytest.raises(ValueError, match='Refuse to add'):
+        storage.content_add([sample_content])