diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index 9bddd0e..9bd4c36 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,99 +1,107 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import aiohttp.web import click from swh.core import config from swh.core.api_async import (SWHRemoteAPI, encode_data_server as encode_data) +from swh.model import hashutil from swh.vault.cookers import COOKER_TYPES from swh.vault.backend import VaultBackend DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'local', 'args': { 'db': 'dbname=softwareheritage-dev', 'objstorage': { 'root': '/tmp/objects', 'slicing': '0:2/2:4/4:6', }, }, }), 'cache': ('dict', {'root': '/tmp/vaultcache'}), 'vault_db': ('str', 'dbname=swh-vault') } @asyncio.coroutine def index(request): return aiohttp.web.Response(body="SWH Vault API server") @asyncio.coroutine def vault_fetch(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] if not request.app['backend'].is_available(obj_type, obj_id): raise aiohttp.web.HTTPNotFound return encode_data(request.app['backend'].fetch(obj_type, obj_id)) +def user_info(task_info): + return {'task_uuid': str(task_info['task_uuid']), + 'status': task_info['task_status'], + 'progress_message': task_info['progress_msg'], + 'obj_type': task_info['type'], + 'obj_id': hashutil.hash_to_hex(task_info['object_id'])} + + @asyncio.coroutine def vault_cook(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] email = request.query.get('email') if obj_type not in COOKER_TYPES: raise aiohttp.web.HTTPNotFound - request.app['backend'].cook_request(obj_type, obj_id, email) + info = request.app['backend'].cook_request(obj_type, obj_id, email) - # Return url to get the content and 201 CREATED - return encode_data('/vault/{}/{}/'.format(obj_type, obj_id), status=201) + return encode_data(user_info(info), status=201) @asyncio.coroutine def vault_progress(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] info = request.app['backend'].task_info(obj_type, obj_id) if not info: raise aiohttp.web.HTTPNotFound - return encode_data(info['progress_msg']) + return encode_data(user_info(info)) def make_app(config, **kwargs): app = SWHRemoteAPI(**kwargs) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch) app.router.add_route('POST', '/cook/{type}/{id}', vault_cook) app.router.add_route('GET', '/progress/{type}/{id}', vault_progress) app['backend'] = VaultBackend(config) return app @click.command() @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5005, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def launch(config_path, host, port, debug): app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug)) aiohttp.web.run_app(app, host=host, port=int(port)) if __name__ == '__main__': launch() diff --git a/swh/vault/backend.py b/swh/vault/backend.py index cadbac9..c6800f7 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,224 +1,250 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import textwrap import smtplib import celery import psycopg2 import psycopg2.extras from functools import wraps from email.mime.text import MIMEText from swh.model import hashutil from swh.scheduler.utils import get_task from swh.vault.cache import VaultCache from swh.vault.cookers import COOKER_TYPES from swh.vault.cooking_tasks import SWHCookingTask # noqa cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' +NOTIF_EMAIL_FROM = ('"Software Heritage Vault" ' + '') +NOTIF_EMAIL_SUBJECT = ("Bundle ready: {obj_type} {short_id}") +NOTIF_EMAIL_BODY = """ +You have requested the following bundle from the Software Heritage +Vault: + +Object Type: {obj_type} +Object ID: {hex_id} + +This bundle is now available for download at the following address: + +{url} + +Please keep in mind that this link might expire at some point, in which +case you will need to request the bundle again. + +--\x20 +The Software Heritage Developers +""" + # TODO: Imported from swh.scheduler.backend. Factorization needed. def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False # TODO: I don't like using None, it's confusing for the user. how about # a NEW_CURSOR object()? if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped # TODO: This has to be factorized with other database base classes and helpers # (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) # The three first methods are imported from swh.scheduler.backend. class VaultBackend: """ Backend for the Software Heritage vault. """ def __init__(self, config): self.config = config self.cache = VaultCache(**self.config['cache']) self.db = None self.reconnect() self.smtp_server = smtplib.SMTP('localhost') def reconnect(self): if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['vault_db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() @autocommit def task_info(self, obj_type, obj_id, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' SELECT id, type, object_id, task_uuid, task_status, ts_created, ts_done, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) - return cursor.fetchone() + res = cursor.fetchone() + if res: + res['object_id'] = bytes(res['object_id']) + return res @autocommit def create_task(self, obj_type, obj_id, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) assert obj_type in COOKER_TYPES task_uuid = celery.uuid() cursor.execute(''' INSERT INTO vault_bundle (type, object_id, task_uuid) VALUES (%s, %s, %s)''', (obj_type, obj_id, task_uuid)) args = [self.config, obj_type, obj_id] task = get_task(cooking_task_name) self.commit() task.apply_async(args, task_id=task_uuid) @autocommit def add_notif_email(self, obj_type, obj_id, email, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))''', (email, obj_type, obj_id)) @autocommit def cook_request(self, obj_type, obj_id, email=None, cursor=None): info = self.task_info(obj_type, obj_id) if info is None: self.create_task(obj_type, obj_id) if email is not None: if info is not None and info['task_status'] == 'done': self.send_notification(None, email, obj_type, obj_id) else: self.add_notif_email(obj_type, obj_id, email) + info = self.task_info(obj_type, obj_id) + return info @autocommit def is_available(self, obj_type, obj_id, cursor=None): info = self.task_info(obj_type, obj_id, cursor=cursor) return (info is not None and info['task_status'] == 'done' and self.cache.is_cached(obj_type, obj_id)) @autocommit def fetch(self, obj_type, obj_id, cursor=None): if not self.is_available(obj_type, obj_id, cursor=cursor): return None self.update_access_ts(obj_type, obj_id, cursor=cursor) return self.cache.get(obj_type, obj_id) @autocommit def update_access_ts(self, obj_type, obj_id, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) @autocommit def set_status(self, obj_type, obj_id, status, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) req = (''' UPDATE vault_bundle SET task_status = %s ''' + (''', ts_done = NOW() ''' if status == 'done' else '') + '''WHERE type = %s AND object_id = %s''') cursor.execute(req, (status, obj_type, obj_id)) @autocommit def set_progress(self, obj_type, obj_id, progress, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND object_id = %s''', (progress, obj_type, obj_id)) @autocommit def send_all_notifications(self, obj_type, obj_id, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' SELECT vault_notif_email.id AS id, email FROM vault_notif_email - RIGHT JOIN vault_bundle ON bundle_id = vault_bundle.id + INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''', (obj_type, obj_id)) for d in cursor: self.send_notification(d['id'], d['email'], obj_type, obj_id) @autocommit def send_notification(self, n_id, email, obj_type, obj_id, cursor=None): hex_id = hashutil.hash_to_hex(obj_id) - text = ( - "You have requested a bundle of type `{obj_type}` for the object " - "`{hex_id}` from the Software Heritage Archive.\n\n" - "The bundle you requested is now available for download at the " - "following address:\n\n" - "{url}\n\n" - "Please keep in mind that this link might expire at some point, " - "in which case you will need to request the bundle again.") - - text = text.format(obj_type=obj_type, hex_id=hex_id, url='URL_TODO') - text = textwrap.dedent(text) - text = '\n'.join(textwrap.wrap(text, 72, replace_whitespace=False)) + short_id = hex_id[:7] + + # TODO: instead of hardcoding this, we should probably: + # * add a "fetch_url" field in the vault_notif_email table + # * generate the url with flask.url_for() on the web-ui side + # * send this url as part of the cook request and store it in + # the table + # * use this url for the notification e-mail + url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/' + 'raw'.format(obj_type, hex_id)) + + text = NOTIF_EMAIL_BODY.strip() + text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) msg = MIMEText(text) - msg['Subject'] = ("The `{obj_type}` bundle of `{hex_id}` is ready" - .format(obj_type=obj_type, hex_id=hex_id)) - msg['From'] = '"Software Heritage Vault" ' + msg['Subject'] = (NOTIF_EMAIL_SUBJECT + .format(obj_type=obj_type, short_id=short_id)) + msg['From'] = NOTIF_EMAIL_FROM msg['To'] = email self.smtp_server.send_message(msg) if n_id is not None: cursor.execute(''' DELETE FROM vault_notif_email WHERE id = %s''', (n_id,))