Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/backend.py
# Copyright (C) 2017-2020 The Software Heritage developers | # Copyright (C) 2017-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import collections | import collections | ||||
from email.mime.text import MIMEText | from email.mime.text import MIMEText | ||||
import smtplib | import smtplib | ||||
from typing import Any, Dict, List, Optional, Tuple | from typing import Any, Dict, List, Optional, Tuple | ||||
import psycopg2.extras | import psycopg2.extras | ||||
import psycopg2.pool | import psycopg2.pool | ||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.common import db_transaction | from swh.core.db.common import db_transaction | ||||
from swh.model import hashutil | from swh.model.identifiers import CoreSWHID | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from swh.scheduler.utils import create_oneshot_task_dict | from swh.scheduler.utils import create_oneshot_task_dict | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.vault.cache import VaultCache | from swh.vault.cache import VaultCache | ||||
from swh.vault.cookers import COOKER_TYPES, get_cooker_cls | from swh.vault.cookers import COOKER_TYPES, get_cooker_cls | ||||
from swh.vault.exc import NotFoundExc | from swh.vault.exc import NotFoundExc | ||||
from swh.vault.interface import ObjectId | |||||
cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" | cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" | ||||
NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "<bot@softwareheritage.org>" | NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "<bot@softwareheritage.org>" | ||||
NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {bundle_type} {short_id}" | NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {bundle_type} {short_id}" | ||||
NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {bundle_type} {short_id}" | NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {bundle_type} {short_id}" | ||||
NOTIF_EMAIL_BODY_SUCCESS = """ | NOTIF_EMAIL_BODY_SUCCESS = """ | ||||
You have requested the following bundle from the Software Heritage | You have requested the following bundle from the Software Heritage | ||||
Vault: | Vault: | ||||
Bundle Type: {bundle_type} | Bundle Type: {bundle_type} | ||||
Object ID: {hex_id} | Object SWHID: {swhid} | ||||
This bundle is now available for download at the following address: | This bundle is now available for download at the following address: | ||||
{url} | {url} | ||||
Please keep in mind that this link might expire at some point, in which | Please keep in mind that this link might expire at some point, in which | ||||
case you will need to request the bundle again. | case you will need to request the bundle again. | ||||
--\x20 | --\x20 | ||||
The Software Heritage Developers | The Software Heritage Developers | ||||
""" | """ | ||||
NOTIF_EMAIL_BODY_FAILURE = """ | NOTIF_EMAIL_BODY_FAILURE = """ | ||||
You have requested the following bundle from the Software Heritage | You have requested the following bundle from the Software Heritage | ||||
Vault: | Vault: | ||||
Bundle Type: {bundle_type} | Bundle Type: {bundle_type} | ||||
Object ID: {hex_id} | Object SWHID: {swhid} | ||||
This bundle could not be cooked for the following reason: | This bundle could not be cooked for the following reason: | ||||
{progress_msg} | {progress_msg} | ||||
We apologize for the inconvenience. | We apologize for the inconvenience. | ||||
--\x20 | --\x20 | ||||
The Software Heritage Developers | The Software Heritage Developers | ||||
""" | """ | ||||
def batch_to_bytes(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]: | |||||
return [ | |||||
(bundle_type, hashutil.hash_to_bytes(hex_id)) for bundle_type, hex_id in batch | |||||
] | |||||
class VaultBackend: | class VaultBackend: | ||||
""" | """ | ||||
Backend for the Software Heritage Vault. | Backend for the Software Heritage Vault. | ||||
""" | """ | ||||
def __init__(self, **config): | def __init__(self, **config): | ||||
self.config = config | self.config = config | ||||
self.cache = VaultCache(**config["cache"]) | self.cache = VaultCache(**config["cache"]) | ||||
Show All 14 Lines | def get_db(self): | ||||
if self._db: | if self._db: | ||||
return self._db | return self._db | ||||
return BaseDb.from_pool(self._pool) | return BaseDb.from_pool(self._pool) | ||||
def put_db(self, db): | def put_db(self, db): | ||||
if db is not self._db: | if db is not self._db: | ||||
db.put_conn() | db.put_conn() | ||||
def _compute_ids(self, obj_id: ObjectId) -> Tuple[str, bytes]: | |||||
"""Internal method to reconcile multiple possible inputs | |||||
""" | |||||
if isinstance(obj_id, str): | |||||
return obj_id, hashutil.hash_to_bytes(obj_id) | |||||
return hashutil.hash_to_hex(obj_id), obj_id | |||||
@db_transaction() | @db_transaction() | ||||
def progress( | def progress( | ||||
self, | self, | ||||
bundle_type: str, | bundle_type: str, | ||||
obj_id: ObjectId, | swhid: CoreSWHID, | ||||
raise_notfound: bool = True, | raise_notfound: bool = True, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> Optional[Dict[str, Any]]: | ) -> Optional[Dict[str, Any]]: | ||||
hex_id, obj_id = self._compute_ids(obj_id) | |||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
SELECT id, type, object_id, task_id, task_status, sticky, | SELECT id, type, swhid, task_id, task_status, sticky, | ||||
ts_created, ts_done, ts_last_access, progress_msg | ts_created, ts_done, ts_last_access, progress_msg | ||||
FROM vault_bundle | FROM vault_bundle | ||||
WHERE type = %s AND object_id = %s""", | WHERE type = %s AND swhid = %s""", | ||||
(bundle_type, obj_id), | (bundle_type, str(swhid)), | ||||
) | ) | ||||
res = cur.fetchone() | res = cur.fetchone() | ||||
if not res: | if not res: | ||||
if raise_notfound: | if raise_notfound: | ||||
raise NotFoundExc(f"{bundle_type} {hex_id} was not found.") | raise NotFoundExc(f"{bundle_type} {swhid} was not found.") | ||||
return None | return None | ||||
res["swhid"] = CoreSWHID.from_string(res["swhid"]) | |||||
res["object_id"] = hashutil.hash_to_hex(res["object_id"]) | |||||
return res | return res | ||||
def _send_task(self, bundle_type: str, hex_id: ObjectId): | def _send_task(self, bundle_type: str, swhid: CoreSWHID): | ||||
"""Send a cooking task to the celery scheduler""" | """Send a cooking task to the celery scheduler""" | ||||
task = create_oneshot_task_dict("cook-vault-bundle", bundle_type, hex_id) | task = create_oneshot_task_dict("cook-vault-bundle", bundle_type, str(swhid)) | ||||
added_tasks = self.scheduler.create_tasks([task]) | added_tasks = self.scheduler.create_tasks([task]) | ||||
return added_tasks[0]["id"] | return added_tasks[0]["id"] | ||||
@db_transaction() | @db_transaction() | ||||
def create_task( | def create_task( | ||||
self, bundle_type: str, obj_id: bytes, sticky: bool = False, db=None, cur=None | self, | ||||
bundle_type: str, | |||||
swhid: CoreSWHID, | |||||
sticky: bool = False, | |||||
db=None, | |||||
cur=None, | |||||
): | ): | ||||
"""Create and send a cooking task""" | """Create and send a cooking task""" | ||||
hex_id, obj_id = self._compute_ids(obj_id) | cooker_class = get_cooker_cls(bundle_type, swhid.object_type) | ||||
cooker = cooker_class(swhid, backend=self, storage=self.storage) | |||||
cooker_class = get_cooker_cls(bundle_type) | |||||
cooker = cooker_class(bundle_type, hex_id, backend=self, storage=self.storage) | |||||
if not cooker.check_exists(): | if not cooker.check_exists(): | ||||
raise NotFoundExc(f"{bundle_type} {hex_id} was not found.") | raise NotFoundExc(f"{bundle_type} {swhid} was not found.") | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
INSERT INTO vault_bundle (type, object_id, sticky) | INSERT INTO vault_bundle (type, swhid, sticky) | ||||
VALUES (%s, %s, %s)""", | VALUES (%s, %s, %s)""", | ||||
(bundle_type, obj_id, sticky), | (bundle_type, str(swhid), sticky), | ||||
) | ) | ||||
db.conn.commit() | db.conn.commit() | ||||
task_id = self._send_task(bundle_type, hex_id) | task_id = self._send_task(bundle_type, swhid) | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET task_id = %s | SET task_id = %s | ||||
WHERE type = %s AND object_id = %s""", | WHERE type = %s AND swhid = %s""", | ||||
(task_id, bundle_type, obj_id), | (task_id, bundle_type, str(swhid)), | ||||
) | ) | ||||
@db_transaction() | @db_transaction() | ||||
def add_notif_email( | def add_notif_email( | ||||
self, bundle_type: str, obj_id: bytes, email: str, db=None, cur=None | self, bundle_type: str, swhid: CoreSWHID, email: str, db=None, cur=None | ||||
): | ): | ||||
"""Add an e-mail address to notify when a given bundle is ready""" | """Add an e-mail address to notify when a given bundle is ready""" | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
INSERT INTO vault_notif_email (email, bundle_id) | INSERT INTO vault_notif_email (email, bundle_id) | ||||
VALUES (%s, (SELECT id FROM vault_bundle | VALUES (%s, (SELECT id FROM vault_bundle | ||||
WHERE type = %s AND object_id = %s))""", | WHERE type = %s AND swhid = %s))""", | ||||
(email, bundle_type, obj_id), | (email, bundle_type, str(swhid)), | ||||
) | ) | ||||
def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle) -> bool: | def put_bundle(self, bundle_type: str, swhid: CoreSWHID, bundle) -> bool: | ||||
_, obj_id = self._compute_ids(obj_id) | self.cache.add(bundle_type, swhid, bundle) | ||||
self.cache.add(bundle_type, obj_id, bundle) | |||||
return True | return True | ||||
@db_transaction() | @db_transaction() | ||||
def cook( | def cook( | ||||
self, | self, | ||||
bundle_type: str, | bundle_type: str, | ||||
obj_id: ObjectId, | swhid: CoreSWHID, | ||||
*, | *, | ||||
sticky: bool = False, | sticky: bool = False, | ||||
email: Optional[str] = None, | email: Optional[str] = None, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> Dict[str, Any]: | ) -> Dict[str, Any]: | ||||
hex_id, obj_id = self._compute_ids(obj_id) | info = self.progress(bundle_type, swhid, raise_notfound=False) | ||||
info = self.progress(bundle_type, obj_id, raise_notfound=False) | |||||
if bundle_type not in COOKER_TYPES: | if bundle_type not in COOKER_TYPES: | ||||
raise NotFoundExc(f"{bundle_type} is an unknown type.") | raise NotFoundExc(f"{bundle_type} is an unknown type.") | ||||
# If there's a failed bundle entry, delete it first. | # If there's a failed bundle entry, delete it first. | ||||
if info is not None and info["task_status"] == "failed": | if info is not None and info["task_status"] == "failed": | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | |||||
cur.execute( | cur.execute( | ||||
"DELETE FROM vault_bundle WHERE type = %s AND object_id = %s", | "DELETE FROM vault_bundle WHERE type = %s AND swhid = %s", | ||||
(bundle_type, obj_id), | (bundle_type, str(swhid)), | ||||
) | ) | ||||
db.conn.commit() | db.conn.commit() | ||||
info = None | info = None | ||||
# If there's no bundle entry, create the task. | # If there's no bundle entry, create the task. | ||||
if info is None: | if info is None: | ||||
self.create_task(bundle_type, obj_id, sticky) | self.create_task(bundle_type, swhid, sticky) | ||||
if email is not None: | if email is not None: | ||||
# If the task is already done, send the email directly | # If the task is already done, send the email directly | ||||
if info is not None and info["task_status"] == "done": | if info is not None and info["task_status"] == "done": | ||||
self.send_notification( | self.send_notification( | ||||
None, email, bundle_type, hex_id, info["task_status"] | None, email, bundle_type, swhid, info["task_status"] | ||||
) | ) | ||||
# Else, add it to the notification queue | # Else, add it to the notification queue | ||||
else: | else: | ||||
self.add_notif_email(bundle_type, obj_id, email) | self.add_notif_email(bundle_type, swhid, email) | ||||
return self.progress(bundle_type, obj_id) | return self.progress(bundle_type, swhid) | ||||
@db_transaction() | @db_transaction() | ||||
def batch_cook( | def batch_cook( | ||||
self, batch: List[Tuple[str, str]], db=None, cur=None | self, batch: List[Tuple[str, str]], db=None, cur=None | ||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | ||||
# Import execute_values at runtime only, because it requires | # Import execute_values at runtime only, because it requires | ||||
# psycopg2 >= 2.7 (only available on postgresql servers) | # psycopg2 >= 2.7 (only available on postgresql servers) | ||||
from psycopg2.extras import execute_values | from psycopg2.extras import execute_values | ||||
for bundle_type, _ in batch: | for bundle_type, _ in batch: | ||||
if bundle_type not in COOKER_TYPES: | if bundle_type not in COOKER_TYPES: | ||||
raise NotFoundExc(f"{bundle_type} is an unknown type.") | raise NotFoundExc(f"{bundle_type} is an unknown type.") | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
INSERT INTO vault_batch (id) | INSERT INTO vault_batch (id) | ||||
VALUES (DEFAULT) | VALUES (DEFAULT) | ||||
RETURNING id""" | RETURNING id""" | ||||
) | ) | ||||
batch_id = cur.fetchone()["id"] | batch_id = cur.fetchone()["id"] | ||||
batch_bytes = batch_to_bytes(batch) | |||||
# Delete all failed bundles from the batch | # Delete all failed bundles from the batch | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
DELETE FROM vault_bundle | DELETE FROM vault_bundle | ||||
WHERE task_status = 'failed' | WHERE task_status = 'failed' | ||||
AND (type, object_id) IN %s""", | AND (type, swhid) IN %s""", | ||||
(tuple(batch_bytes),), | (tuple(batch),), | ||||
) | ) | ||||
# Insert all the bundles, return the new ones | # Insert all the bundles, return the new ones | ||||
execute_values( | execute_values( | ||||
cur, | cur, | ||||
""" | """ | ||||
INSERT INTO vault_bundle (type, object_id) | INSERT INTO vault_bundle (type, swhid) | ||||
VALUES %s ON CONFLICT DO NOTHING""", | VALUES %s ON CONFLICT DO NOTHING""", | ||||
batch_bytes, | batch, | ||||
) | ) | ||||
# Get the bundle ids and task status | # Get the bundle ids and task status | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
SELECT id, type, object_id, task_id FROM vault_bundle | SELECT id, type, swhid, task_id FROM vault_bundle | ||||
WHERE (type, object_id) IN %s""", | WHERE (type, swhid) IN %s""", | ||||
(tuple(batch_bytes),), | (tuple(batch),), | ||||
) | ) | ||||
bundles = cur.fetchall() | bundles = cur.fetchall() | ||||
# Insert the batch-bundle entries | # Insert the batch-bundle entries | ||||
batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles] | batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles] | ||||
execute_values( | execute_values( | ||||
cur, | cur, | ||||
""" | """ | ||||
INSERT INTO vault_batch_bundle (batch_id, bundle_id) | INSERT INTO vault_batch_bundle (batch_id, bundle_id) | ||||
VALUES %s ON CONFLICT DO NOTHING""", | VALUES %s ON CONFLICT DO NOTHING""", | ||||
batch_id_bundle_ids, | batch_id_bundle_ids, | ||||
) | ) | ||||
db.conn.commit() | db.conn.commit() | ||||
# Get the tasks to fetch | # Get the tasks to fetch | ||||
batch_new = [ | batch_new = [ | ||||
(row["type"], bytes(row["object_id"])) | (row["type"], CoreSWHID.from_string(row["swhid"])) | ||||
for row in bundles | for row in bundles | ||||
if row["task_id"] is None | if row["task_id"] is None | ||||
] | ] | ||||
# Send the tasks | # Send the tasks | ||||
args_batch = [ | args_batch = [(bundle_type, swhid) for bundle_type, swhid in batch_new] | ||||
(bundle_type, hashutil.hash_to_hex(obj_id)) | |||||
for bundle_type, obj_id in batch_new | |||||
] | |||||
# TODO: change once the scheduler handles priority tasks | # TODO: change once the scheduler handles priority tasks | ||||
tasks = [ | tasks = [ | ||||
create_oneshot_task_dict("swh-vault-batch-cooking", *args) | create_oneshot_task_dict("swh-vault-batch-cooking", *args) | ||||
for args in args_batch | for args in args_batch | ||||
] | ] | ||||
added_tasks = self.scheduler.create_tasks(tasks) | added_tasks = self.scheduler.create_tasks(tasks) | ||||
tasks_ids_bundle_ids = [ | tasks_ids_bundle_ids = [ | ||||
(task_id, bundle_type, obj_id) | (task_id, bundle_type, swhid) | ||||
for task_id, (bundle_type, obj_id) in zip( | for task_id, (bundle_type, swhid) in zip( | ||||
[task["id"] for task in added_tasks], batch_new | [task["id"] for task in added_tasks], batch_new | ||||
) | ) | ||||
] | ] | ||||
# Update the task ids | # Update the task ids | ||||
execute_values( | execute_values( | ||||
cur, | cur, | ||||
""" | """ | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET task_id = s_task_id | SET task_id = s_task_id | ||||
FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id) | FROM (VALUES %s) AS sub (s_task_id, s_type, s_swhid) | ||||
WHERE type = s_type::cook_type AND object_id = s_object_id """, | WHERE type = s_type::cook_type AND swhid = s_swhid """, | ||||
tasks_ids_bundle_ids, | tasks_ids_bundle_ids, | ||||
) | ) | ||||
return {"id": batch_id} | return {"id": batch_id} | ||||
@db_transaction() | @db_transaction() | ||||
def batch_progress(self, batch_id: int, db=None, cur=None) -> Dict[str, Any]: | def batch_progress(self, batch_id: int, db=None, cur=None) -> Dict[str, Any]: | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
SELECT vault_bundle.id as id, | SELECT vault_bundle.id as id, | ||||
type, object_id, task_id, task_status, sticky, | type, swhid, task_id, task_status, sticky, | ||||
ts_created, ts_done, ts_last_access, progress_msg | ts_created, ts_done, ts_last_access, progress_msg | ||||
FROM vault_batch_bundle | FROM vault_batch_bundle | ||||
LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id | LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id | ||||
WHERE batch_id = %s""", | WHERE batch_id = %s""", | ||||
(batch_id,), | (batch_id,), | ||||
) | ) | ||||
bundles = cur.fetchall() | bundles = cur.fetchall() | ||||
if not bundles: | if not bundles: | ||||
raise NotFoundExc(f"Batch {batch_id} does not exist.") | raise NotFoundExc(f"Batch {batch_id} does not exist.") | ||||
for bundle in bundles: | for bundle in bundles: | ||||
bundle["object_id"] = hashutil.hash_to_hex(bundle["object_id"]) | bundle["swhid"] = CoreSWHID.from_string(bundle["swhid"]) | ||||
counter = collections.Counter(b["status"] for b in bundles) | counter = collections.Counter(b["status"] for b in bundles) | ||||
res = { | res = { | ||||
"bundles": bundles, | "bundles": bundles, | ||||
"total": len(bundles), | "total": len(bundles), | ||||
**{k: 0 for k in ("new", "pending", "done", "failed")}, | **{k: 0 for k in ("new", "pending", "done", "failed")}, | ||||
**dict(counter), | **dict(counter), | ||||
} | } | ||||
return res | return res | ||||
@db_transaction() | @db_transaction() | ||||
def is_available(self, bundle_type: str, obj_id: ObjectId, db=None, cur=None): | def is_available(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None): | ||||
"""Check whether a bundle is available for retrieval""" | """Check whether a bundle is available for retrieval""" | ||||
info = self.progress(bundle_type, obj_id, raise_notfound=False, cur=cur) | info = self.progress(bundle_type, swhid, raise_notfound=False, cur=cur) | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | |||||
return ( | return ( | ||||
info is not None | info is not None | ||||
and info["task_status"] == "done" | and info["task_status"] == "done" | ||||
and self.cache.is_cached(bundle_type, obj_id) | and self.cache.is_cached(bundle_type, swhid) | ||||
) | ) | ||||
@db_transaction() | @db_transaction() | ||||
def fetch( | def fetch( | ||||
self, bundle_type: str, obj_id: ObjectId, raise_notfound=True, db=None, cur=None | self, bundle_type: str, swhid: CoreSWHID, raise_notfound=True, db=None, cur=None | ||||
) -> Optional[bytes]: | ) -> Optional[bytes]: | ||||
"""Retrieve a bundle from the cache""" | """Retrieve a bundle from the cache""" | ||||
hex_id, obj_id = self._compute_ids(obj_id) | available = self.is_available(bundle_type, swhid, cur=cur) | ||||
available = self.is_available(bundle_type, obj_id, cur=cur) | |||||
if not available: | if not available: | ||||
if raise_notfound: | if raise_notfound: | ||||
raise NotFoundExc(f"{bundle_type} {hex_id} is not available.") | raise NotFoundExc(f"{bundle_type} {swhid} is not available.") | ||||
return None | return None | ||||
self.update_access_ts(bundle_type, obj_id, cur=cur) | self.update_access_ts(bundle_type, swhid, cur=cur) | ||||
return self.cache.get(bundle_type, obj_id) | return self.cache.get(bundle_type, swhid) | ||||
@db_transaction() | @db_transaction() | ||||
def update_access_ts(self, bundle_type: str, obj_id: bytes, db=None, cur=None): | def update_access_ts(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None): | ||||
"""Update the last access timestamp of a bundle""" | """Update the last access timestamp of a bundle""" | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET ts_last_access = NOW() | SET ts_last_access = NOW() | ||||
WHERE type = %s AND object_id = %s""", | WHERE type = %s AND swhid = %s""", | ||||
(bundle_type, obj_id), | (bundle_type, str(swhid)), | ||||
) | ) | ||||
@db_transaction() | @db_transaction() | ||||
def set_status( | def set_status( | ||||
self, bundle_type: str, obj_id: ObjectId, status: str, db=None, cur=None | self, bundle_type: str, swhid: CoreSWHID, status: str, db=None, cur=None | ||||
) -> bool: | ) -> bool: | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | |||||
req = ( | req = ( | ||||
""" | """ | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET task_status = %s """ | SET task_status = %s """ | ||||
+ (""", ts_done = NOW() """ if status == "done" else "") | + (""", ts_done = NOW() """ if status == "done" else "") | ||||
+ """WHERE type = %s AND object_id = %s""" | + """WHERE type = %s AND swhid = %s""" | ||||
) | ) | ||||
cur.execute(req, (status, bundle_type, obj_id)) | cur.execute(req, (status, bundle_type, str(swhid))) | ||||
return True | return True | ||||
@db_transaction() | @db_transaction() | ||||
def set_progress( | def set_progress( | ||||
self, bundle_type: str, obj_id: ObjectId, progress: str, db=None, cur=None | self, bundle_type: str, swhid: CoreSWHID, progress: str, db=None, cur=None | ||||
) -> bool: | ) -> bool: | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | |||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET progress_msg = %s | SET progress_msg = %s | ||||
WHERE type = %s AND object_id = %s""", | WHERE type = %s AND swhid = %s""", | ||||
(progress, bundle_type, obj_id), | (progress, bundle_type, str(swhid)), | ||||
) | ) | ||||
return True | return True | ||||
@db_transaction() | @db_transaction() | ||||
def send_notif(self, bundle_type: str, obj_id: ObjectId, db=None, cur=None) -> bool: | def send_notif(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None) -> bool: | ||||
hex_id, obj_id = self._compute_ids(obj_id) | |||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
SELECT vault_notif_email.id AS id, email, task_status, progress_msg | SELECT vault_notif_email.id AS id, email, task_status, progress_msg | ||||
FROM vault_notif_email | FROM vault_notif_email | ||||
INNER JOIN vault_bundle ON bundle_id = vault_bundle.id | INNER JOIN vault_bundle ON bundle_id = vault_bundle.id | ||||
WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s""", | WHERE vault_bundle.type = %s AND vault_bundle.swhid = %s""", | ||||
(bundle_type, obj_id), | (bundle_type, str(swhid)), | ||||
) | ) | ||||
for d in cur: | for d in cur: | ||||
self.send_notification( | self.send_notification( | ||||
d["id"], | d["id"], | ||||
d["email"], | d["email"], | ||||
bundle_type, | bundle_type, | ||||
hex_id, | swhid, | ||||
status=d["task_status"], | status=d["task_status"], | ||||
progress_msg=d["progress_msg"], | progress_msg=d["progress_msg"], | ||||
) | ) | ||||
return True | return True | ||||
@db_transaction() | @db_transaction() | ||||
def send_notification( | def send_notification( | ||||
self, | self, | ||||
n_id: Optional[int], | n_id: Optional[int], | ||||
email: str, | email: str, | ||||
bundle_type: str, | bundle_type: str, | ||||
hex_id: str, | swhid: CoreSWHID, | ||||
status: str, | status: str, | ||||
progress_msg: Optional[str] = None, | progress_msg: Optional[str] = None, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> None: | ) -> None: | ||||
"""Send the notification of a bundle to a specific e-mail""" | """Send the notification of a bundle to a specific e-mail""" | ||||
short_id = hex_id[:7] | short_id = swhid.object_id.hex()[:7] | ||||
# TODO: instead of hardcoding this, we should probably: | # TODO: instead of hardcoding this, we should probably: | ||||
# * add a "fetch_url" field in the vault_notif_email table | # * add a "fetch_url" field in the vault_notif_email table | ||||
# * generate the url with flask.url_for() on the web-ui side | # * generate the url with flask.url_for() on the web-ui side | ||||
# * send this url as part of the cook request and store it in | # * send this url as part of the cook request and store it in | ||||
# the table | # the table | ||||
# * use this url for the notification e-mail | # * use this url for the notification e-mail | ||||
url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( | url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( | ||||
bundle_type, hex_id | bundle_type, swhid | ||||
) | ) | ||||
if status == "done": | if status == "done": | ||||
text = NOTIF_EMAIL_BODY_SUCCESS.strip() | text = NOTIF_EMAIL_BODY_SUCCESS.strip() | ||||
text = text.format(bundle_type=bundle_type, hex_id=hex_id, url=url) | text = text.format(bundle_type=bundle_type, swhid=swhid, url=url) | ||||
msg = MIMEText(text) | msg = MIMEText(text) | ||||
msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( | msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( | ||||
bundle_type=bundle_type, short_id=short_id | bundle_type=bundle_type, short_id=short_id | ||||
) | ) | ||||
elif status == "failed": | elif status == "failed": | ||||
text = NOTIF_EMAIL_BODY_FAILURE.strip() | text = NOTIF_EMAIL_BODY_FAILURE.strip() | ||||
text = text.format( | text = text.format( | ||||
bundle_type=bundle_type, hex_id=hex_id, progress_msg=progress_msg | bundle_type=bundle_type, swhid=swhid, progress_msg=progress_msg | ||||
) | ) | ||||
msg = MIMEText(text) | msg = MIMEText(text) | ||||
msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( | msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( | ||||
bundle_type=bundle_type, short_id=short_id | bundle_type=bundle_type, short_id=short_id | ||||
) | ) | ||||
else: | else: | ||||
raise RuntimeError( | raise RuntimeError( | ||||
"send_notification called on a '{}' bundle".format(status) | "send_notification called on a '{}' bundle".format(status) | ||||
Show All 32 Lines | def _cache_expire(self, cond, *args, db=None, cur=None) -> None: | ||||
""" | """ | ||||
DELETE FROM vault_bundle | DELETE FROM vault_bundle | ||||
WHERE ctid IN ( | WHERE ctid IN ( | ||||
SELECT ctid | SELECT ctid | ||||
FROM vault_bundle | FROM vault_bundle | ||||
WHERE sticky = false | WHERE sticky = false | ||||
{} | {} | ||||
) | ) | ||||
RETURNING type, object_id | RETURNING type, swhid | ||||
""".format( | """.format( | ||||
cond | cond | ||||
), | ), | ||||
args, | args, | ||||
) | ) | ||||
for d in cur: | for d in cur: | ||||
self.cache.delete(d["type"], bytes(d["object_id"])) | self.cache.delete(d["type"], CoreSWHID.from_string(d["swhid"])) | ||||
@db_transaction() | @db_transaction() | ||||
def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None) -> None: | def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None) -> None: | ||||
"""Expire the `n` oldest bundles""" | """Expire the `n` oldest bundles""" | ||||
assert by in ("created", "done", "last_access") | assert by in ("created", "done", "last_access") | ||||
filter = """ORDER BY ts_{} LIMIT {}""".format(by, n) | filter = """ORDER BY ts_{} LIMIT {}""".format(by, n) | ||||
return self._cache_expire(filter) | return self._cache_expire(filter) | ||||
@db_transaction() | @db_transaction() | ||||
def cache_expire_until(self, date, by="last_access", db=None, cur=None) -> None: | def cache_expire_until(self, date, by="last_access", db=None, cur=None) -> None: | ||||
"""Expire all the bundles until a certain date""" | """Expire all the bundles until a certain date""" | ||||
assert by in ("created", "done", "last_access") | assert by in ("created", "done", "last_access") | ||||
filter = """AND ts_{} <= %s""".format(by) | filter = """AND ts_{} <= %s""".format(by) | ||||
return self._cache_expire(filter, date) | return self._cache_expire(filter, date) |