diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql
new file mode 100644
--- /dev/null
+++ b/sql/swh-vault-schema.sql
@@ -0,0 +1,41 @@
+create table dbversion
+(
+  version     int primary key,
+  release     timestamptz not null,
+  description text not null
+);
+comment on table dbversion is 'Schema update tracking';
+insert into dbversion (version, release, description)
+       values (1, now(), 'Initial version');
+
+create domain obj_hash as bytea;
+
+create type cook_type as enum ('directory', 'revision_gitfast');
+comment on type cook_type is 'Type of the requested bundle';
+
+create type cook_status as enum ('new', 'pending', 'done');
+comment on type cook_status is 'Status of the cooking';
+
+create table vault_bundle (
+  id bigserial primary key,
+
+  type cook_type not null,  -- requested cooking type
+  object_id obj_hash not null,  -- requested object ID
+
+  task_uuid uuid not null,  -- celery UUID of the cooking task
+  task_status cook_status not null default 'new',  -- status of the task
+
+  ts_created timestamptz not null default now(),  -- timestamp of creation
+  ts_done timestamptz,  -- timestamp of the cooking result
+  ts_last_access timestamptz not null default now(),  -- last access
+
+  progress_msg text,  -- progress message
+
+  unique(type, object_id)
+);
+
+create table vault_notif_email (
+  id bigserial primary key,
+  email text not null,  -- e-mail to notify
+  bundle_id bigint not null references vault_bundle(id)
+);
diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -14,14 +14,11 @@
     def __init__(self, base_url):
         super().__init__(api_exception=StorageAPIError, url=base_url)
 
-    def ls(self, obj_type):
-        return self.get('vault/{}/'.format(obj_type))
-
     def fetch(self, obj_type, obj_id):
-        return self.get('vault/{}/{}/'.format(obj_type,
+        return self.get('fetch/{}/{}/'.format(obj_type,
                                               hashutil.hash_to_hex(obj_id)))
 
     def cook(self, obj_type, obj_id):
-        return self.post('vault/{}/{}/'.format(obj_type,
-                                               hashutil.hash_to_hex(obj_id)),
+        return self.post('cook/{}/{}/'.format(obj_type,
+                                              hashutil.hash_to_hex(obj_id)),
                          data={})
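
Usage sketch (not part of the patch): how the renamed client endpoints are
meant to be called. `RemoteVaultClient` is a stand-in for the client class
defined in swh/vault/api/client.py (its name is outside this hunk); the URL
and the object id are illustrative.

    from swh.model import hashutil

    client = RemoteVaultClient('http://localhost:5005/')
    obj_id = hashutil.hash_to_bytes(
        '17d2d649d2a87c1ad28e04ad9972b031ac142610')  # made-up directory id

    client.cook('directory', obj_id)            # POST cook/directory/<hex>/
    bundle = client.fetch('directory', obj_id)  # GET  fetch/directory/<hex>/
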
diff --git a/swh/vault/api/cooking_tasks.py b/swh/vault/api/cooking_tasks.py
deleted file mode 100644
--- a/swh/vault/api/cooking_tasks.py
+++ /dev/null
@@ -1,27 +0,0 @@
-# Copyright (C) 2016-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from swh.scheduler.task import Task
-from swh.model import hashutil
-from ..cache import VaultCache
-from ..cookers import COOKER_TYPES
-from ... import get_storage
-
-
-class SWHCookingTask(Task):
-    """Main task which archives a contents batch.
-
-    """
-    task_queue = 'swh_storage_vault_cooking'
-
-    def run(self, type, hex_id, storage_args, cache_args):
-        # Initialize elements
-        storage = get_storage(**storage_args)
-        cache = VaultCache(**cache_args)
-        # Initialize cooker
-        obj_id = hashutil.hash_to_bytes(hex_id)
-        cooker = COOKER_TYPES[type](storage, cache, obj_id)
-        # Perform the cooking
-        cooker.cook()
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -3,20 +3,15 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import asyncio
+import aiohttp.web
 import click
-import re
-from flask import abort, g
-from werkzeug.routing import BaseConverter
-from swh.core import config
-from swh.core.api import (SWHServerAPIApp, error_handler,
-                          encode_data_server as encode_data)
-from swh.scheduler.utils import get_task
-from swh.storage.vault.api.cooking_tasks import SWHCookingTask  # noqa
-from swh.storage.vault.cache import VaultCache
-from swh.storage.vault.cookers import COOKER_TYPES
-
-cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask'
+from swh.core import config
+from swh.core.api_async import (SWHRemoteAPI,
+                                encode_data_server as encode_data)
+from swh.vault.cookers import COOKER_TYPES
+from swh.vault.backend import VaultBackend
 
 
 DEFAULT_CONFIG = {
@@ -30,56 +25,49 @@
             },
         },
     }),
-    'cache': ('dict', {'root': '/tmp/vaultcache'})
+    'cache': ('dict', {'root': '/tmp/vaultcache'}),
+    'vault_db': ('str', 'dbname=swh-vault')
 }
 
 
-class CookerConverter(BaseConverter):
-    def __init__(self, url_map, *items):
-        super().__init__(url_map)
-        types = [re.escape(c) for c in COOKER_TYPES]
-        self.regex = '({})'.format('|'.join(types))
-
+@asyncio.coroutine
+def index(request):
+    return aiohttp.web.Response(body=b"SWH Vault API server")
 
-app = SWHServerAPIApp(__name__)
-app.url_map.converters['cooker'] = CookerConverter
 
+@asyncio.coroutine
+def vault_fetch(request):
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
 
-@app.errorhandler(Exception)
-def my_error_handler(exception):
-    return error_handler(exception, encode_data)
+    if not request.app['backend'].is_available(obj_type, obj_id):
+        raise aiohttp.web.HTTPNotFound
 
+    return encode_data(request.app['backend'].fetch(obj_type, obj_id))
 
-@app.before_request
-def before_request():
-    g.cache = VaultCache(**app.config['cache'])
 
+@asyncio.coroutine
+def vault_cook(request):
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
+    email = request.query.get('email')
 
-@app.route('/')
-def index():
-    return 'SWH vault API server'
+    if obj_type not in COOKER_TYPES:
+        raise aiohttp.web.HTTPNotFound
 
+    request.app['backend'].cook_request(obj_type, obj_id, email)
 
-@app.route('/vault/<cooker:type>/', methods=['GET'])
-def vault_ls(type):
-    return encode_data(list(
-        g.cache.ls(type)
-    ))
-
-
-@app.route('/vault/<cooker:type>/<id>/', methods=['GET'])
-def vault_fetch(type, id):
-    if not g.cache.is_cached(type, id):
-        abort(404)
-    return encode_data(g.cache.get(type, id))
+    # Return url to get the content and 201 CREATED
+    return encode_data('/fetch/{}/{}/'.format(obj_type, obj_id), status=201)
 
 
-@app.route('/vault/<cooker:type>/<id>/', methods=['POST'])
-def vault_cook(type, id):
-    task = get_task(cooking_task_name)
-    task.delay(type, id, app.config['storage'], app.config['cache'])
-    # Return url to get the content and 201 CREATED
-    return encode_data('/vault/%s/%s/' % (type, id)), 201
+def make_app(config, **kwargs):
+    app = SWHRemoteAPI(**kwargs)
+    app.router.add_route('GET', '/', index)
+    app.router.add_route('GET', '/fetch/{type}/{id}/', vault_fetch)
+    app.router.add_route('POST', '/cook/{type}/{id}/', vault_cook)
+    app['backend'] = VaultBackend(config)
+    return app
 
 
 @click.command()
@@ -90,8 +78,8 @@
 @click.option('--debug/--nodebug', default=True,
               help="Indicates if the server should run in debug mode")
 def launch(config_path, host, port, debug):
-    app.config.update(config.read(config_path, DEFAULT_CONFIG))
-    app.run(host, port=int(port), debug=bool(debug))
+    app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug))
+    aiohttp.web.run_app(app, host=host, port=int(port))
 
 
 if __name__ == '__main__':
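
Run sketch (not part of the patch): starting the API server programmatically
through make_app. The config dict mirrors DEFAULT_CONFIG once read (the
storage entry is elided here, as it is in the hunk above); host and port are
illustrative.

    import aiohttp.web
    from swh.vault.api.server import make_app

    config = {
        'storage': {},                        # storage config elided
        'cache': {'root': '/tmp/vaultcache'},
        'vault_db': 'dbname=swh-vault',
    }

    app = make_app(config, debug=True)
    aiohttp.web.run_app(app, host='0.0.0.0', port=5005)
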
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/backend.py
@@ -0,0 +1,211 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import textwrap
+import smtplib
+import celery
+import psycopg2
+import psycopg2.extras
+
+from functools import wraps
+from email.mime.text import MIMEText
+
+from swh.model import hashutil
+from swh.scheduler.utils import get_task
+from swh.vault.cache import VaultCache
+from swh.vault.cookers import COOKER_TYPES
+from swh.vault.cooking_tasks import SWHCookingTask  # noqa
+
+cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask'
+
+
+# TODO: Imported from swh.scheduler.backend. Factorization needed.
+def autocommit(fn):
+    @wraps(fn)
+    def wrapped(self, *args, **kwargs):
+        autocommit = False
+        # TODO: I don't like using None, it's confusing for the user. how about
+        # a NEW_CURSOR object()?
+        if 'cursor' not in kwargs or not kwargs['cursor']:
+            autocommit = True
+            kwargs['cursor'] = self.cursor()
+
+        try:
+            ret = fn(self, *args, **kwargs)
+        except:
+            if autocommit:
+                self.rollback()
+            raise
+
+        if autocommit:
+            self.commit()
+
+        return ret
+
+    return wrapped
+
+
+# TODO: This has to be factorized with other database base classes and helpers
+# (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...)
+# The three first methods are imported from swh.scheduler.backend.
+class VaultBackend:
+    """
+    Backend for the Software Heritage vault.
+    """
+    def __init__(self, config):
+        self.config = config
+        self.cache = VaultCache(**self.config['cache'])
+        self.db = None
+        self.reconnect()
+        self.smtp_server = smtplib.SMTP('localhost')
+
+    def reconnect(self):
+        if not self.db or self.db.closed:
+            self.db = psycopg2.connect(
+                dsn=self.config['vault_db'],
+                cursor_factory=psycopg2.extras.RealDictCursor,
+            )
+
+    def cursor(self):
+        """Return a fresh cursor on the database, with auto-reconnection in case
+        of failure"""
+        cur = None
+
+        # Get a fresh cursor and reconnect at most three times
+        tries = 0
+        while True:
+            tries += 1
+            try:
+                cur = self.db.cursor()
+                cur.execute('select 1')
+                break
+            except psycopg2.OperationalError:
+                if tries < 3:
+                    self.reconnect()
+                else:
+                    raise
+        return cur
+
+    def commit(self):
+        """Commit a transaction (needed by the autocommit decorator)"""
+        self.db.commit()
+
+    def rollback(self):
+        """Rollback a transaction (needed by the autocommit decorator)"""
+        self.db.rollback()
+
+    @autocommit
+    def task_info(self, obj_type, obj_id, cursor=None):
+        cursor.execute('''
+            SELECT id, type, object_id, task_uuid, task_status,
+                   ts_created, ts_done
+            FROM vault_bundle
+            WHERE type = %s AND object_id = %s''', (obj_type, obj_id))
+        return cursor.fetchone()
+
+    @autocommit
+    def create_task(self, obj_type, obj_id, cursor=None):
+        assert obj_type in COOKER_TYPES
+
+        task_uuid = celery.uuid()
+        cursor.execute('''
+            INSERT INTO vault_bundle (type, object_id, task_uuid)
+            VALUES (%s, %s, %s)''', (obj_type, obj_id, task_uuid))
+
+        args = [self.config, obj_type, obj_id]
+        task = get_task(cooking_task_name)
+        task.apply_async(args, task_id=task_uuid)
+
+    @autocommit
+    def add_notif_email(self, obj_type, obj_id, email, cursor=None):
+        cursor.execute('''
+            INSERT INTO vault_notif_email (email, bundle_id)
+            VALUES (%s, (SELECT id FROM vault_bundle
+                         WHERE type = %s AND object_id = %s))''',
+            (email, obj_type, obj_id))
+
+    @autocommit
+    def cook_request(self, obj_type, obj_id, email=None, cursor=None):
+        info = self.task_info(obj_type, obj_id)
+        if info is None:
+            self.create_task(obj_type, obj_id)
+        if email is not None:
+            if info is not None and info['task_status'] == 'done':
+                self.send_notification(None, email, obj_type, obj_id)
+            else:
+                self.add_notif_email(obj_type, obj_id, email)
+
+    @autocommit
+    def is_available(self, obj_type, obj_id, cursor=None):
+        info = self.task_info(obj_type, obj_id, cursor=cursor)
+        return (info is not None
+                and info['task_status'] == 'done'
+                and self.cache.is_cached(obj_type, obj_id))
+
+    @autocommit
+    def fetch(self, obj_type, obj_id, cursor=None):
+        if not self.is_available(obj_type, obj_id, cursor=cursor):
+            return None
+        self.update_access_ts(obj_type, obj_id, cursor=cursor)
+        return self.cache.get(obj_type, obj_id)
+
+    @autocommit
+    def update_access_ts(self, obj_type, obj_id, cursor=None):
+        cursor.execute('''
+            UPDATE vault_bundle
+            SET ts_last_access = NOW()
+            WHERE type = %s AND object_id = %s''',
+            (obj_type, obj_id))
+
+    @autocommit
+    def set_status(self, obj_type, obj_id, status, cursor=None):
+        req = ('''
+            UPDATE vault_bundle
+            SET task_status = %s ''' +
+            (''', ts_done = NOW() ''' if status == 'done' else '') +
+            '''WHERE type = %s AND object_id = %s''')
+        cursor.execute(req, (status, obj_type, obj_id))
+
+    @autocommit
+    def set_progress(self, obj_type, obj_id, progress, cursor=None):
+        cursor.execute('''
+            UPDATE vault_bundle
+            SET progress_msg = %s
+            WHERE type = %s AND object_id = %s''',
+            (progress, obj_type, obj_id))
+
+    @autocommit
+    def send_all_notifications(self, obj_type, obj_id, cursor=None):
+        cursor.execute('''
+            SELECT vault_notif_email.id AS notif_id, email
+            FROM vault_notif_email
+            INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
+            WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
+            (obj_type, obj_id))
+        for row in cursor:
+            self.send_notification(row['notif_id'], row['email'],
+                                   obj_type, obj_id)
+
+    @autocommit
+    def send_notification(self, n_id, email, obj_type, obj_id, cursor=None):
+        hex_id = hashutil.hash_to_hex(obj_id)
+        text = """
+        You have requested a bundle of type `{obj_type}` for the
+        object `{hex_id}` from the Software Heritage Archive.
+
+        The bundle you requested is now available for download at the
+        following address:
+
+        {url}
+
+        Please keep in mind that this link might expire at some point,
+        in which case you will need to request the bundle again.
+        """
+        text = text.format(obj_type=obj_type, hex_id=hex_id, url='URL_TODO')
+        text = textwrap.dedent(text)
+        text = '\n'.join(textwrap.wrap(text, 72))
+        msg = MIMEText(text)
+        msg['Subject'] = ("The `{obj_type}` bundle of `{hex_id}` is ready"
+                          .format(obj_type=obj_type, hex_id=hex_id))
+        msg['From'] = 'vault@softwareheritage.org'
+        msg['To'] = email
+
+        self.smtp_server.send_message(msg)
+
+        if n_id is not None:
+            cursor.execute('''
+                DELETE FROM vault_notif_email
+                WHERE id = %s''', (n_id,))
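
Usage sketch (not part of the patch): how the backend is meant to be driven.
The config layout follows the server's DEFAULT_CONFIG, the storage entry is
elided, and the object id is a placeholder.

    from swh.vault.backend import VaultBackend

    config = {'cache': {'root': '/tmp/vaultcache'},
              'vault_db': 'dbname=swh-vault',
              'storage': {}}                     # storage config elided
    backend = VaultBackend(config)
    obj_id = b'\x17\xd2...'   # placeholder: sha1_git (bytes) of a directory

    backend.cook_request('directory', obj_id, email='user@example.com')
    # -> inserts a vault_bundle row, schedules the celery cooking task, and
    #    registers the e-mail to be notified once the bundle is done.

    if backend.is_available('directory', obj_id):
        bundle = backend.fetch('directory', obj_id)  # also bumps ts_last_access
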
diff --git a/swh/vault/conf.yaml b/swh/vault/conf.yaml
deleted file mode 100644
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -14,6 +14,8 @@
 from pathlib import Path
 
 from swh.model import hashutil
+from swh.storage import get_storage
+from swh.vault.backend import VaultBackend
 
 
 def get_tar_bytes(path, arcname=None):
@@ -41,13 +43,10 @@
     To define a new cooker, inherit from this class and override:
     - CACHE_TYPE_KEY: key to use for the bundle to reference in cache
     - def cook(): cook the object into a bundle
-    - def notify_bundle_ready(notif_data): notify the
-      bundle is ready.
-
     """
     CACHE_TYPE_KEY = None
 
-    def __init__(self, storage, cache, obj_id):
+    def __init__(self, config, obj_type, obj_id):
         """Initialize the cooker.
 
         The type of the object represented by the id depends on the
@@ -59,9 +58,10 @@
             cache: the cache where to store the bundle
             obj_id: id of the object to be cooked into a bundle.
         """
-        self.storage = storage
-        self.cache = cache
-        self.obj_id = obj_id
+        self.storage = get_storage(**config['storage'])
+        self.backend = VaultBackend(config)
+        self.obj_type = obj_type
+        self.obj_id = hashutil.hash_to_bytes(obj_id)
 
     @abc.abstractmethod
     def prepare_bundle(self):
@@ -74,25 +74,23 @@
     def cook(self):
         """Cook the requested object into a bundle
         """
+        self.backend.set_status(self.obj_type, self.obj_id, 'pending')
         content_iter = self.prepare_bundle()
 
-        # Cache the bundle
         self.update_cache(content_iter)
-        # Make a notification that the bundle have been cooked
-        # NOT YET IMPLEMENTED see TODO in function.
-        self.notify_bundle_ready(
-            notif_data='Bundle %s ready' % hashutil.hash_to_hex(self.obj_id))
+        self.backend.set_status(self.obj_type, self.obj_id, 'done')
+
+        self.notify_bundle_ready()
 
     def update_cache(self, content_iter):
         """Update the cache with id and bundle_content.
 
        """
-        self.cache.add_stream(self.CACHE_TYPE_KEY, self.obj_id, content_iter)
+        self.backend.cache.add_stream(self.CACHE_TYPE_KEY,
+                                      self.obj_id, content_iter)
 
-    def notify_bundle_ready(self, notif_data):
-        # TODO plug this method with the notification method once
-        # done.
-        pass
+    def notify_bundle_ready(self):
+        self.backend.send_all_notifications(self.obj_type, self.obj_id)
 
 
 class DirectoryBuilder:
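
Illustration (not part of the patch): a minimal, hypothetical cooker showing
the new constructor signature and the cook() flow, assuming prepare_bundle()
yields the bundle as a stream of bytes chunks for the cache's add_stream().

    from swh.vault.cookers.base import BaseVaultCooker

    class TrivialCooker(BaseVaultCooker):   # hypothetical, illustration only
        CACHE_TYPE_KEY = 'trivial'          # made-up cache key

        def prepare_bundle(self):
            # cook() consumes this stream, pushes it to the cache, flips
            # task_status to 'done' and sends pending notifications.
            yield b'hello from the vault\n'
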
diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/cooking_tasks.py
@@ -0,0 +1,18 @@
+# Copyright (C) 2016-2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.scheduler.task import Task
+from swh.vault.cookers import COOKER_TYPES
+
+
+class SWHCookingTask(Task):
+    """Main task to cook a bundle requested from the vault.
+
+    """
+    task_queue = 'swh_storage_vault_cooking'
+
+    def run(self, config, obj_type, obj_id):
+        cooker = COOKER_TYPES[obj_type](config, obj_type, obj_id)
+        cooker.cook()
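
End-to-end sketch (not part of the patch): what a worker executes for one
cooking request. Calling run() directly, outside celery, is shown here only to
make the flow explicit; config and the hex id are placeholders.

    from swh.vault.cooking_tasks import SWHCookingTask

    SWHCookingTask().run(config, 'directory',
                         '17d2d649d2a87c1ad28e04ad9972b031ac142610')
    # -> builds the matching cooker, cooks the bundle into the cache, marks
    #    the vault_bundle row 'done' and sends any pending notifications.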