Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/backend.py
Show All 19 Lines | |||||
from swh.vault.cache import VaultCache | from swh.vault.cache import VaultCache | ||||
from swh.vault.cookers import COOKER_TYPES, get_cooker_cls | from swh.vault.cookers import COOKER_TYPES, get_cooker_cls | ||||
from swh.vault.exc import NotFoundExc | from swh.vault.exc import NotFoundExc | ||||
from swh.vault.interface import ObjectId | from swh.vault.interface import ObjectId | ||||
cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" | cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" | ||||
NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "<bot@softwareheritage.org>" | NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "<bot@softwareheritage.org>" | ||||
NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {obj_type} {short_id}" | NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {bundle_type} {short_id}" | ||||
NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {obj_type} {short_id}" | NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {bundle_type} {short_id}" | ||||
NOTIF_EMAIL_BODY_SUCCESS = """ | NOTIF_EMAIL_BODY_SUCCESS = """ | ||||
You have requested the following bundle from the Software Heritage | You have requested the following bundle from the Software Heritage | ||||
Vault: | Vault: | ||||
Object Type: {obj_type} | Bundle Type: {bundle_type} | ||||
Object ID: {hex_id} | Object ID: {hex_id} | ||||
This bundle is now available for download at the following address: | This bundle is now available for download at the following address: | ||||
{url} | {url} | ||||
Please keep in mind that this link might expire at some point, in which | Please keep in mind that this link might expire at some point, in which | ||||
case you will need to request the bundle again. | case you will need to request the bundle again. | ||||
--\x20 | --\x20 | ||||
The Software Heritage Developers | The Software Heritage Developers | ||||
""" | """ | ||||
NOTIF_EMAIL_BODY_FAILURE = """ | NOTIF_EMAIL_BODY_FAILURE = """ | ||||
You have requested the following bundle from the Software Heritage | You have requested the following bundle from the Software Heritage | ||||
Vault: | Vault: | ||||
Object Type: {obj_type} | Bundle Type: {bundle_type} | ||||
Object ID: {hex_id} | Object ID: {hex_id} | ||||
This bundle could not be cooked for the following reason: | This bundle could not be cooked for the following reason: | ||||
{progress_msg} | {progress_msg} | ||||
We apologize for the inconvenience. | We apologize for the inconvenience. | ||||
--\x20 | --\x20 | ||||
The Software Heritage Developers | The Software Heritage Developers | ||||
""" | """ | ||||
def batch_to_bytes(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]: | def batch_to_bytes(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]: | ||||
return [(obj_type, hashutil.hash_to_bytes(hex_id)) for obj_type, hex_id in batch] | return [ | ||||
(bundle_type, hashutil.hash_to_bytes(hex_id)) for bundle_type, hex_id in batch | |||||
] | |||||
class VaultBackend: | class VaultBackend: | ||||
""" | """ | ||||
Backend for the Software Heritage Vault. | Backend for the Software Heritage Vault. | ||||
""" | """ | ||||
def __init__(self, **config): | def __init__(self, **config): | ||||
Show All 27 Lines | def _compute_ids(self, obj_id: ObjectId) -> Tuple[str, bytes]: | ||||
""" | """ | ||||
if isinstance(obj_id, str): | if isinstance(obj_id, str): | ||||
return obj_id, hashutil.hash_to_bytes(obj_id) | return obj_id, hashutil.hash_to_bytes(obj_id) | ||||
return hashutil.hash_to_hex(obj_id), obj_id | return hashutil.hash_to_hex(obj_id), obj_id | ||||
@db_transaction() | @db_transaction() | ||||
def progress( | def progress( | ||||
self, | self, | ||||
obj_type: str, | bundle_type: str, | ||||
obj_id: ObjectId, | obj_id: ObjectId, | ||||
raise_notfound: bool = True, | raise_notfound: bool = True, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> Optional[Dict[str, Any]]: | ) -> Optional[Dict[str, Any]]: | ||||
hex_id, obj_id = self._compute_ids(obj_id) | hex_id, obj_id = self._compute_ids(obj_id) | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
SELECT id, type, object_id, task_id, task_status, sticky, | SELECT id, type, object_id, task_id, task_status, sticky, | ||||
ts_created, ts_done, ts_last_access, progress_msg | ts_created, ts_done, ts_last_access, progress_msg | ||||
FROM vault_bundle | FROM vault_bundle | ||||
WHERE type = %s AND object_id = %s""", | WHERE type = %s AND object_id = %s""", | ||||
(obj_type, obj_id), | (bundle_type, obj_id), | ||||
) | ) | ||||
res = cur.fetchone() | res = cur.fetchone() | ||||
if not res: | if not res: | ||||
if raise_notfound: | if raise_notfound: | ||||
raise NotFoundExc(f"{obj_type} {hex_id} was not found.") | raise NotFoundExc(f"{bundle_type} {hex_id} was not found.") | ||||
return None | return None | ||||
res["object_id"] = hashutil.hash_to_hex(res["object_id"]) | res["object_id"] = hashutil.hash_to_hex(res["object_id"]) | ||||
return res | return res | ||||
def _send_task(self, obj_type: str, hex_id: ObjectId): | def _send_task(self, bundle_type: str, hex_id: ObjectId): | ||||
"""Send a cooking task to the celery scheduler""" | """Send a cooking task to the celery scheduler""" | ||||
task = create_oneshot_task_dict("cook-vault-bundle", obj_type, hex_id) | task = create_oneshot_task_dict("cook-vault-bundle", bundle_type, hex_id) | ||||
added_tasks = self.scheduler.create_tasks([task]) | added_tasks = self.scheduler.create_tasks([task]) | ||||
return added_tasks[0]["id"] | return added_tasks[0]["id"] | ||||
@db_transaction() | @db_transaction() | ||||
def create_task( | def create_task( | ||||
self, obj_type: str, obj_id: bytes, sticky: bool = False, db=None, cur=None | self, bundle_type: str, obj_id: bytes, sticky: bool = False, db=None, cur=None | ||||
): | ): | ||||
"""Create and send a cooking task""" | """Create and send a cooking task""" | ||||
hex_id, obj_id = self._compute_ids(obj_id) | hex_id, obj_id = self._compute_ids(obj_id) | ||||
cooker_class = get_cooker_cls(obj_type) | cooker_class = get_cooker_cls(bundle_type) | ||||
cooker = cooker_class(obj_type, hex_id, backend=self, storage=self.storage) | cooker = cooker_class(bundle_type, hex_id, backend=self, storage=self.storage) | ||||
if not cooker.check_exists(): | if not cooker.check_exists(): | ||||
raise NotFoundExc(f"{obj_type} {hex_id} was not found.") | raise NotFoundExc(f"{bundle_type} {hex_id} was not found.") | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
INSERT INTO vault_bundle (type, object_id, sticky) | INSERT INTO vault_bundle (type, object_id, sticky) | ||||
VALUES (%s, %s, %s)""", | VALUES (%s, %s, %s)""", | ||||
(obj_type, obj_id, sticky), | (bundle_type, obj_id, sticky), | ||||
) | ) | ||||
db.conn.commit() | db.conn.commit() | ||||
task_id = self._send_task(obj_type, hex_id) | task_id = self._send_task(bundle_type, hex_id) | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET task_id = %s | SET task_id = %s | ||||
WHERE type = %s AND object_id = %s""", | WHERE type = %s AND object_id = %s""", | ||||
(task_id, obj_type, obj_id), | (task_id, bundle_type, obj_id), | ||||
) | ) | ||||
@db_transaction() | @db_transaction() | ||||
def add_notif_email( | def add_notif_email( | ||||
self, obj_type: str, obj_id: bytes, email: str, db=None, cur=None | self, bundle_type: str, obj_id: bytes, email: str, db=None, cur=None | ||||
): | ): | ||||
"""Add an e-mail address to notify when a given bundle is ready""" | """Add an e-mail address to notify when a given bundle is ready""" | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
INSERT INTO vault_notif_email (email, bundle_id) | INSERT INTO vault_notif_email (email, bundle_id) | ||||
VALUES (%s, (SELECT id FROM vault_bundle | VALUES (%s, (SELECT id FROM vault_bundle | ||||
WHERE type = %s AND object_id = %s))""", | WHERE type = %s AND object_id = %s))""", | ||||
(email, obj_type, obj_id), | (email, bundle_type, obj_id), | ||||
) | ) | ||||
def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle) -> bool: | def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle) -> bool: | ||||
_, obj_id = self._compute_ids(obj_id) | _, obj_id = self._compute_ids(obj_id) | ||||
self.cache.add(obj_type, obj_id, bundle) | self.cache.add(bundle_type, obj_id, bundle) | ||||
return True | return True | ||||
@db_transaction() | @db_transaction() | ||||
def cook( | def cook( | ||||
self, | self, | ||||
obj_type: str, | bundle_type: str, | ||||
obj_id: ObjectId, | obj_id: ObjectId, | ||||
*, | *, | ||||
sticky: bool = False, | sticky: bool = False, | ||||
email: Optional[str] = None, | email: Optional[str] = None, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> Dict[str, Any]: | ) -> Dict[str, Any]: | ||||
hex_id, obj_id = self._compute_ids(obj_id) | hex_id, obj_id = self._compute_ids(obj_id) | ||||
info = self.progress(obj_type, obj_id, raise_notfound=False) | info = self.progress(bundle_type, obj_id, raise_notfound=False) | ||||
if obj_type not in COOKER_TYPES: | if bundle_type not in COOKER_TYPES: | ||||
raise NotFoundExc(f"{obj_type} is an unknown type.") | raise NotFoundExc(f"{bundle_type} is an unknown type.") | ||||
# If there's a failed bundle entry, delete it first. | # If there's a failed bundle entry, delete it first. | ||||
if info is not None and info["task_status"] == "failed": | if info is not None and info["task_status"] == "failed": | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
cur.execute( | cur.execute( | ||||
"DELETE FROM vault_bundle WHERE type = %s AND object_id = %s", | "DELETE FROM vault_bundle WHERE type = %s AND object_id = %s", | ||||
(obj_type, obj_id), | (bundle_type, obj_id), | ||||
) | ) | ||||
db.conn.commit() | db.conn.commit() | ||||
info = None | info = None | ||||
# If there's no bundle entry, create the task. | # If there's no bundle entry, create the task. | ||||
if info is None: | if info is None: | ||||
self.create_task(obj_type, obj_id, sticky) | self.create_task(bundle_type, obj_id, sticky) | ||||
if email is not None: | if email is not None: | ||||
# If the task is already done, send the email directly | # If the task is already done, send the email directly | ||||
if info is not None and info["task_status"] == "done": | if info is not None and info["task_status"] == "done": | ||||
self.send_notification( | self.send_notification( | ||||
None, email, obj_type, hex_id, info["task_status"] | None, email, bundle_type, hex_id, info["task_status"] | ||||
) | ) | ||||
# Else, add it to the notification queue | # Else, add it to the notification queue | ||||
else: | else: | ||||
self.add_notif_email(obj_type, obj_id, email) | self.add_notif_email(bundle_type, obj_id, email) | ||||
return self.progress(obj_type, obj_id) | return self.progress(bundle_type, obj_id) | ||||
@db_transaction() | @db_transaction() | ||||
def batch_cook( | def batch_cook( | ||||
self, batch: List[Tuple[str, str]], db=None, cur=None | self, batch: List[Tuple[str, str]], db=None, cur=None | ||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | ||||
# Import execute_values at runtime only, because it requires | # Import execute_values at runtime only, because it requires | ||||
# psycopg2 >= 2.7 (only available on postgresql servers) | # psycopg2 >= 2.7 (only available on postgresql servers) | ||||
from psycopg2.extras import execute_values | from psycopg2.extras import execute_values | ||||
for obj_type, _ in batch: | for bundle_type, _ in batch: | ||||
if obj_type not in COOKER_TYPES: | if bundle_type not in COOKER_TYPES: | ||||
raise NotFoundExc(f"{obj_type} is an unknown type.") | raise NotFoundExc(f"{bundle_type} is an unknown type.") | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
INSERT INTO vault_batch (id) | INSERT INTO vault_batch (id) | ||||
VALUES (DEFAULT) | VALUES (DEFAULT) | ||||
RETURNING id""" | RETURNING id""" | ||||
) | ) | ||||
batch_id = cur.fetchone()["id"] | batch_id = cur.fetchone()["id"] | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | ) -> Dict[str, int]: | ||||
batch_new = [ | batch_new = [ | ||||
(row["type"], bytes(row["object_id"])) | (row["type"], bytes(row["object_id"])) | ||||
for row in bundles | for row in bundles | ||||
if row["task_id"] is None | if row["task_id"] is None | ||||
] | ] | ||||
# Send the tasks | # Send the tasks | ||||
args_batch = [ | args_batch = [ | ||||
(obj_type, hashutil.hash_to_hex(obj_id)) for obj_type, obj_id in batch_new | (bundle_type, hashutil.hash_to_hex(obj_id)) | ||||
for bundle_type, obj_id in batch_new | |||||
] | ] | ||||
# TODO: change once the scheduler handles priority tasks | # TODO: change once the scheduler handles priority tasks | ||||
tasks = [ | tasks = [ | ||||
create_oneshot_task_dict("swh-vault-batch-cooking", *args) | create_oneshot_task_dict("swh-vault-batch-cooking", *args) | ||||
for args in args_batch | for args in args_batch | ||||
] | ] | ||||
added_tasks = self.scheduler.create_tasks(tasks) | added_tasks = self.scheduler.create_tasks(tasks) | ||||
tasks_ids_bundle_ids = [ | tasks_ids_bundle_ids = [ | ||||
(task_id, obj_type, obj_id) | (task_id, bundle_type, obj_id) | ||||
for task_id, (obj_type, obj_id) in zip( | for task_id, (bundle_type, obj_id) in zip( | ||||
[task["id"] for task in added_tasks], batch_new | [task["id"] for task in added_tasks], batch_new | ||||
) | ) | ||||
] | ] | ||||
# Update the task ids | # Update the task ids | ||||
execute_values( | execute_values( | ||||
cur, | cur, | ||||
""" | """ | ||||
Show All 30 Lines | def batch_progress(self, batch_id: int, db=None, cur=None) -> Dict[str, Any]: | ||||
"total": len(bundles), | "total": len(bundles), | ||||
**{k: 0 for k in ("new", "pending", "done", "failed")}, | **{k: 0 for k in ("new", "pending", "done", "failed")}, | ||||
**dict(counter), | **dict(counter), | ||||
} | } | ||||
return res | return res | ||||
@db_transaction() | @db_transaction() | ||||
def is_available(self, obj_type: str, obj_id: ObjectId, db=None, cur=None): | def is_available(self, bundle_type: str, obj_id: ObjectId, db=None, cur=None): | ||||
"""Check whether a bundle is available for retrieval""" | """Check whether a bundle is available for retrieval""" | ||||
info = self.progress(obj_type, obj_id, raise_notfound=False, cur=cur) | info = self.progress(bundle_type, obj_id, raise_notfound=False, cur=cur) | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
return ( | return ( | ||||
info is not None | info is not None | ||||
and info["task_status"] == "done" | and info["task_status"] == "done" | ||||
and self.cache.is_cached(obj_type, obj_id) | and self.cache.is_cached(bundle_type, obj_id) | ||||
) | ) | ||||
@db_transaction() | @db_transaction() | ||||
def fetch( | def fetch( | ||||
self, obj_type: str, obj_id: ObjectId, raise_notfound=True, db=None, cur=None | self, bundle_type: str, obj_id: ObjectId, raise_notfound=True, db=None, cur=None | ||||
) -> Optional[bytes]: | ) -> Optional[bytes]: | ||||
"""Retrieve a bundle from the cache""" | """Retrieve a bundle from the cache""" | ||||
hex_id, obj_id = self._compute_ids(obj_id) | hex_id, obj_id = self._compute_ids(obj_id) | ||||
available = self.is_available(obj_type, obj_id, cur=cur) | available = self.is_available(bundle_type, obj_id, cur=cur) | ||||
if not available: | if not available: | ||||
if raise_notfound: | if raise_notfound: | ||||
raise NotFoundExc(f"{obj_type} {hex_id} is not available.") | raise NotFoundExc(f"{bundle_type} {hex_id} is not available.") | ||||
return None | return None | ||||
self.update_access_ts(obj_type, obj_id, cur=cur) | self.update_access_ts(bundle_type, obj_id, cur=cur) | ||||
return self.cache.get(obj_type, obj_id) | return self.cache.get(bundle_type, obj_id) | ||||
@db_transaction() | @db_transaction() | ||||
def update_access_ts(self, obj_type: str, obj_id: bytes, db=None, cur=None): | def update_access_ts(self, bundle_type: str, obj_id: bytes, db=None, cur=None): | ||||
"""Update the last access timestamp of a bundle""" | """Update the last access timestamp of a bundle""" | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET ts_last_access = NOW() | SET ts_last_access = NOW() | ||||
WHERE type = %s AND object_id = %s""", | WHERE type = %s AND object_id = %s""", | ||||
(obj_type, obj_id), | (bundle_type, obj_id), | ||||
) | ) | ||||
@db_transaction() | @db_transaction() | ||||
def set_status( | def set_status( | ||||
self, obj_type: str, obj_id: ObjectId, status: str, db=None, cur=None | self, bundle_type: str, obj_id: ObjectId, status: str, db=None, cur=None | ||||
) -> bool: | ) -> bool: | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
req = ( | req = ( | ||||
""" | """ | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET task_status = %s """ | SET task_status = %s """ | ||||
+ (""", ts_done = NOW() """ if status == "done" else "") | + (""", ts_done = NOW() """ if status == "done" else "") | ||||
+ """WHERE type = %s AND object_id = %s""" | + """WHERE type = %s AND object_id = %s""" | ||||
) | ) | ||||
cur.execute(req, (status, obj_type, obj_id)) | cur.execute(req, (status, bundle_type, obj_id)) | ||||
return True | return True | ||||
@db_transaction() | @db_transaction() | ||||
def set_progress( | def set_progress( | ||||
self, obj_type: str, obj_id: ObjectId, progress: str, db=None, cur=None | self, bundle_type: str, obj_id: ObjectId, progress: str, db=None, cur=None | ||||
) -> bool: | ) -> bool: | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET progress_msg = %s | SET progress_msg = %s | ||||
WHERE type = %s AND object_id = %s""", | WHERE type = %s AND object_id = %s""", | ||||
(progress, obj_type, obj_id), | (progress, bundle_type, obj_id), | ||||
) | ) | ||||
return True | return True | ||||
@db_transaction() | @db_transaction() | ||||
def send_notif(self, obj_type: str, obj_id: ObjectId, db=None, cur=None) -> bool: | def send_notif(self, bundle_type: str, obj_id: ObjectId, db=None, cur=None) -> bool: | ||||
hex_id, obj_id = self._compute_ids(obj_id) | hex_id, obj_id = self._compute_ids(obj_id) | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
SELECT vault_notif_email.id AS id, email, task_status, progress_msg | SELECT vault_notif_email.id AS id, email, task_status, progress_msg | ||||
FROM vault_notif_email | FROM vault_notif_email | ||||
INNER JOIN vault_bundle ON bundle_id = vault_bundle.id | INNER JOIN vault_bundle ON bundle_id = vault_bundle.id | ||||
WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s""", | WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s""", | ||||
(obj_type, obj_id), | (bundle_type, obj_id), | ||||
) | ) | ||||
for d in cur: | for d in cur: | ||||
self.send_notification( | self.send_notification( | ||||
d["id"], | d["id"], | ||||
d["email"], | d["email"], | ||||
obj_type, | bundle_type, | ||||
hex_id, | hex_id, | ||||
status=d["task_status"], | status=d["task_status"], | ||||
progress_msg=d["progress_msg"], | progress_msg=d["progress_msg"], | ||||
) | ) | ||||
return True | return True | ||||
@db_transaction() | @db_transaction() | ||||
def send_notification( | def send_notification( | ||||
self, | self, | ||||
n_id: Optional[int], | n_id: Optional[int], | ||||
email: str, | email: str, | ||||
obj_type: str, | bundle_type: str, | ||||
hex_id: str, | hex_id: str, | ||||
status: str, | status: str, | ||||
progress_msg: Optional[str] = None, | progress_msg: Optional[str] = None, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> None: | ) -> None: | ||||
"""Send the notification of a bundle to a specific e-mail""" | """Send the notification of a bundle to a specific e-mail""" | ||||
short_id = hex_id[:7] | short_id = hex_id[:7] | ||||
# TODO: instead of hardcoding this, we should probably: | # TODO: instead of hardcoding this, we should probably: | ||||
# * add a "fetch_url" field in the vault_notif_email table | # * add a "fetch_url" field in the vault_notif_email table | ||||
# * generate the url with flask.url_for() on the web-ui side | # * generate the url with flask.url_for() on the web-ui side | ||||
# * send this url as part of the cook request and store it in | # * send this url as part of the cook request and store it in | ||||
# the table | # the table | ||||
# * use this url for the notification e-mail | # * use this url for the notification e-mail | ||||
url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( | url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( | ||||
obj_type, hex_id | bundle_type, hex_id | ||||
) | ) | ||||
if status == "done": | if status == "done": | ||||
text = NOTIF_EMAIL_BODY_SUCCESS.strip() | text = NOTIF_EMAIL_BODY_SUCCESS.strip() | ||||
text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) | text = text.format(bundle_type=bundle_type, hex_id=hex_id, url=url) | ||||
msg = MIMEText(text) | msg = MIMEText(text) | ||||
msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( | msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( | ||||
obj_type=obj_type, short_id=short_id | bundle_type=bundle_type, short_id=short_id | ||||
) | ) | ||||
elif status == "failed": | elif status == "failed": | ||||
text = NOTIF_EMAIL_BODY_FAILURE.strip() | text = NOTIF_EMAIL_BODY_FAILURE.strip() | ||||
text = text.format( | text = text.format( | ||||
obj_type=obj_type, hex_id=hex_id, progress_msg=progress_msg | bundle_type=bundle_type, hex_id=hex_id, progress_msg=progress_msg | ||||
) | ) | ||||
msg = MIMEText(text) | msg = MIMEText(text) | ||||
msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( | msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( | ||||
obj_type=obj_type, short_id=short_id | bundle_type=bundle_type, short_id=short_id | ||||
) | ) | ||||
else: | else: | ||||
raise RuntimeError( | raise RuntimeError( | ||||
"send_notification called on a '{}' bundle".format(status) | "send_notification called on a '{}' bundle".format(status) | ||||
) | ) | ||||
msg["From"] = NOTIF_EMAIL_FROM | msg["From"] = NOTIF_EMAIL_FROM | ||||
msg["To"] = email | msg["To"] = email | ||||
▲ Show 20 Lines • Show All 59 Lines • Show Last 20 Lines |