diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql index 7ee23c1..f316cac 100644 --- a/sql/swh-vault-schema.sql +++ b/sql/swh-vault-schema.sql @@ -1,27 +1,40 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; insert into dbversion (version, release, description) values (1, now(), 'Initial version'); create domain obj_hash as bytea; +create type cook_type as enum ('directory', 'revision_gitfast'); +comment on type cook_type is 'Type of the requested bundle'; + create type cook_status as enum ('new', 'pending', 'done'); comment on type cook_status is 'Status of the cooking'; -create table cook_requests ( +create table vault_bundle ( id bigserial primary key, - type text not null, - object_id obj_hash not null, - status cook_status not null + + type cook_type not null, -- requested cooking type + object_id obj_hash not null, -- requested object ID + + task_uuid varchar(128) not null, -- celery UUID of the cooking task + task_status cook_status not null default 'new', -- status of the task + + ts_created timestamptz not null default now(), -- timestamp of creation + ts_done timestamptz, -- timestamp of the cooking result + + progress_msg text, -- progress message + + unique(type, object_id) ); -create table cook_notifications ( +create table vault_notif_email ( id bigserial primary key, - email text not null, - request_id bigint not null references cook_requests(id) + email text not null, -- e-mail to notify + bundle_id bigint not null references vault_bundle(id) ); diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py index f187dce..754b1ff 100644 --- a/swh/vault/api/client.py +++ b/swh/vault/api/client.py @@ -1,27 +1,24 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.model import hashutil from swh.core.api import SWHRemoteAPI from swh.storage.exc import StorageAPIError class RemoteVaultCache(SWHRemoteAPI): """Client to the Software Heritage vault cache.""" def __init__(self, base_url): super().__init__(api_exception=StorageAPIError, url=base_url) - def ls(self, obj_type): - return self.get('vault/{}/'.format(obj_type)) - def fetch(self, obj_type, obj_id): - return self.get('vault/{}/{}/'.format(obj_type, + return self.get('fetch/{}/{}/'.format(obj_type, hashutil.hash_to_hex(obj_id))) def cook(self, obj_type, obj_id): - return self.post('vault/{}/{}/'.format(obj_type, - hashutil.hash_to_hex(obj_id)), + return self.post('cook/{}/{}/'.format(obj_type, + hashutil.hash_to_hex(obj_id)), data={}) diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index d54687b..f0025d0 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,98 +1,86 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import asyncio +import aiohttp.web import click -import re -from flask import abort, g -from werkzeug.routing import BaseConverter + from swh.core import config -from swh.core.api import (SWHServerAPIApp, error_handler, - encode_data_server as encode_data) -from swh.scheduler.utils import get_task -from swh.storage.vault.api.cooking_tasks import SWHCookingTask # noqa -from swh.storage.vault.cache import VaultCache +from swh.core.api_async import (SWHRemoteAPI, + encode_data_server as encode_data) from swh.storage.vault.cookers import COOKER_TYPES - - -cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask' +from swh.storage.vault.backend import VaultBackend DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'local', 'args': { 'db': 'dbname=softwareheritage-dev', 'objstorage': { 'root': '/tmp/objects', 'slicing': '0:2/2:4/4:6', }, }, }), - 'cache': ('dict', {'root': '/tmp/vaultcache'}) + 'cache': ('dict', {'root': '/tmp/vaultcache'}), + 'vault_db': ('str', 'dbname=swh-vault') } -class CookerConverter(BaseConverter): - def __init__(self, url_map, *items): - super().__init__(url_map) - types = [re.escape(c) for c in COOKER_TYPES] - self.regex = '({})'.format('|'.join(types)) - +@asyncio.coroutine +def index(request): + return aiohttp.web.Response(body="SWH Vault API server") -app = SWHServerAPIApp(__name__) -app.url_map.converters['cooker'] = CookerConverter +@asyncio.coroutine +def vault_fetch(request): + obj_type = request.match_info['type'] + obj_id = request.match_info['id'] -@app.errorhandler(Exception) -def my_error_handler(exception): - return error_handler(exception, encode_data) + if not request.app['backend'].is_available(obj_type, obj_id): + raise aiohttp.web.HTTPNotFound + return encode_data(request.app['backend'].fetch(obj_type, obj_id)) -@app.before_request -def before_request(): - g.cache = VaultCache(**app.config['cache']) +@asyncio.coroutine +def vault_cook(request): + obj_type = request.match_info['type'] + obj_id = request.match_info['id'] + email = request.args.get('email') -@app.route('/') -def index(): - return 'SWH vault API server' + if obj_type not in COOKER_TYPES: + raise aiohttp.web.HTTPNotFound + request.app['backend'].cook_request(obj_type, obj_id, email) -@app.route('/vault//', methods=['GET']) -def vault_ls(type): - return encode_data(list( - g.cache.ls(type) - )) - - -@app.route('/vault///', methods=['GET']) -def vault_fetch(type, id): - if not g.cache.is_cached(type, id): - abort(404) - return encode_data(g.cache.get(type, id)) + # Return url to get the content and 201 CREATED + return encode_data('/vault/{}/{}/'.format(obj_type, obj_id), status=201) -@app.route('/vault///', methods=['POST']) -def vault_cook(type, id): - task = get_task(cooking_task_name) - task.delay(type, id, app.config['storage'], app.config['cache']) - # Return url to get the content and 201 CREATED - return encode_data('/vault/%s/%s/' % (type, id)), 201 +def make_app(config, **kwargs): + app = SWHRemoteAPI(**kwargs) + app.router.add_route('GET', '/', index) + app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch) + app.router.add_route('POST', '/cook/{type}/{id}', vault_cook) + app['backend'] = VaultBackend(config) + return app @click.command() @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5005, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def launch(config_path, host, port, debug): - app.config.update(config.read(config_path, DEFAULT_CONFIG)) - app.run(host, port=int(port), debug=bool(debug)) + app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug)) + aiohttp.web.run_app(app, host=host, port=int(port)) if __name__ == '__main__': launch() diff --git a/swh/vault/backend.py b/swh/vault/backend.py new file mode 100644 index 0000000..8fc6896 --- /dev/null +++ b/swh/vault/backend.py @@ -0,0 +1,150 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import celery +import psycopg2 +import psycopg2.extras + +from swh.scheduler.utils import get_task +from swh.storage.vault.cooking_tasks import SWHCookingTask # noqa +from swh.storage.vault.cache import VaultCache +from swh.storage.vault.cookers import COOKER_TYPES + +from functools import wraps + +cooking_task_name = 'swh.storage.vault.cooking_tasks.SWHCookingTask' + + +# TODO: Imported from swh.scheduler.backend. Factorization needed. +def autocommit(fn): + @wraps(fn) + def wrapped(self, *args, **kwargs): + autocommit = False + # TODO: I don't like using None, it's confusing for the user. how about + # a NEW_CURSOR object()? + if 'cursor' not in kwargs or not kwargs['cursor']: + autocommit = True + kwargs['cursor'] = self.cursor() + + try: + ret = fn(self, *args, **kwargs) + except: + if autocommit: + self.rollback() + raise + + if autocommit: + self.commit() + + return ret + + return wrapped + + +# TODO: This has to be factorized with other database base classes and helpers +# (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) +# The three first methods are imported from swh.scheduler.backend. +class VaultBackend: + """ + Backend for the Software Heritage vault. + """ + def __init__(self, config): + self.config = config + self.cache = VaultCache(**self.config['cache']) + self.db = None + self.reconnect() + + def reconnect(self): + if not self.db or self.db.closed: + self.db = psycopg2.connect( + dsn=self.config['vault_db'], + cursor_factory=psycopg2.extras.RealDictCursor, + ) + + def cursor(self): + """Return a fresh cursor on the database, with auto-reconnection in case + of failure""" + cur = None + + # Get a fresh cursor and reconnect at most three times + tries = 0 + while True: + tries += 1 + try: + cur = self.db.cursor() + cur.execute('select 1') + break + except psycopg2.OperationalError: + if tries < 3: + self.reconnect() + else: + raise + return cur + + @autocommit + def task_info(self, obj_type, obj_id, cursor=None): + res = cursor.execute(''' + SELECT id, type, object_id, task_uuid, task_status, + ts_request, ts_done + FROM vault_bundle + WHERE type = %s AND object_id = %s''', obj_type, obj_id) + return res.fetchone() + + @autocommit + def create_task(self, obj_type, obj_id, cursor=None): + assert obj_type in COOKER_TYPES + + task_uuid = celery.uuid() + cursor.execute(''' + INSERT INTO vault_bundle (type, object_id, task_uuid) + VALUES (%s, %s, %s)''', obj_type, obj_id, task_uuid) + + args = [obj_type, obj_id, self.config['storage'], self.config['cache']] + task = get_task(cooking_task_name) + task.apply_async(args, task_id=task_uuid) + + @autocommit + def add_notif_email(self, obj_type, obj_id, email, cursor=None): + cursor.execute(''' + INSERT INTO vault_notif_email (email, bundle_id) + VALUES (%s, (SELECT id FROM vault_bundle + WHERE type = %s AND object_id = %s))''', + email, obj_type, obj_id) + + @autocommit + def cook_request(self, obj_type, obj_id, email=None, cursor=None): + if self.task_info(obj_type, obj_id) is None: + self.create_task(obj_type, obj_id) + if email is not None: + self.add_notif_email(obj_type, obj_id, email) + + @autocommit + def is_available(self, obj_type, obj_id): + info = self.task_info(obj_type, obj_id) + return (info is not None + and info['task_status'] == 'done' + and self.cache.is_cached(obj_type, obj_id)) + + @autocommit + def fetch(self, obj_type, obj_id): + if not self.is_available(obj_type, obj_id): + return None + return self.cache.get(obj_type, obj_id) + + @autocommit + def set_status(self, obj_type, obj_id, status, cursor=None): + cursor.execute(''' + UPDATE vault_bundle + SET status = %s + WHERE type = %s AND object_id = %s''', + status, obj_type, obj_id) + + @autocommit + def set_progress(self, obj_type, obj_id, progress, cursor=None): + cursor.execute(''' + UPDATE vault_bundle + SET progress = %s + WHERE type = %s AND object_id = %s''', + progress, obj_type, obj_id) diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py index 473abc4..4a6c830 100644 --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -1,221 +1,219 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io import itertools import logging import os import tarfile import tempfile from pathlib import Path from swh.model import hashutil def get_tar_bytes(path, arcname=None): path = Path(path) if not arcname: arcname = path.name tar_buffer = io.BytesIO() tar = tarfile.open(fileobj=tar_buffer, mode='w') tar.add(str(path), arcname=arcname) return tar_buffer.getbuffer() SKIPPED_MESSAGE = (b'This content have not been retrieved in ' b'Software Heritage archive due to its size') HIDDEN_MESSAGE = (b'This content is hidden') class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle - - def notify_bundle_ready(notif_data): notify the - bundle is ready. - """ CACHE_TYPE_KEY = None - def __init__(self, storage, cache, obj_id): + def __init__(self, storage, cache, obj_type, obj_id): """Initialize the cooker. The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: storage: the storage object cache: the cache where to store the bundle obj_id: id of the object to be cooked into a bundle. """ self.storage = storage self.cache = cache + self.obj_type = obj_type self.obj_id = obj_id @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplemented def cook(self): """Cook the requested object into a bundle """ content_iter = self.prepare_bundle() # Cache the bundle self.update_cache(content_iter) # Make a notification that the bundle have been cooked # NOT YET IMPLEMENTED see TODO in function. self.notify_bundle_ready( notif_data='Bundle %s ready' % hashutil.hash_to_hex(self.obj_id)) def update_cache(self, content_iter): """Update the cache with id and bundle_content. """ self.cache.add_stream(self.CACHE_TYPE_KEY, self.obj_id, content_iter) def notify_bundle_ready(self, notif_data): # TODO plug this method with the notification method once # done. pass class DirectoryBuilder: """Creates a cooked directory from its sha1_git in the db. Warning: This is NOT a directly accessible cooker, but a low-level one that executes the manipulations. """ def __init__(self, storage): self.storage = storage def get_directory_bytes(self, dir_id): # Create temporary folder to retrieve the files into. root = bytes(tempfile.mkdtemp(prefix='directory.', suffix='.cook'), 'utf8') self.build_directory(dir_id, root) # Use the created directory to make a bundle with the data as # a compressed directory. bundle_content = self._create_bundle_content( root, hashutil.hash_to_hex(dir_id)) return bundle_content def build_directory(self, dir_id, root): # Retrieve data from the database. data = self.storage.directory_ls(dir_id, recursive=True) # Split into files and directory data. # TODO(seirl): also handle revision data. data1, data2 = itertools.tee(data, 2) dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir') file_data = (entry for entry in data2 if entry['type'] == 'file') # Recreate the directory's subtree and then the files into it. self._create_tree(root, dir_data) self._create_files(root, file_data) def _create_tree(self, root, directory_paths): """Create a directory tree from the given paths The tree is created from `root` and each given path in `directory_paths` will be created. """ # Directories are sorted by depth so they are created in the # right order bsep = bytes(os.path.sep, 'utf8') dir_names = sorted( directory_paths, key=lambda x: len(x.split(bsep))) for dir_name in dir_names: os.makedirs(os.path.join(root, dir_name)) def _create_files(self, root, file_datas): """Create the files according to their status. """ # Then create the files for file_data in file_datas: path = os.path.join(root, file_data['name']) status = file_data['status'] perms = file_data['perms'] if status == 'absent': self._create_file_absent(path) elif status == 'hidden': self._create_file_hidden(path) else: content = self._get_file_content(file_data['sha1']) self._create_file(path, content, perms) def _create_file(self, path, content, perms=0o100644): """Create the given file and fill it with content. """ if perms not in (0o100644, 0o100755, 0o120000): logging.warning('File {} has invalid permission {}, ' 'defaulting to 644.'.format(path, perms)) perms = 0o100644 if perms == 0o120000: # Symbolic link os.symlink(content, path) else: with open(path, 'wb') as f: f.write(content) os.chmod(path, perms & 0o777) def _get_file_content(self, obj_id): """Get the content of the given file. """ content = list(self.storage.content_get([obj_id]))[0]['data'] return content def _create_file_absent(self, path): """Create a file that indicates a skipped content Create the given file but fill it with a specific content to indicate that the content have not been retrieved by the software heritage archive due to its size. """ self._create_file(self, SKIPPED_MESSAGE) def _create_file_hidden(self, path): """Create a file that indicates an hidden content Create the given file but fill it with a specific content to indicate that the content could not be retrieved due to privacy policy. """ self._create_file(self, HIDDEN_MESSAGE) def _create_bundle_content(self, path, hex_dir_id): """Create a bundle from the given directory Args: path: location of the directory to package. hex_dir_id: hex representation of the directory id Returns: bytes that represent the compressed directory as a bundle. """ return get_tar_bytes(path.decode(), hex_dir_id) diff --git a/swh/vault/api/cooking_tasks.py b/swh/vault/cooking_tasks.py similarity index 85% rename from swh/vault/api/cooking_tasks.py rename to swh/vault/cooking_tasks.py index 44a81e4..0b89a53 100644 --- a/swh/vault/api/cooking_tasks.py +++ b/swh/vault/cooking_tasks.py @@ -1,27 +1,27 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task from swh.model import hashutil from ..cache import VaultCache from ..cookers import COOKER_TYPES from ... import get_storage class SWHCookingTask(Task): """Main task which archives a contents batch. """ task_queue = 'swh_storage_vault_cooking' - def run(self, type, hex_id, storage_args, cache_args): + def run(self, obj_type, hex_id, storage_args, cache_args): # Initialize elements storage = get_storage(**storage_args) cache = VaultCache(**cache_args) # Initialize cooker obj_id = hashutil.hash_to_bytes(hex_id) - cooker = COOKER_TYPES[type](storage, cache, obj_id) + cooker = COOKER_TYPES[obj_type](storage, cache, obj_type, obj_id) # Perform the cooking cooker.cook()