Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/backend.py
# Copyright (C) 2017-2022 The Software Heritage developers | # Copyright (C) 2017-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import collections | import collections | ||||
from email.mime.text import MIMEText | from email.mime.text import MIMEText | ||||
import logging | |||||
import smtplib | import smtplib | ||||
from typing import Any, Dict, List, Optional, Tuple | from typing import Any, Dict, List, Optional, Tuple | ||||
import psycopg2.extras | import psycopg2.extras | ||||
import psycopg2.pool | import psycopg2.pool | ||||
import sentry_sdk | |||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.common import db_transaction | from swh.core.db.common import db_transaction | ||||
from swh.model.swhids import CoreSWHID | from swh.model.swhids import CoreSWHID | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from swh.scheduler.utils import create_oneshot_task_dict | from swh.scheduler.utils import create_oneshot_task_dict | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.vault.cache import VaultCache | from swh.vault.cache import VaultCache | ||||
from swh.vault.cookers import COOKER_TYPES, get_cooker_cls | from swh.vault.cookers import COOKER_TYPES, get_cooker_cls | ||||
from swh.vault.exc import NotFoundExc | from swh.vault.exc import NotFoundExc | ||||
logger = logging.getLogger(__name__) | |||||
cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" | cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" | ||||
NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "<bot@softwareheritage.org>" | NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "<bot@softwareheritage.org>" | ||||
NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {bundle_type} {short_id}" | NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {bundle_type} {short_id}" | ||||
NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {bundle_type} {short_id}" | NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {bundle_type} {short_id}" | ||||
NOTIF_EMAIL_BODY_SUCCESS = """ | NOTIF_EMAIL_BODY_SUCCESS = """ | ||||
You have requested the following bundle from the Software Heritage | You have requested the following bundle from the Software Heritage | ||||
Show All 38 Lines | class VaultBackend: | ||||
current_version = 4 | current_version = 4 | ||||
def __init__(self, **config): | def __init__(self, **config): | ||||
self.config = config | self.config = config | ||||
self.cache = VaultCache(**config["cache"]) | self.cache = VaultCache(**config["cache"]) | ||||
self.scheduler = get_scheduler(**config["scheduler"]) | self.scheduler = get_scheduler(**config["scheduler"]) | ||||
self.storage = get_storage(**config["storage"]) | self.storage = get_storage(**config["storage"]) | ||||
self.smtp_server = smtplib.SMTP(**config.get("smtp", {})) | |||||
if "db" not in self.config: | if "db" not in self.config: | ||||
raise ValueError( | raise ValueError( | ||||
"The 'db' configuration entry is missing " | "The 'db' configuration entry is missing " | ||||
"in the vault configuration file" | "in the vault configuration file" | ||||
) | ) | ||||
db_conn = config["db"] | db_conn = config["db"] | ||||
self._pool = psycopg2.pool.ThreadedConnectionPool( | self._pool = psycopg2.pool.ThreadedConnectionPool( | ||||
▲ Show 20 Lines • Show All 395 Lines • ▼ Show 20 Lines | ) -> None: | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
DELETE FROM vault_notif_email | DELETE FROM vault_notif_email | ||||
WHERE id = %s""", | WHERE id = %s""", | ||||
(n_id,), | (n_id,), | ||||
) | ) | ||||
def _smtp_send(self, msg: MIMEText): | def _smtp_send(self, msg: MIMEText): | ||||
# Reconnect if needed | smtp_server = smtplib.SMTP(**self.config.get("smtp", {})) | ||||
try: | try: | ||||
status = self.smtp_server.noop()[0] | status = smtp_server.noop()[0] | ||||
except smtplib.SMTPException: | except smtplib.SMTPException: | ||||
status = -1 | status = -1 | ||||
if status != 250: | if status != 250: | ||||
self.smtp_server.connect("localhost", 25) | error_message = ( | ||||
f"Unable to send SMTP message '{msg['Subject']}' to " | |||||
f"{msg['To']}: cannot connect to server" | |||||
) | |||||
logger.error(error_message) | |||||
vlorentz: We can keep the previous behavior of leaving connections open.
Why "this is doomed"? | |||||
Done Inline Actions
douardda:
> We can keep the previous behavior of leaving connections open.
Why would we want this? What are the pros of keeping it open?
> Why "this is doomed"?
Because there is no reason to believe that anything is listening on localhost:25 (especially in elastic environments, etc.). | |||||
Not Done Inline Actions
vlorentz:
> why would we want this? What are the pros of keeping it open?
Less latency, but it's not a big deal.
I'm fine with removing the fallback; I just didn't understand the comment. | |||||
sentry_sdk.capture_message(error_message, "error") | |||||
else: | |||||
try: | |||||
# Send the message | # Send the message | ||||
self.smtp_server.send_message(msg) | smtp_server.send_message(msg) | ||||
except smtplib.SMTPException as exc: | |||||
logger.exception(exc) | |||||
error_message = ( | |||||
f"Unable to send SMTP message '{msg['Subject']}' to " | |||||
f"{msg['To']}: {exc}" | |||||
) | |||||
sentry_sdk.capture_message(error_message, "error") | |||||
@db_transaction() | @db_transaction() | ||||
def _cache_expire(self, cond, *args, db=None, cur=None) -> None: | def _cache_expire(self, cond, *args, db=None, cur=None) -> None: | ||||
"""Low-level expiration method, used by cache_expire_* methods""" | """Low-level expiration method, used by cache_expire_* methods""" | ||||
# Embedded SELECT query to be able to use ORDER BY and LIMIT | # Embedded SELECT query to be able to use ORDER BY and LIMIT | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
DELETE FROM vault_bundle | DELETE FROM vault_bundle | ||||
Show All 29 Lines |
We can keep the previous behavior of leaving connections open.
Why "this is doomed"?