diff --git a/swh/vault/backend.py b/swh/vault/backend.py --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -6,10 +6,11 @@ import collections from email.mime.text import MIMEText import smtplib -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, Iterable, List, Optional, Tuple import psycopg2.extras import psycopg2.pool +from tenacity import retry, stop_after_attempt, wait_random_exponential from swh.core.db import BaseDb from swh.core.db.common import db_transaction @@ -64,6 +65,44 @@ """ +def retry_io_methods(obj: Any, method_names: Iterable[str] = None): + def should_retry(retry_state) -> bool: + """Retry if the exception is (probably) not about a caller error + + """ + try: + attempt = retry_state.outcome + except AttributeError: + # tenacity < 5.0 + attempt = retry_state + + if attempt.failed: + exc = attempt.exception() + if isinstance(exc, IOError): + return True + else: + return False + else: + # No exception + return False + + retry( + retry=should_retry, + wait=wait_random_exponential(multiplier=1), + stop=stop_after_attempt(3), + ) + + if method_names is None: + # Defaults to all public members of object + method_names = [mname for mname in dir(obj) if not mname.startswith("_")] + + for meth_name in method_names: + meth = getattr(obj, meth_name) + setattr(obj, meth_name, retry(meth)) + + return obj + + def batch_to_bytes(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]: return [(obj_type, hashutil.hash_to_bytes(hex_id)) for obj_type, hex_id in batch] @@ -76,8 +115,8 @@ def __init__(self, db, **config): self.config = config self.cache = VaultCache(**config["cache"]) - self.scheduler = get_scheduler(**config["scheduler"]) - self.storage = get_storage(**config["storage"]) + self.scheduler = retry_io_methods(get_scheduler(**config["scheduler"])) + self.storage = retry_io_methods(get_storage(**config["storage"])) self.smtp_server = smtplib.SMTP() self._pool = psycopg2.pool.ThreadedConnectionPool(