diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ flask psycopg2 python-dateutil -python-fastimport +fastimport vcversioner diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -from setuptools import setup +from setuptools import setup, find_packages def parse_requirements(): @@ -16,20 +16,15 @@ setup( - name='swh.storage', - description='Software Heritage storage manager', + name='swh.vault', + description='Software Heritage vault', author='Software Heritage developers', author_email='swh-devel@inria.fr', - url='https://forge.softwareheritage.org/diffusion/DSTO/', + url='https://forge.softwareheritage.org/diffusion/DVAU/', packages=[ - 'swh.storage', - 'swh.storage.archiver', - 'swh.storage.api', - 'swh.storage.provenance', - 'swh.storage.tests', - ], - scripts=[ - 'bin/swh-storage-add-dir', + 'swh.vault', + 'swh.vault.api', + 'swh.vault.cooker', ], install_requires=parse_requirements(), setup_requires=['vcversioner'], diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql new file mode 100644 --- /dev/null +++ b/sql/swh-vault-schema.sql @@ -0,0 +1,41 @@ +create table dbversion +( + version int primary key, + release timestamptz not null, + description text not null +); +comment on table dbversion is 'Schema update tracking'; +insert into dbversion (version, release, description) + values (1, now(), 'Initial version'); + +create domain obj_hash as bytea; + +create type cook_type as enum ('directory', 'revision_gitfast'); +comment on type cook_type is 'Type of the requested bundle'; + +create type cook_status as enum ('new', 'pending', 'done'); +comment on type cook_status is 'Status of the cooking'; + +create table vault_bundle ( + id bigserial primary key, + + type cook_type not null, -- requested cooking type + object_id obj_hash not null, -- requested object ID + + task_uuid uuid not null, -- celery UUID of the cooking task + task_status 
cook_status not null default 'new', -- status of the task + + ts_created timestamptz not null default now(), -- timestamp of creation + ts_done timestamptz, -- timestamp of the cooking result + ts_last_access timestamptz not null default now(), -- last access + + progress_msg text, -- progress message + + unique(type, object_id) +); + +create table vault_notif_email ( + id bigserial primary key, + email text not null, -- e-mail to notify + bundle_id bigint not null references vault_bundle(id) +); diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py --- a/swh/vault/api/client.py +++ b/swh/vault/api/client.py @@ -14,14 +14,12 @@ def __init__(self, base_url): super().__init__(api_exception=StorageAPIError, url=base_url) - def ls(self, obj_type): - return self.get('vault/{}/'.format(obj_type)) - def fetch(self, obj_type, obj_id): - return self.get('vault/{}/{}/'.format(obj_type, - hashutil.hash_to_hex(obj_id))) + return self.get('fetch/{}/{}'.format(obj_type, + hashutil.hash_to_hex(obj_id))) - def cook(self, obj_type, obj_id): - return self.post('vault/{}/{}/'.format(obj_type, - hashutil.hash_to_hex(obj_id)), - data={}) + def cook(self, obj_type, obj_id, email=None): + return self.post('cook/{}/{}'.format(obj_type, + hashutil.hash_to_hex(obj_id)), + data={}, + params=({'email': email} if email else None)) diff --git a/swh/vault/api/cooking_tasks.py b/swh/vault/api/cooking_tasks.py deleted file mode 100644 --- a/swh/vault/api/cooking_tasks.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright (C) 2016-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from swh.scheduler.task import Task -from swh.model import hashutil -from ..cache import VaultCache -from ..cookers import COOKER_TYPES -from ... 
import get_storage - - -class SWHCookingTask(Task): - """Main task which archives a contents batch. - - """ - task_queue = 'swh_storage_vault_cooking' - - def run(self, type, hex_id, storage_args, cache_args): - # Initialize elements - storage = get_storage(**storage_args) - cache = VaultCache(**cache_args) - # Initialize cooker - obj_id = hashutil.hash_to_bytes(hex_id) - cooker = COOKER_TYPES[type](storage, cache, obj_id) - # Perform the cooking - cooker.cook() diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -3,20 +3,15 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import asyncio +import aiohttp.web import click -import re -from flask import abort, g -from werkzeug.routing import BaseConverter -from swh.core import config -from swh.core.api import (SWHServerAPIApp, error_handler, - encode_data_server as encode_data) -from swh.scheduler.utils import get_task -from swh.storage.vault.api.cooking_tasks import SWHCookingTask # noqa -from swh.storage.vault.cache import VaultCache -from swh.storage.vault.cookers import COOKER_TYPES - -cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask' +from swh.core import config +from swh.core.api_async import (SWHRemoteAPI, + encode_data_server as encode_data) +from swh.vault.cookers import COOKER_TYPES +from swh.vault.backend import VaultBackend DEFAULT_CONFIG = { @@ -30,56 +25,49 @@ }, }, }), - 'cache': ('dict', {'root': '/tmp/vaultcache'}) + 'cache': ('dict', {'root': '/tmp/vaultcache'}), + 'vault_db': ('str', 'dbname=swh-vault') } -class CookerConverter(BaseConverter): - def __init__(self, url_map, *items): - super().__init__(url_map) - types = [re.escape(c) for c in COOKER_TYPES] - self.regex = '({})'.format('|'.join(types)) - +@asyncio.coroutine +def index(request): + return aiohttp.web.Response(body="SWH Vault API server") -app = 
-@app.route('/vault/<cooker:type>/', methods=['GET'])
-def vault_ls(type):
-    return encode_data(list(
-        g.cache.ls(type)
-    ))
-
-
-@app.route('/vault/<cooker:type>/<id>/', methods=['GET'])
-def vault_fetch(type, id):
-    if not g.cache.is_cached(type, id):
-        abort(404)
-    return encode_data(g.cache.get(type, id))
+    # Return url to get the content and 201 CREATED
+    return encode_data('/vault/{}/{}/'.format(obj_type, obj_id), status=201)
-@app.route('/vault/<cooker:type>/<id>/', methods=['POST'])
-def vault_cook(type, id):
-    task = get_task(cooking_task_name)
-    task.delay(type, id, app.config['storage'], app.config['cache'])
-    # Return url to get the content and 201 CREATED
-    return encode_data('/vault/%s/%s/' % (type, id)), 201
the server should run in debug mode") def launch(config_path, host, port, debug): - app.config.update(config.read(config_path, DEFAULT_CONFIG)) - app.run(host, port=int(port), debug=bool(debug)) + app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug)) + aiohttp.web.run_app(app, host=host, port=int(port)) if __name__ == '__main__': diff --git a/swh/vault/backend.py b/swh/vault/backend.py new file mode 100644 --- /dev/null +++ b/swh/vault/backend.py @@ -0,0 +1,224 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import textwrap +import smtplib +import celery +import psycopg2 +import psycopg2.extras + +from functools import wraps +from email.mime.text import MIMEText + +from swh.model import hashutil +from swh.scheduler.utils import get_task +from swh.vault.cache import VaultCache +from swh.vault.cookers import COOKER_TYPES +from swh.vault.cooking_tasks import SWHCookingTask # noqa + +cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' + + +# TODO: Imported from swh.scheduler.backend. Factorization needed. +def autocommit(fn): + @wraps(fn) + def wrapped(self, *args, **kwargs): + autocommit = False + # TODO: I don't like using None, it's confusing for the user. how about + # a NEW_CURSOR object()? + if 'cursor' not in kwargs or not kwargs['cursor']: + autocommit = True + kwargs['cursor'] = self.cursor() + + try: + ret = fn(self, *args, **kwargs) + except: + if autocommit: + self.rollback() + raise + + if autocommit: + self.commit() + + return ret + + return wrapped + + +# TODO: This has to be factorized with other database base classes and helpers +# (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) +# The three first methods are imported from swh.scheduler.backend. 
+class VaultBackend: + """ + Backend for the Software Heritage vault. + """ + def __init__(self, config): + self.config = config + self.cache = VaultCache(**self.config['cache']) + self.db = None + self.reconnect() + self.smtp_server = smtplib.SMTP('localhost') + + def reconnect(self): + if not self.db or self.db.closed: + self.db = psycopg2.connect( + dsn=self.config['vault_db'], + cursor_factory=psycopg2.extras.RealDictCursor, + ) + + def cursor(self): + """Return a fresh cursor on the database, with auto-reconnection in case + of failure""" + cur = None + + # Get a fresh cursor and reconnect at most three times + tries = 0 + while True: + tries += 1 + try: + cur = self.db.cursor() + cur.execute('select 1') + break + except psycopg2.OperationalError: + if tries < 3: + self.reconnect() + else: + raise + return cur + + def commit(self): + """Commit a transaction""" + self.db.commit() + + def rollback(self): + """Rollback a transaction""" + self.db.rollback() + + @autocommit + def task_info(self, obj_type, obj_id, cursor=None): + obj_id = hashutil.hash_to_bytes(obj_id) + cursor.execute(''' + SELECT id, type, object_id, task_uuid, task_status, + ts_created, ts_done + FROM vault_bundle + WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) + return cursor.fetchone() + + @autocommit + def create_task(self, obj_type, obj_id, cursor=None): + obj_id = hashutil.hash_to_bytes(obj_id) + assert obj_type in COOKER_TYPES + + task_uuid = celery.uuid() + cursor.execute(''' + INSERT INTO vault_bundle (type, object_id, task_uuid) + VALUES (%s, %s, %s)''', (obj_type, obj_id, task_uuid)) + + args = [self.config, obj_type, obj_id] + task = get_task(cooking_task_name) + self.commit() + task.apply_async(args, task_id=task_uuid) + + @autocommit + def add_notif_email(self, obj_type, obj_id, email, cursor=None): + obj_id = hashutil.hash_to_bytes(obj_id) + cursor.execute(''' + INSERT INTO vault_notif_email (email, bundle_id) + VALUES (%s, (SELECT id FROM vault_bundle + WHERE type = %s 
AND object_id = %s))''', + (email, obj_type, obj_id)) + + @autocommit + def cook_request(self, obj_type, obj_id, email=None, cursor=None): + info = self.task_info(obj_type, obj_id) + if info is None: + self.create_task(obj_type, obj_id) + if email is not None: + if info is not None and info['task_status'] == 'done': + self.send_notification(None, email, obj_type, obj_id) + else: + self.add_notif_email(obj_type, obj_id, email) + + @autocommit + def is_available(self, obj_type, obj_id, cursor=None): + info = self.task_info(obj_type, obj_id, cursor=cursor) + return (info is not None + and info['task_status'] == 'done' + and self.cache.is_cached(obj_type, obj_id)) + + @autocommit + def fetch(self, obj_type, obj_id, cursor=None): + if not self.is_available(obj_type, obj_id, cursor=cursor): + return None + self.update_access_ts(obj_type, obj_id, cursor=cursor) + return self.cache.get(obj_type, obj_id) + + @autocommit + def update_access_ts(self, obj_type, obj_id, cursor=None): + obj_id = hashutil.hash_to_bytes(obj_id) + cursor.execute(''' + UPDATE vault_bundle + SET ts_last_access = NOW() + WHERE type = %s AND object_id = %s''', + (obj_type, obj_id)) + + @autocommit + def set_status(self, obj_type, obj_id, status, cursor=None): + obj_id = hashutil.hash_to_bytes(obj_id) + req = (''' + UPDATE vault_bundle + SET task_status = %s ''' + + (''', ts_done = NOW() ''' if status == 'done' else '') + + '''WHERE type = %s AND object_id = %s''') + cursor.execute(req, (status, obj_type, obj_id)) + + @autocommit + def set_progress(self, obj_type, obj_id, progress, cursor=None): + obj_id = hashutil.hash_to_bytes(obj_id) + cursor.execute(''' + UPDATE vault_bundle + SET progress_msg = %s + WHERE type = %s AND object_id = %s''', + (progress, obj_type, obj_id)) + + @autocommit + def send_all_notifications(self, obj_type, obj_id, cursor=None): + obj_id = hashutil.hash_to_bytes(obj_id) + cursor.execute(''' + SELECT vault_notif_email.id AS id, email + FROM vault_notif_email + RIGHT JOIN 
+        msg['From'] = '"Software Heritage Vault" <bounces@softwareheritage.org>'
RevisionFlatCooker, - 'revision_git': RevisionGitCooker, + 'revision_gitfast': RevisionGitfastCooker, } diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -14,6 +14,7 @@ from pathlib import Path from swh.model import hashutil +from swh.storage import get_storage def get_tar_bytes(path, arcname=None): @@ -41,13 +42,10 @@ To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle - - def notify_bundle_ready(notif_data): notify the - bundle is ready. - """ CACHE_TYPE_KEY = None - def __init__(self, storage, cache, obj_id): + def __init__(self, config, obj_type, obj_id): """Initialize the cooker. The type of the object represented by the id depends on the @@ -59,9 +57,13 @@ cache: the cache where to store the bundle obj_id: id of the object to be cooked into a bundle. """ - self.storage = storage - self.cache = cache - self.obj_id = obj_id + # Imported here to avoid circular dependency + from swh.vault.backend import VaultBackend + + self.storage = get_storage(**config['storage']) + self.backend = VaultBackend(config) + self.obj_type = obj_type + self.obj_id = hashutil.hash_to_bytes(obj_id) @abc.abstractmethod def prepare_bundle(self): @@ -74,25 +76,25 @@ def cook(self): """Cook the requested object into a bundle """ + self.backend.set_status(self.obj_type, self.obj_id, 'pending') + self.backend.set_progress(self.obj_type, self.obj_id, 'Initializing.') content_iter = self.prepare_bundle() - # Cache the bundle self.update_cache(content_iter) - # Make a notification that the bundle have been cooked - # NOT YET IMPLEMENTED see TODO in function. 
- self.notify_bundle_ready( - notif_data='Bundle %s ready' % hashutil.hash_to_hex(self.obj_id)) + self.backend.set_status(self.obj_type, self.obj_id, 'done') + self.backend.set_progress(self.obj_type, self.obj_id, 'Done.') + + self.notify_bundle_ready() def update_cache(self, content_iter): """Update the cache with id and bundle_content. """ - self.cache.add_stream(self.CACHE_TYPE_KEY, self.obj_id, content_iter) + self.backend.cache.add_stream(self.CACHE_TYPE_KEY, + self.obj_id, content_iter) - def notify_bundle_ready(self, notif_data): - # TODO plug this method with the notification method once - # done. - pass + def notify_bundle_ready(self): + self.backend.send_all_notifications(self.obj_type, self.obj_id) class DirectoryBuilder: diff --git a/swh/vault/cookers/directory.py b/swh/vault/cookers/directory.py --- a/swh/vault/cookers/directory.py +++ b/swh/vault/cookers/directory.py @@ -3,7 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from .base import BaseVaultCooker, DirectoryBuilder +from swh.vault.cookers.base import BaseVaultCooker, DirectoryBuilder class DirectoryCooker(BaseVaultCooker): diff --git a/swh/vault/cookers/revision_git.py b/swh/vault/cookers/revision_gitfast.py rename from swh/vault/cookers/revision_git.py rename to swh/vault/cookers/revision_gitfast.py --- a/swh/vault/cookers/revision_git.py +++ b/swh/vault/cookers/revision_gitfast.py @@ -6,15 +6,15 @@ import collections import fastimport.commands import functools -import logging import os +import time from .base import BaseVaultCooker -class RevisionGitCooker(BaseVaultCooker): +class RevisionGitfastCooker(BaseVaultCooker): """Cooker to create a git fast-import bundle """ - CACHE_TYPE_KEY = 'revision_git' + CACHE_TYPE_KEY = 'revision_gitfast' def prepare_bundle(self): log = self.storage.revision_log([self.obj_id]) @@ -32,12 +32,22 @@ self.obj_to_mark = {} self.next_available_mark = 1 + last_progress_report = None + 
# We want a single transaction for the whole export, so we store a # cursor and use it during the process. with self.storage.db.transaction() as self.cursor: for i, rev in enumerate(self.rev_sorted, 1): - logging.info('Computing revision %d/%d', i, - len(self.rev_sorted)) + # Update progress if needed + ct = time.time() + if (last_progress_report is None + or last_progress_report + 2 <= ct): + last_progress_report = ct + pg = ('Computing revision {}/{}' + .format(i, len(self.rev_sorted))) + self.backend.set_progress(self.obj_type, self.obj_id, pg) + + # Compute the current commit yield from self._compute_commit_command(rev) def _toposort(self, rev_by_id): diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py new file mode 100644 --- /dev/null +++ b/swh/vault/cooking_tasks.py @@ -0,0 +1,18 @@ +# Copyright (C) 2016-2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.scheduler.task import Task +from swh.vault.cookers import COOKER_TYPES + + +class SWHCookingTask(Task): + """Main task which archives a contents batch. + + """ + task_queue = 'swh_vault_cooking' + + def run_task(self, config, obj_type, obj_id): + cooker = COOKER_TYPES[obj_type](config, obj_type, obj_id) + cooker.cook()