Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/backend.py
# Copyright (C) 2017-2018 The Software Heritage developers | # Copyright (C) 2017-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import smtplib | import smtplib | ||||
import psycopg2 | import psycopg2.extras | ||||
from psycopg2.extras import RealDictCursor | |||||
from functools import wraps | |||||
from email.mime.text import MIMEText | from email.mime.text import MIMEText | ||||
from swh.core.db import BaseDb | |||||
from swh.core.db.common import db_transaction | |||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.scheduler import get_scheduler | |||||
from swh.scheduler.utils import create_oneshot_task_dict | from swh.scheduler.utils import create_oneshot_task_dict | ||||
from swh.vault.cache import VaultCache | from swh.vault.cookers import get_cooker_cls | ||||
from swh.vault.cookers import get_cooker | |||||
cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' | cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' | ||||
NOTIF_EMAIL_FROM = ('"Software Heritage Vault" ' | NOTIF_EMAIL_FROM = ('"Software Heritage Vault" ' | ||||
'<info@softwareheritage.org>') | '<info@softwareheritage.org>') | ||||
NOTIF_EMAIL_SUBJECT_SUCCESS = ("Bundle ready: {obj_type} {short_id}") | NOTIF_EMAIL_SUBJECT_SUCCESS = ("Bundle ready: {obj_type} {short_id}") | ||||
NOTIF_EMAIL_SUBJECT_FAILURE = ("Bundle failed: {obj_type} {short_id}") | NOTIF_EMAIL_SUBJECT_FAILURE = ("Bundle failed: {obj_type} {short_id}") | ||||
Show All 37 Lines | class NotFoundExc(Exception): | ||||
"""Bundle was not found.""" | """Bundle was not found.""" | ||||
pass | pass | ||||
def batch_to_bytes(batch): | def batch_to_bytes(batch): | ||||
return [(obj_type, hashutil.hash_to_bytes(obj_id)) | return [(obj_type, hashutil.hash_to_bytes(obj_id)) | ||||
for obj_type, obj_id in batch] | for obj_type, obj_id in batch] | ||||
# TODO: Imported from swh.scheduler.backend. Factorization needed. | |||||
def autocommit(fn): | |||||
@wraps(fn) | |||||
def wrapped(self, *args, **kwargs): | |||||
autocommit = False | |||||
# TODO: I don't like using None, it's confusing for the user. how about | |||||
# a NEW_CURSOR object()? | |||||
if 'cursor' not in kwargs or not kwargs['cursor']: | |||||
autocommit = True | |||||
kwargs['cursor'] = self.cursor() | |||||
try: | |||||
ret = fn(self, *args, **kwargs) | |||||
except BaseException: | |||||
if autocommit: | |||||
self.rollback() | |||||
raise | |||||
if autocommit: | |||||
self.commit() | |||||
return ret | |||||
return wrapped | |||||
# TODO: This has to be factorized with other database base classes and helpers | |||||
# (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) | |||||
# The three first methods are imported from swh.scheduler.backend. | |||||
class VaultBackend: | class VaultBackend: | ||||
ardumont: this todo can go away with this, can't it? | |||||
""" | """ | ||||
Backend for the Software Heritage vault. | Backend for the Software Heritage vault. | ||||
""" | """ | ||||
def __init__(self, config): | def __init__(self, db, cache, scheduler, storage=None, **config): | ||||
self.config = config | self.config = config | ||||
self.cache = VaultCache(self.config['cache']) | self.cache = cache | ||||
self.db = None | self.scheduler = scheduler | ||||
self.reconnect() | self.storage = storage | ||||
self.smtp_server = smtplib.SMTP() | self.smtp_server = smtplib.SMTP() | ||||
if self.config['scheduler'] is not None: | |||||
self.scheduler = get_scheduler(**self.config['scheduler']) | |||||
def reconnect(self): | self._pool = psycopg2.pool.ThreadedConnectionPool( | ||||
"""Reconnect to the database.""" | config.get('min_pool_conns', 1), | ||||
if not self.db or self.db.closed: | config.get('max_pool_conns', 10), | ||||
self.db = psycopg2.connect( | db, | ||||
dsn=self.config['db'], | cursor_factory=psycopg2.extras.RealDictCursor, | ||||
cursor_factory=RealDictCursor, | |||||
) | ) | ||||
self._db = None | |||||
def close(self): | def get_db(self): | ||||
"""Close the underlying database connection.""" | if self._db: | ||||
self.db.close() | return self._db | ||||
return BaseDb.from_pool(self._pool) | |||||
def cursor(self): | |||||
"""Return a fresh cursor on the database, with auto-reconnection in | |||||
case of failure""" | |||||
cur = None | |||||
# Get a fresh cursor and reconnect at most three times | |||||
tries = 0 | |||||
while True: | |||||
tries += 1 | |||||
try: | |||||
cur = self.db.cursor() | |||||
cur.execute('select 1') | |||||
break | |||||
except psycopg2.OperationalError: | |||||
if tries < 3: | |||||
self.reconnect() | |||||
else: | |||||
raise | |||||
return cur | |||||
def commit(self): | |||||
"""Commit a transaction""" | |||||
self.db.commit() | |||||
def rollback(self): | |||||
"""Rollback a transaction""" | |||||
self.db.rollback() | |||||
@autocommit | @db_transaction() | ||||
def task_info(self, obj_type, obj_id, cursor=None): | def task_info(self, obj_type, obj_id, db=None, cur=None): | ||||
"""Fetch information from a bundle""" | """Fetch information from a bundle""" | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
cursor.execute(''' | cur.execute(''' | ||||
SELECT id, type, object_id, task_id, task_status, sticky, | SELECT id, type, object_id, task_id, task_status, sticky, | ||||
ts_created, ts_done, ts_last_access, progress_msg | ts_created, ts_done, ts_last_access, progress_msg | ||||
FROM vault_bundle | FROM vault_bundle | ||||
WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) | WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) | ||||
res = cursor.fetchone() | res = cur.fetchone() | ||||
if res: | if res: | ||||
res['object_id'] = bytes(res['object_id']) | res['object_id'] = bytes(res['object_id']) | ||||
return res | return res | ||||
def _send_task(self, args): | def _send_task(self, *args): | ||||
"""Send a cooking task to the celery scheduler""" | """Send a cooking task to the celery scheduler""" | ||||
task = create_oneshot_task_dict('swh-vault-cooking', *args) | task = create_oneshot_task_dict('swh-vault-cooking', *args) | ||||
added_tasks = self.scheduler.create_tasks([task]) | added_tasks = self.scheduler.create_tasks([task]) | ||||
return added_tasks[0]['id'] | return added_tasks[0]['id'] | ||||
@autocommit | @db_transaction() | ||||
def create_task(self, obj_type, obj_id, sticky=False, cursor=None): | def create_task(self, obj_type, obj_id, sticky=False, db=None, cur=None): | ||||
"""Create and send a cooking task""" | """Create and send a cooking task""" | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
hex_id = hashutil.hash_to_hex(obj_id) | hex_id = hashutil.hash_to_hex(obj_id) | ||||
args = [obj_type, hex_id] | |||||
backend_storage_config = {'storage': self.config['storage']} | cooker_class = get_cooker_cls(obj_type) | ||||
cooker_class = get_cooker(obj_type) | cooker = cooker_class(obj_type, hex_id, | ||||
cooker = cooker_class(*args, override_cfg=backend_storage_config) | backend=self, storage=self.storage) | ||||
if not cooker.check_exists(): | if not cooker.check_exists(): | ||||
raise NotFoundExc("Object {} was not found.".format(hex_id)) | raise NotFoundExc("Object {} was not found.".format(hex_id)) | ||||
cursor.execute(''' | cur.execute(''' | ||||
INSERT INTO vault_bundle (type, object_id, sticky) | INSERT INTO vault_bundle (type, object_id, sticky) | ||||
VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) | VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) | ||||
self.commit() | db.conn.commit() | ||||
task_id = self._send_task(args) | task_id = self._send_task(obj_type, hex_id) | ||||
cursor.execute(''' | cur.execute(''' | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET task_id = %s | SET task_id = %s | ||||
WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id)) | WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id)) | ||||
@autocommit | @db_transaction() | ||||
def add_notif_email(self, obj_type, obj_id, email, cursor=None): | def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None): | ||||
"""Add an e-mail address to notify when a given bundle is ready""" | """Add an e-mail address to notify when a given bundle is ready""" | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
cursor.execute(''' | cur.execute(''' | ||||
INSERT INTO vault_notif_email (email, bundle_id) | INSERT INTO vault_notif_email (email, bundle_id) | ||||
VALUES (%s, (SELECT id FROM vault_bundle | VALUES (%s, (SELECT id FROM vault_bundle | ||||
WHERE type = %s AND object_id = %s))''', | WHERE type = %s AND object_id = %s))''', | ||||
(email, obj_type, obj_id)) | (email, obj_type, obj_id)) | ||||
@autocommit | @db_transaction() | ||||
def cook_request(self, obj_type, obj_id, *, sticky=False, | def cook_request(self, obj_type, obj_id, *, sticky=False, | ||||
email=None, cursor=None): | email=None, db=None, cur=None): | ||||
"""Main entry point for cooking requests. This starts a cooking task if | """Main entry point for cooking requests. This starts a cooking task if | ||||
needed, and add the given e-mail to the notify list""" | needed, and add the given e-mail to the notify list""" | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
info = self.task_info(obj_type, obj_id) | info = self.task_info(obj_type, obj_id) | ||||
# If there's a failed bundle entry, delete it first. | # If there's a failed bundle entry, delete it first. | ||||
if info is not None and info['task_status'] == 'failed': | if info is not None and info['task_status'] == 'failed': | ||||
cursor.execute('''DELETE FROM vault_bundle | cur.execute('''DELETE FROM vault_bundle | ||||
WHERE type = %s AND object_id = %s''', | WHERE type = %s AND object_id = %s''', | ||||
(obj_type, obj_id)) | (obj_type, obj_id)) | ||||
self.commit() | db.conn.commit() | ||||
info = None | info = None | ||||
# If there's no bundle entry, create the task. | # If there's no bundle entry, create the task. | ||||
if info is None: | if info is None: | ||||
self.create_task(obj_type, obj_id, sticky) | self.create_task(obj_type, obj_id, sticky) | ||||
if email is not None: | if email is not None: | ||||
# If the task is already done, send the email directly | # If the task is already done, send the email directly | ||||
if info is not None and info['task_status'] == 'done': | if info is not None and info['task_status'] == 'done': | ||||
self.send_notification(None, email, obj_type, obj_id, | self.send_notification(None, email, obj_type, obj_id, | ||||
info['task_status']) | info['task_status']) | ||||
# Else, add it to the notification queue | # Else, add it to the notification queue | ||||
else: | else: | ||||
self.add_notif_email(obj_type, obj_id, email) | self.add_notif_email(obj_type, obj_id, email) | ||||
info = self.task_info(obj_type, obj_id) | info = self.task_info(obj_type, obj_id) | ||||
return info | return info | ||||
@autocommit | @db_transaction() | ||||
def batch_cook(self, batch, cursor=None): | def batch_cook(self, batch, db=None, cur=None): | ||||
"""Cook a batch of bundles and returns the cooking id.""" | """Cook a batch of bundles and returns the cooking id.""" | ||||
# Import execute_values at runtime only, because it requires | # Import execute_values at runtime only, because it requires | ||||
# psycopg2 >= 2.7 (only available on postgresql servers) | # psycopg2 >= 2.7 (only available on postgresql servers) | ||||
from psycopg2.extras import execute_values | from psycopg2.extras import execute_values | ||||
cursor.execute(''' | cur.execute(''' | ||||
INSERT INTO vault_batch (id) | INSERT INTO vault_batch (id) | ||||
VALUES (DEFAULT) | VALUES (DEFAULT) | ||||
RETURNING id''') | RETURNING id''') | ||||
batch_id = cursor.fetchone()['id'] | batch_id = cur.fetchone()['id'] | ||||
batch = batch_to_bytes(batch) | batch = batch_to_bytes(batch) | ||||
# Delete all failed bundles from the batch | # Delete all failed bundles from the batch | ||||
cursor.execute(''' | cur.execute(''' | ||||
DELETE FROM vault_bundle | DELETE FROM vault_bundle | ||||
WHERE task_status = 'failed' | WHERE task_status = 'failed' | ||||
AND (type, object_id) IN %s''', (tuple(batch),)) | AND (type, object_id) IN %s''', (tuple(batch),)) | ||||
# Insert all the bundles, return the new ones | # Insert all the bundles, return the new ones | ||||
execute_values(cursor, ''' | execute_values(cur, ''' | ||||
INSERT INTO vault_bundle (type, object_id) | INSERT INTO vault_bundle (type, object_id) | ||||
VALUES %s ON CONFLICT DO NOTHING''', batch) | VALUES %s ON CONFLICT DO NOTHING''', batch) | ||||
# Get the bundle ids and task status | # Get the bundle ids and task status | ||||
cursor.execute(''' | cur.execute(''' | ||||
SELECT id, type, object_id, task_id FROM vault_bundle | SELECT id, type, object_id, task_id FROM vault_bundle | ||||
WHERE (type, object_id) IN %s''', (tuple(batch),)) | WHERE (type, object_id) IN %s''', (tuple(batch),)) | ||||
bundles = cursor.fetchall() | bundles = cur.fetchall() | ||||
# Insert the batch-bundle entries | # Insert the batch-bundle entries | ||||
batch_id_bundle_ids = [(batch_id, row['id']) for row in bundles] | batch_id_bundle_ids = [(batch_id, row['id']) for row in bundles] | ||||
execute_values(cursor, ''' | execute_values(cur, ''' | ||||
INSERT INTO vault_batch_bundle (batch_id, bundle_id) | INSERT INTO vault_batch_bundle (batch_id, bundle_id) | ||||
VALUES %s ON CONFLICT DO NOTHING''', | VALUES %s ON CONFLICT DO NOTHING''', | ||||
batch_id_bundle_ids) | batch_id_bundle_ids) | ||||
self.commit() | db.conn.commit() | ||||
# Get the tasks to fetch | # Get the tasks to fetch | ||||
batch_new = [(row['type'], bytes(row['object_id'])) | batch_new = [(row['type'], bytes(row['object_id'])) | ||||
for row in bundles if row['task_id'] is None] | for row in bundles if row['task_id'] is None] | ||||
# Send the tasks | # Send the tasks | ||||
args_batch = [(obj_type, hashutil.hash_to_hex(obj_id)) | args_batch = [(obj_type, hashutil.hash_to_hex(obj_id)) | ||||
for obj_type, obj_id in batch_new] | for obj_type, obj_id in batch_new] | ||||
# TODO: change once the scheduler handles priority tasks | # TODO: change once the scheduler handles priority tasks | ||||
tasks = [create_oneshot_task_dict('swh-vault-batch-cooking', *args) | tasks = [create_oneshot_task_dict('swh-vault-batch-cooking', *args) | ||||
for args in args_batch] | for args in args_batch] | ||||
added_tasks = self.scheduler.create_tasks(tasks) | added_tasks = self.scheduler.create_tasks(tasks) | ||||
tasks_ids_bundle_ids = zip([task['id'] for task in added_tasks], | tasks_ids_bundle_ids = zip([task['id'] for task in added_tasks], | ||||
batch_new) | batch_new) | ||||
tasks_ids_bundle_ids = [(task_id, obj_type, obj_id) | tasks_ids_bundle_ids = [(task_id, obj_type, obj_id) | ||||
for task_id, (obj_type, obj_id) | for task_id, (obj_type, obj_id) | ||||
in tasks_ids_bundle_ids] | in tasks_ids_bundle_ids] | ||||
# Update the task ids | # Update the task ids | ||||
execute_values(cursor, ''' | execute_values(cur, ''' | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET task_id = s_task_id | SET task_id = s_task_id | ||||
FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id) | FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id) | ||||
WHERE type = s_type::cook_type AND object_id = s_object_id ''', | WHERE type = s_type::cook_type AND object_id = s_object_id ''', | ||||
tasks_ids_bundle_ids) | tasks_ids_bundle_ids) | ||||
return batch_id | return batch_id | ||||
@autocommit | @db_transaction() | ||||
def batch_info(self, batch_id, cursor=None): | def batch_info(self, batch_id, db=None, cur=None): | ||||
"""Fetch information from a batch of bundles""" | """Fetch information from a batch of bundles""" | ||||
cursor.execute(''' | cur.execute(''' | ||||
SELECT vault_bundle.id as id, | SELECT vault_bundle.id as id, | ||||
type, object_id, task_id, task_status, sticky, | type, object_id, task_id, task_status, sticky, | ||||
ts_created, ts_done, ts_last_access, progress_msg | ts_created, ts_done, ts_last_access, progress_msg | ||||
FROM vault_batch_bundle | FROM vault_batch_bundle | ||||
LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id | LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id | ||||
WHERE batch_id = %s''', (batch_id,)) | WHERE batch_id = %s''', (batch_id,)) | ||||
res = cursor.fetchall() | res = cur.fetchall() | ||||
if res: | if res: | ||||
for d in res: | for d in res: | ||||
d['object_id'] = bytes(d['object_id']) | d['object_id'] = bytes(d['object_id']) | ||||
return res | return res | ||||
@autocommit | @db_transaction() | ||||
def is_available(self, obj_type, obj_id, cursor=None): | def is_available(self, obj_type, obj_id, db=None, cur=None): | ||||
"""Check whether a bundle is available for retrieval""" | """Check whether a bundle is available for retrieval""" | ||||
info = self.task_info(obj_type, obj_id, cursor=cursor) | info = self.task_info(obj_type, obj_id, cur=cur) | ||||
return (info is not None | return (info is not None | ||||
and info['task_status'] == 'done' | and info['task_status'] == 'done' | ||||
and self.cache.is_cached(obj_type, obj_id)) | and self.cache.is_cached(obj_type, obj_id)) | ||||
@autocommit | @db_transaction() | ||||
def fetch(self, obj_type, obj_id, cursor=None): | def fetch(self, obj_type, obj_id, db=None, cur=None): | ||||
"""Retrieve a bundle from the cache""" | """Retrieve a bundle from the cache""" | ||||
if not self.is_available(obj_type, obj_id, cursor=cursor): | if not self.is_available(obj_type, obj_id, cur=cur): | ||||
return None | return None | ||||
self.update_access_ts(obj_type, obj_id, cursor=cursor) | self.update_access_ts(obj_type, obj_id, cur=cur) | ||||
return self.cache.get(obj_type, obj_id) | return self.cache.get(obj_type, obj_id) | ||||
@autocommit | @db_transaction() | ||||
def update_access_ts(self, obj_type, obj_id, cursor=None): | def update_access_ts(self, obj_type, obj_id, db=None, cur=None): | ||||
"""Update the last access timestamp of a bundle""" | """Update the last access timestamp of a bundle""" | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
cursor.execute(''' | cur.execute(''' | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET ts_last_access = NOW() | SET ts_last_access = NOW() | ||||
WHERE type = %s AND object_id = %s''', | WHERE type = %s AND object_id = %s''', | ||||
(obj_type, obj_id)) | (obj_type, obj_id)) | ||||
@autocommit | @db_transaction() | ||||
def set_status(self, obj_type, obj_id, status, cursor=None): | def set_status(self, obj_type, obj_id, status, db=None, cur=None): | ||||
"""Set the cooking status of a bundle""" | """Set the cooking status of a bundle""" | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
req = (''' | req = (''' | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET task_status = %s ''' | SET task_status = %s ''' | ||||
+ (''', ts_done = NOW() ''' if status == 'done' else '') | + (''', ts_done = NOW() ''' if status == 'done' else '') | ||||
+ '''WHERE type = %s AND object_id = %s''') | + '''WHERE type = %s AND object_id = %s''') | ||||
cursor.execute(req, (status, obj_type, obj_id)) | cur.execute(req, (status, obj_type, obj_id)) | ||||
@autocommit | @db_transaction() | ||||
def set_progress(self, obj_type, obj_id, progress, cursor=None): | def set_progress(self, obj_type, obj_id, progress, db=None, cur=None): | ||||
"""Set the cooking progress of a bundle""" | """Set the cooking progress of a bundle""" | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
cursor.execute(''' | cur.execute(''' | ||||
UPDATE vault_bundle | UPDATE vault_bundle | ||||
SET progress_msg = %s | SET progress_msg = %s | ||||
WHERE type = %s AND object_id = %s''', | WHERE type = %s AND object_id = %s''', | ||||
(progress, obj_type, obj_id)) | (progress, obj_type, obj_id)) | ||||
@autocommit | @db_transaction() | ||||
def send_all_notifications(self, obj_type, obj_id, cursor=None): | def send_all_notifications(self, obj_type, obj_id, db=None, cur=None): | ||||
"""Send all the e-mails in the notification list of a bundle""" | """Send all the e-mails in the notification list of a bundle""" | ||||
obj_id = hashutil.hash_to_bytes(obj_id) | obj_id = hashutil.hash_to_bytes(obj_id) | ||||
cursor.execute(''' | cur.execute(''' | ||||
SELECT vault_notif_email.id AS id, email, task_status, progress_msg | SELECT vault_notif_email.id AS id, email, task_status, progress_msg | ||||
FROM vault_notif_email | FROM vault_notif_email | ||||
INNER JOIN vault_bundle ON bundle_id = vault_bundle.id | INNER JOIN vault_bundle ON bundle_id = vault_bundle.id | ||||
WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''', | WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''', | ||||
(obj_type, obj_id)) | (obj_type, obj_id)) | ||||
for d in cursor: | for d in cur: | ||||
self.send_notification(d['id'], d['email'], obj_type, obj_id, | self.send_notification(d['id'], d['email'], obj_type, obj_id, | ||||
status=d['task_status'], | status=d['task_status'], | ||||
progress_msg=d['progress_msg']) | progress_msg=d['progress_msg']) | ||||
@autocommit | @db_transaction() | ||||
def send_notification(self, n_id, email, obj_type, obj_id, status, | def send_notification(self, n_id, email, obj_type, obj_id, status, | ||||
progress_msg=None, cursor=None): | progress_msg=None, db=None, cur=None): | ||||
"""Send the notification of a bundle to a specific e-mail""" | """Send the notification of a bundle to a specific e-mail""" | ||||
hex_id = hashutil.hash_to_hex(obj_id) | hex_id = hashutil.hash_to_hex(obj_id) | ||||
short_id = hex_id[:7] | short_id = hex_id[:7] | ||||
# TODO: instead of hardcoding this, we should probably: | # TODO: instead of hardcoding this, we should probably: | ||||
# * add a "fetch_url" field in the vault_notif_email table | # * add a "fetch_url" field in the vault_notif_email table | ||||
# * generate the url with flask.url_for() on the web-ui side | # * generate the url with flask.url_for() on the web-ui side | ||||
# * send this url as part of the cook request and store it in | # * send this url as part of the cook request and store it in | ||||
Show All 20 Lines | def send_notification(self, n_id, email, obj_type, obj_id, status, | ||||
.format(status)) | .format(status)) | ||||
msg['From'] = NOTIF_EMAIL_FROM | msg['From'] = NOTIF_EMAIL_FROM | ||||
msg['To'] = email | msg['To'] = email | ||||
self._smtp_send(msg) | self._smtp_send(msg) | ||||
if n_id is not None: | if n_id is not None: | ||||
cursor.execute(''' | cur.execute(''' | ||||
DELETE FROM vault_notif_email | DELETE FROM vault_notif_email | ||||
WHERE id = %s''', (n_id,)) | WHERE id = %s''', (n_id,)) | ||||
def _smtp_send(self, msg): | def _smtp_send(self, msg): | ||||
# Reconnect if needed | # Reconnect if needed | ||||
try: | try: | ||||
status = self.smtp_server.noop()[0] | status = self.smtp_server.noop()[0] | ||||
except smtplib.SMTPException: | except smtplib.SMTPException: | ||||
status = -1 | status = -1 | ||||
if status != 250: | if status != 250: | ||||
self.smtp_server.connect('localhost', 25) | self.smtp_server.connect('localhost', 25) | ||||
# Send the message | # Send the message | ||||
self.smtp_server.send_message(msg) | self.smtp_server.send_message(msg) | ||||
@autocommit | @db_transaction() | ||||
def _cache_expire(self, cond, *args, cursor=None): | def _cache_expire(self, cond, *args, db=None, cur=None): | ||||
"""Low-level expiration method, used by cache_expire_* methods""" | """Low-level expiration method, used by cache_expire_* methods""" | ||||
# Embedded SELECT query to be able to use ORDER BY and LIMIT | # Embedded SELECT query to be able to use ORDER BY and LIMIT | ||||
cursor.execute(''' | cur.execute(''' | ||||
DELETE FROM vault_bundle | DELETE FROM vault_bundle | ||||
WHERE ctid IN ( | WHERE ctid IN ( | ||||
SELECT ctid | SELECT ctid | ||||
FROM vault_bundle | FROM vault_bundle | ||||
WHERE sticky = false | WHERE sticky = false | ||||
{} | {} | ||||
) | ) | ||||
RETURNING type, object_id | RETURNING type, object_id | ||||
'''.format(cond), args) | '''.format(cond), args) | ||||
for d in cursor: | for d in cur: | ||||
self.cache.delete(d['type'], bytes(d['object_id'])) | self.cache.delete(d['type'], bytes(d['object_id'])) | ||||
@autocommit | @db_transaction() | ||||
def cache_expire_oldest(self, n=1, by='last_access', cursor=None): | def cache_expire_oldest(self, n=1, by='last_access', db=None, cur=None): | ||||
"""Expire the `n` oldest bundles""" | """Expire the `n` oldest bundles""" | ||||
assert by in ('created', 'done', 'last_access') | assert by in ('created', 'done', 'last_access') | ||||
filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n) | filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n) | ||||
return self._cache_expire(filter) | return self._cache_expire(filter) | ||||
@autocommit | @db_transaction() | ||||
def cache_expire_until(self, date, by='last_access', cursor=None): | def cache_expire_until(self, date, by='last_access', db=None, cur=None): | ||||
"""Expire all the bundles until a certain date""" | """Expire all the bundles until a certain date""" | ||||
assert by in ('created', 'done', 'last_access') | assert by in ('created', 'done', 'last_access') | ||||
filter = '''AND ts_{} <= %s'''.format(by) | filter = '''AND ts_{} <= %s'''.format(by) | ||||
return self._cache_expire(filter, date) | return self._cache_expire(filter, date) |
this todo can go away with this, can't it?