Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/backend.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import smtplib | |||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.extras | import psycopg2.extras | ||||
import smtplib | |||||
from functools import wraps | from functools import wraps | ||||
from email.mime.text import MIMEText | from email.mime.text import MIMEText | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.scheduler.backend import SchedulerBackend | from swh.scheduler.backend import SchedulerBackend | ||||
from swh.scheduler.utils import create_oneshot_task_dict | from swh.scheduler.utils import create_oneshot_task_dict | ||||
from swh.vault.cache import VaultCache | from swh.vault.cache import VaultCache | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | |||||
""" | """ | ||||
class NotFoundExc(Exception):
    """Error signalling that a bundle was not found."""
def batch_to_bytes(batch):
    """Convert the object ids of a batch with ``hashutil.hash_to_bytes``.

    Args:
        batch: iterable of ``(obj_type, obj_id)`` pairs.

    Returns:
        A new list of ``(obj_type, converted_obj_id)`` tuples, in the same
        order as the input; ``obj_type`` is passed through unchanged.
    """
    return [
        (kind, hashutil.hash_to_bytes(identifier))
        for kind, identifier in batch
    ]
# TODO: Imported from swh.scheduler.backend. Factorization needed. | # TODO: Imported from swh.scheduler.backend. Factorization needed. | ||||
def autocommit(fn): | def autocommit(fn): | ||||
@wraps(fn) | @wraps(fn) | ||||
def wrapped(self, *args, **kwargs): | def wrapped(self, *args, **kwargs): | ||||
autocommit = False | autocommit = False | ||||
# TODO: I don't like using None, it's confusing for the user. how about | # TODO: I don't like using None, it's confusing for the user. how about | ||||
# a NEW_CURSOR object()? | # a NEW_CURSOR object()? | ||||
if 'cursor' not in kwargs or not kwargs['cursor']: | if 'cursor' not in kwargs or not kwargs['cursor']: | ||||
▲ Show 20 Lines • Show All 155 Lines • ▼ Show 20 Lines | def cook_request(self, obj_type, obj_id, *, sticky=False, | ||||
# Else, add it to the notification queue | # Else, add it to the notification queue | ||||
else: | else: | ||||
self.add_notif_email(obj_type, obj_id, email) | self.add_notif_email(obj_type, obj_id, email) | ||||
info = self.task_info(obj_type, obj_id) | info = self.task_info(obj_type, obj_id) | ||||
return info | return info | ||||
@autocommit | @autocommit | ||||
def batch_cook(self, batch, cursor=None):
    """Record a new batch, request the cooking of each of its bundles and
    return the batch id.

    Args:
        batch: iterable of ``(obj_type, obj_id)`` pairs; it is passed
            through ``batch_to_bytes`` before being iterated.
        cursor: database cursor on which the bookkeeping statements are
            executed.

    Returns:
        The ``id`` of the row inserted into ``vault_batch``.
    """
    cursor.execute('''INSERT INTO vault_batch (id) VALUES (DEFAULT)
                      RETURNING id''')
    created = cursor.fetchone()
    batch_id = created['id']

    link_bundle = '''INSERT INTO vault_batch_bundle
                       (batch_id, bundle_id)
                     VALUES (%s, %s)'''

    # One cook_request() call and one INSERT per bundle. Should batch
    # inserts become frequent enough for performance to matter, the calls
    # below could be rewritten to operate on whole batches (UPDATE FROM to
    # set task_id, DELETE with tuple unpacking, ...). That is a significant
    # amount of work, hence the simple loop for now.
    for obj_type, obj_id in batch_to_bytes(batch):
        bundle = self.cook_request(obj_type, obj_id)
        cursor.execute(link_bundle, (batch_id, bundle['id']))
    return batch_id
@autocommit | |||||
def batch_info(self, batch_id, cursor=None):
    """List the bundles attached to a batch.

    Args:
        batch_id: value matched against ``vault_batch_bundle.batch_id``.
        cursor: database cursor on which the query is executed.

    Returns:
        The result of ``cursor.fetchall()`` for the query, where each
        row's ``object_id`` has been replaced by its ``bytes()``
        conversion.
    """
    query = '''
        SELECT vault_bundle.id as id,
               type, object_id, task_id, task_status, sticky,
               ts_created, ts_done, ts_last_access, progress_msg
        FROM vault_batch_bundle
        LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id
        WHERE batch_id = %s'''
    cursor.execute(query, (batch_id,))
    rows = cursor.fetchall()
    # A falsy result (e.g. no rows) is returned untouched.
    for row in rows or ():
        row['object_id'] = bytes(row['object_id'])
    return rows
@autocommit | |||||
def is_available(self, obj_type, obj_id, cursor=None):
    """Tell whether a bundle can be retrieved.

    The bundle is reported available when ``task_info`` returns an entry
    whose ``task_status`` is ``'done'`` and the cache reports the object
    as cached.

    Args:
        obj_type: bundle type, forwarded to ``task_info`` and the cache.
        obj_id: bundle object id, forwarded to ``task_info`` and the cache.
        cursor: database cursor forwarded to ``task_info``.

    Returns:
        ``False`` when there is no task entry or it is not done, otherwise
        the result of ``self.cache.is_cached(obj_type, obj_id)``.
    """
    info = self.task_info(obj_type, obj_id, cursor=cursor)
    if info is None or info['task_status'] != 'done':
        return False
    return self.cache.is_cached(obj_type, obj_id)
@autocommit | @autocommit | ||||
▲ Show 20 Lines • Show All 139 Lines • Show Last 20 Lines |