Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/backend.py
# Copyright (C) 2017-2020 The Software Heritage developers | # Copyright (C) 2017-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import collections | import collections | ||||
from email.mime.text import MIMEText | from email.mime.text import MIMEText | ||||
import smtplib | import smtplib | ||||
from typing import Any, Dict, List, Optional, Tuple | from typing import Any, Dict, Iterable, List, Optional, Tuple | ||||
import psycopg2.extras | import psycopg2.extras | ||||
import psycopg2.pool | import psycopg2.pool | ||||
from tenacity import retry, stop_after_attempt, wait_random_exponential | |||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.common import db_transaction | from swh.core.db.common import db_transaction | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from swh.scheduler.utils import create_oneshot_task_dict | from swh.scheduler.utils import create_oneshot_task_dict | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.vault.cache import VaultCache | from swh.vault.cache import VaultCache | ||||
Show All 38 Lines | |||||
We apologize for the inconvenience. | We apologize for the inconvenience. | ||||
--\x20 | --\x20 | ||||
The Software Heritage Developers | The Software Heritage Developers | ||||
""" | """ | ||||
def retry_io_methods(obj: Any, method_names: Optional[Iterable[str]] = None):
    """Wrap methods of ``obj`` in place so that they are retried on I/O errors.

    Every selected method is replaced, on the instance itself, by a
    tenacity-wrapped version that makes at most 3 attempts, waits a random
    exponential delay between attempts, and only retries when the call raised
    an ``IOError``.

    Args:
        obj: the object whose methods get wrapped; it is mutated in place.
        method_names: names of the attributes to wrap. When ``None``, every
            public attribute of ``obj`` (name not starting with ``_``) is
            considered. Attributes that are not callable are left untouched.

    Returns:
        ``obj`` itself, so the call can be chained at construction time.

    NOTE(review): no ``reraise=True`` is passed to tenacity, so once the
    attempts are exhausted the caller presumably sees tenacity's
    ``RetryError`` rather than the original ``IOError`` -- confirm that this
    is what callers expect.
    """

    def should_retry(retry_state) -> bool:
        """Retry if the exception is (probably) not about a caller error."""
        try:
            attempt = retry_state.outcome
        except AttributeError:
            # tenacity < 5.0
            attempt = retry_state
        # No exception, or an exception that is not I/O related: give up.
        return bool(attempt.failed) and isinstance(attempt.exception(), IOError)

    # Keep a reference to the configured decorator. Calling the bare
    # ``retry(meth)`` instead would silently apply tenacity's default policy
    # and ignore the retry/wait/stop settings below.
    io_retry = retry(
        retry=should_retry,
        wait=wait_random_exponential(multiplier=1),
        stop=stop_after_attempt(3),
    )

    if method_names is None:
        # Defaults to all public members of object
        method_names = [mname for mname in dir(obj) if not mname.startswith("_")]

    for meth_name in method_names:
        meth = getattr(obj, meth_name)
        if not callable(meth):
            # Plain data attributes must not be replaced by a wrapper.
            continue
        setattr(obj, meth_name, io_retry(meth))

    return obj
def batch_to_bytes(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]:
    """Convert the hexadecimal identifiers of a batch into raw bytes.

    Each ``(obj_type, hex_id)`` pair of ``batch`` yields an
    ``(obj_type, id_bytes)`` pair, in the same order.
    """
    converted: List[Tuple[str, bytes]] = []
    for kind, hex_digest in batch:
        converted.append((kind, hashutil.hash_to_bytes(hex_digest)))
    return converted
class VaultBackend: | class VaultBackend: | ||||
""" | """ | ||||
Backend for the Software Heritage vault. | Backend for the Software Heritage vault. | ||||
""" | """ | ||||
def __init__(self, db, **config): | def __init__(self, db, **config): | ||||
self.config = config | self.config = config | ||||
self.cache = VaultCache(**config["cache"]) | self.cache = VaultCache(**config["cache"]) | ||||
self.scheduler = get_scheduler(**config["scheduler"]) | self.scheduler = retry_io_methods(get_scheduler(**config["scheduler"])) | ||||
self.storage = get_storage(**config["storage"]) | self.storage = retry_io_methods(get_storage(**config["storage"])) | ||||
self.smtp_server = smtplib.SMTP() | self.smtp_server = smtplib.SMTP() | ||||
self._pool = psycopg2.pool.ThreadedConnectionPool( | self._pool = psycopg2.pool.ThreadedConnectionPool( | ||||
config.get("min_pool_conns", 1), | config.get("min_pool_conns", 1), | ||||
config.get("max_pool_conns", 10), | config.get("max_pool_conns", 10), | ||||
db, | db, | ||||
cursor_factory=psycopg2.extras.RealDictCursor, | cursor_factory=psycopg2.extras.RealDictCursor, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 463 Lines • Show Last 20 Lines |