Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/retry.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
import psycopg2 | |||||
import traceback | import traceback | ||||
from datetime import datetime | from datetime import datetime | ||||
from typing import Dict, Iterable, List, Optional, Union | from typing import Dict, Iterable, List, Optional, Union | ||||
from requests.exceptions import ConnectionError | |||||
from tenacity import ( | from tenacity import ( | ||||
retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type | retry, stop_after_attempt, wait_random_exponential, | ||||
) | ) | ||||
from swh.storage import get_storage, HashCollision | from swh.storage import get_storage | ||||
from swh.storage.exc import StorageArgumentException | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
RETRY_EXCEPTIONS = [ | def should_retry_adding(retry_state) -> bool: | ||||
# raised when two parallel insertions insert the same data | """Retry if the error/exception is (probably) not about a caller error | ||||
psycopg2.IntegrityError, | |||||
HashCollision, | |||||
# when the server is restarting | |||||
ConnectionError, | |||||
] | |||||
def should_retry_adding(error: Exception) -> bool: | |||||
"""Retry if the error/exception if one of the RETRY_EXCEPTIONS type. | |||||
""" | """ | ||||
for exc in RETRY_EXCEPTIONS: | if retry_state.outcome.failed: | ||||
if retry_if_exception_type(exc)(error): | error = retry_state.outcome.exception() | ||||
if isinstance(error, StorageArgumentException): | |||||
# Exception is due to an invalid argument | |||||
return False | |||||
else: | |||||
# Other exception | |||||
module = getattr(error, '__module__', None) | |||||
if module: | |||||
error_name = error.__module__ + '.' + error.__class__.__name__ | error_name = error.__module__ + '.' + error.__class__.__name__ | ||||
else: | |||||
error_name = error.__class__.__name__ | |||||
logger.warning('Retry adding a batch', exc_info=False, extra={ | logger.warning('Retry adding a batch', exc_info=False, extra={ | ||||
'swh_type': 'storage_retry', | 'swh_type': 'storage_retry', | ||||
'swh_exception_type': error_name, | 'swh_exception_type': error_name, | ||||
'swh_exception': traceback.format_exc(), | 'swh_exception': traceback.format_exc(), | ||||
}) | }) | ||||
return True | return True | ||||
else: | |||||
# No exception | |||||
return False | return False | ||||
vlorentz: Note that I changed this function to retry on every error except those caused by invalid arguments (`StorageArgumentException`). | |||||
swh_retry = retry(retry=should_retry_adding, | swh_retry = retry(retry=should_retry_adding, | ||||
wait=wait_random_exponential(multiplier=1, max=10), | wait=wait_random_exponential(multiplier=1, max=10), | ||||
stop=stop_after_attempt(3)) | stop=stop_after_attempt(3)) | ||||
class RetryingProxyStorage: | class RetryingProxyStorage: | ||||
"""Storage implementation which retries adding objects when it specifically | """Storage implementation which retries adding objects when it specifically | ||||
▲ Show 20 Lines • Show All 89 Lines • Show Last 20 Lines |
Note that I changed this function to retry on every error except those caused by invalid arguments (`StorageArgumentException`).