Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/utils.py
Show First 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | def is_retryable_exception(e: Exception) -> bool: | ||||
Checks if an exception is worth retrying (connection, throttling or a server error). | Checks if an exception is worth retrying (connection, throttling or a server error). | ||||
""" | """ | ||||
is_connection_error = isinstance(e, ConnectionError) | is_connection_error = isinstance(e, ConnectionError) | ||||
is_500_error = isinstance(e, HTTPError) and e.response.status_code >= 500 | is_500_error = isinstance(e, HTTPError) and e.response.status_code >= 500 | ||||
return is_connection_error or is_throttling_exception(e) or is_500_error | return is_connection_error or is_throttling_exception(e) or is_500_error | ||||
def retry_attempt(retry_state): | |||||
""" | |||||
Utility function to get last retry attempt info based on the | |||||
tenacity version (as debian buster packages version 4.12). | |||||
""" | |||||
try: | |||||
attempt = retry_state.outcome | |||||
except AttributeError: | |||||
# tenacity < 5.0 | |||||
attempt = retry_state | |||||
return attempt | |||||
def retry_if_exception(retry_state, predicate: Callable[[Exception], bool]) -> bool: | def retry_if_exception(retry_state, predicate: Callable[[Exception], bool]) -> bool: | ||||
""" | """ | ||||
Custom tenacity retry predicate for handling exceptions with the given predicate. | Custom tenacity retry predicate for handling exceptions with the given predicate. | ||||
""" | """ | ||||
attempt = retry_attempt(retry_state) | attempt = retry_state.outcome | ||||
if attempt.failed: | if attempt.failed: | ||||
exception = attempt.exception() | exception = attempt.exception() | ||||
return predicate(exception) | return predicate(exception) | ||||
return False | return False | ||||
def retry_if_throttling(retry_state) -> bool: | def retry_if_throttling(retry_state) -> bool: | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 51 Lines • Show Last 20 Lines |