diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql index f316cac..4a1de0a 100644 --- a/sql/swh-vault-schema.sql +++ b/sql/swh-vault-schema.sql @@ -1,40 +1,41 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; insert into dbversion (version, release, description) values (1, now(), 'Initial version'); create domain obj_hash as bytea; create type cook_type as enum ('directory', 'revision_gitfast'); comment on type cook_type is 'Type of the requested bundle'; create type cook_status as enum ('new', 'pending', 'done'); comment on type cook_status is 'Status of the cooking'; create table vault_bundle ( id bigserial primary key, type cook_type not null, -- requested cooking type object_id obj_hash not null, -- requested object ID task_uuid varchar(128) not null, -- celery UUID of the cooking task task_status cook_status not null default 'new', -- status of the task ts_created timestamptz not null default now(), -- timestamp of creation ts_done timestamptz, -- timestamp of the cooking result + ts_last_access timestamptz not null default now(), -- last access progress_msg text, -- progress message unique(type, object_id) ); create table vault_notif_email ( id bigserial primary key, email text not null, -- e-mail to notify bundle_id bigint not null references vault_bundle(id) ); diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index f0025d0..bec977d 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,86 +1,86 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import aiohttp.web import click from swh.core import config from swh.core.api_async import (SWHRemoteAPI, encode_data_server as encode_data) -from swh.storage.vault.cookers import COOKER_TYPES -from swh.storage.vault.backend import VaultBackend +from swh.vault.cookers import COOKER_TYPES +from swh.vault.backend import VaultBackend DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'local', 'args': { 'db': 'dbname=softwareheritage-dev', 'objstorage': { 'root': '/tmp/objects', 'slicing': '0:2/2:4/4:6', }, }, }), 'cache': ('dict', {'root': '/tmp/vaultcache'}), 'vault_db': ('str', 'dbname=swh-vault') } @asyncio.coroutine def index(request): return aiohttp.web.Response(body="SWH Vault API server") @asyncio.coroutine def vault_fetch(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] if not request.app['backend'].is_available(obj_type, obj_id): raise aiohttp.web.HTTPNotFound return encode_data(request.app['backend'].fetch(obj_type, obj_id)) @asyncio.coroutine def vault_cook(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] email = request.args.get('email') if obj_type not in COOKER_TYPES: raise aiohttp.web.HTTPNotFound request.app['backend'].cook_request(obj_type, obj_id, email) # Return url to get the content and 201 CREATED return encode_data('/vault/{}/{}/'.format(obj_type, obj_id), status=201) def make_app(config, **kwargs): app = SWHRemoteAPI(**kwargs) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch) app.router.add_route('POST', '/cook/{type}/{id}', vault_cook) app['backend'] = VaultBackend(config) return app @click.command() @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5005, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def launch(config_path, host, port, debug): app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug)) aiohttp.web.run_app(app, host=host, port=int(port)) if __name__ == '__main__': launch() diff --git a/swh/vault/backend.py b/swh/vault/backend.py index 8fc6896..5e9afe9 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,150 +1,211 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import textwrap +import smtplib import celery import psycopg2 import psycopg2.extras +from functools import wraps +from email.mime.text import MIMEText + +from swh.model import hashutil from swh.scheduler.utils import get_task -from swh.storage.vault.cooking_tasks import SWHCookingTask # noqa from swh.storage.vault.cache import VaultCache from swh.storage.vault.cookers import COOKER_TYPES - -from functools import wraps +from swh.storage.vault.cooking_tasks import SWHCookingTask # noqa cooking_task_name = 'swh.storage.vault.cooking_tasks.SWHCookingTask' # TODO: Imported from swh.scheduler.backend. Factorization needed. def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False # TODO: I don't like using None, it's confusing for the user. how about # a NEW_CURSOR object()? if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped # TODO: This has to be factorized with other database base classes and helpers # (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) # The three first methods are imported from swh.scheduler.backend. class VaultBackend: """ Backend for the Software Heritage vault. """ def __init__(self, config): self.config = config self.cache = VaultCache(**self.config['cache']) self.db = None self.reconnect() + self.smtp_server = smtplib.SMTP('localhost') def reconnect(self): if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['vault_db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur @autocommit def task_info(self, obj_type, obj_id, cursor=None): res = cursor.execute(''' SELECT id, type, object_id, task_uuid, task_status, ts_request, ts_done FROM vault_bundle WHERE type = %s AND object_id = %s''', obj_type, obj_id) return res.fetchone() @autocommit def create_task(self, obj_type, obj_id, cursor=None): assert obj_type in COOKER_TYPES task_uuid = celery.uuid() cursor.execute(''' INSERT INTO vault_bundle (type, object_id, task_uuid) VALUES (%s, %s, %s)''', obj_type, obj_id, task_uuid) - args = [obj_type, obj_id, self.config['storage'], self.config['cache']] + args = [self.config, obj_type, obj_id] task = get_task(cooking_task_name) task.apply_async(args, task_id=task_uuid) @autocommit def add_notif_email(self, obj_type, obj_id, email, cursor=None): cursor.execute(''' INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))''', email, obj_type, obj_id) @autocommit def cook_request(self, obj_type, obj_id, email=None, cursor=None): - if self.task_info(obj_type, obj_id) is None: + info = self.task_info(obj_type, obj_id) + if info is None: self.create_task(obj_type, obj_id) if email is not None: - self.add_notif_email(obj_type, obj_id, email) + if info is not None and info['status'] == 'done': + self.send_notification(None, email, obj_type, obj_id) + else: + self.add_notif_email(obj_type, obj_id, email) @autocommit - def is_available(self, obj_type, obj_id): - info = self.task_info(obj_type, obj_id) + def is_available(self, obj_type, obj_id, cursor=None): + info = self.task_info(obj_type, obj_id, cursor=cursor) return (info is not None and info['task_status'] == 'done' and self.cache.is_cached(obj_type, obj_id)) @autocommit - def fetch(self, obj_type, obj_id): - if not self.is_available(obj_type, obj_id): + def fetch(self, obj_type, obj_id, cursor=None): + if not self.is_available(obj_type, obj_id, cursor=cursor): return None + self.update_access_ts(obj_type, obj_id, cursor=cursor) return self.cache.get(obj_type, obj_id) @autocommit - def set_status(self, obj_type, obj_id, status, cursor=None): + def update_access_ts(self, obj_type, obj_id, cursor=None): cursor.execute(''' UPDATE vault_bundle - SET status = %s + SET ts_last_access = NOW() WHERE type = %s AND object_id = %s''', - status, obj_type, obj_id) + obj_type, obj_id) + + @autocommit + def set_status(self, obj_type, obj_id, status, cursor=None): + req = (''' + UPDATE vault_bundle + SET status = %s''' + + ('''AND ts_done = NOW()''' if status == 'done' else '') + + '''WHERE type = %s AND object_id = %s''') + cursor.execute(req, status, obj_type, obj_id) @autocommit def set_progress(self, obj_type, obj_id, progress, cursor=None): cursor.execute(''' UPDATE vault_bundle SET progress = %s WHERE type = %s AND object_id = %s''', progress, obj_type, obj_id) + + @autocommit + def send_all_notifications(self, obj_type, obj_id, cursor=None): + res = cursor.execute(''' + SELECT id, email + FROM vault_notif_email + RIGHT JOIN vault_bundle ON bundle_id = vault_bundle.id + WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''', + obj_type, obj_id) + for notif_id, email in res: + self.send_notification(notif_id, email, obj_type, obj_id) + + @autocommit + def send_notification(self, n_id, email, obj_type, obj_id, cursor=None): + hex_id = hashutil.hash_to_hex(obj_id) + text = """ + You have requested a bundle of type `{obj_type}` for the + object `{hex_id}` from the Software Heritage Archive. + + The bundle you requested is now available for download at the + following address: + + {url} + + Please keep in mind that this link might expire at some point, + in which case you will need to request the bundle again. + """ + text = text.format(obj_type=obj_type, hex_id=hex_id, url='URL_TODO') + text = textwrap.dedent(text) + text = textwrap.wrap(text, 72) + msg = MIMEText(text) + msg['Subject'] = ("The `{obj_type}` bundle of `{hex_id}` is ready" + .format(obj_type=obj_type, hex_id=hex_id)) + msg['From'] = 'vault@softwareheritage.org' + msg['To'] = email + + self.smtp_server.send_message(msg) + + if n_id is not None: + cursor.execute(''' + DELETE FROM vault_notif_email + WHERE id = %s''', n_id) diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py index 4a6c830..85da386 100644 --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -1,219 +1,219 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io import itertools import logging import os import tarfile import tempfile from pathlib import Path from swh.model import hashutil +from swh.storage import get_storage +from swh.vault.backend import VaultBackend def get_tar_bytes(path, arcname=None): path = Path(path) if not arcname: arcname = path.name tar_buffer = io.BytesIO() tar = tarfile.open(fileobj=tar_buffer, mode='w') tar.add(str(path), arcname=arcname) return tar_buffer.getbuffer() SKIPPED_MESSAGE = (b'This content have not been retrieved in ' b'Software Heritage archive due to its size') HIDDEN_MESSAGE = (b'This content is hidden') class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle """ CACHE_TYPE_KEY = None - def __init__(self, storage, cache, obj_type, obj_id): + def __init__(self, config, obj_type, obj_id): """Initialize the cooker. The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: storage: the storage object cache: the cache where to store the bundle obj_id: id of the object to be cooked into a bundle. """ - self.storage = storage - self.cache = cache + self.storage = get_storage(**config['storage']) + self.backend = VaultBackend(config) self.obj_type = obj_type - self.obj_id = obj_id + self.obj_id = hashutil.hash_to_bytes(obj_id) @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplemented def cook(self): """Cook the requested object into a bundle """ + self.backend.set_status(self.obj_type, self.obj_id, 'pending') content_iter = self.prepare_bundle() - # Cache the bundle self.update_cache(content_iter) - # Make a notification that the bundle have been cooked - # NOT YET IMPLEMENTED see TODO in function. - self.notify_bundle_ready( - notif_data='Bundle %s ready' % hashutil.hash_to_hex(self.obj_id)) + self.backend.set_status(self.obj_type, self.obj_id, 'done') + + self.notify_bundle_ready() def update_cache(self, content_iter): """Update the cache with id and bundle_content. """ - self.cache.add_stream(self.CACHE_TYPE_KEY, self.obj_id, content_iter) + self.backend.cache.add_stream(self.CACHE_TYPE_KEY, + self.obj_id, content_iter) - def notify_bundle_ready(self, notif_data): - # TODO plug this method with the notification method once - # done. - pass + def notify_bundle_ready(self): + self.backend.send_all_notifications(self.obj_type, self.obj_id) class DirectoryBuilder: """Creates a cooked directory from its sha1_git in the db. Warning: This is NOT a directly accessible cooker, but a low-level one that executes the manipulations. """ def __init__(self, storage): self.storage = storage def get_directory_bytes(self, dir_id): # Create temporary folder to retrieve the files into. root = bytes(tempfile.mkdtemp(prefix='directory.', suffix='.cook'), 'utf8') self.build_directory(dir_id, root) # Use the created directory to make a bundle with the data as # a compressed directory. bundle_content = self._create_bundle_content( root, hashutil.hash_to_hex(dir_id)) return bundle_content def build_directory(self, dir_id, root): # Retrieve data from the database. data = self.storage.directory_ls(dir_id, recursive=True) # Split into files and directory data. # TODO(seirl): also handle revision data. data1, data2 = itertools.tee(data, 2) dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir') file_data = (entry for entry in data2 if entry['type'] == 'file') # Recreate the directory's subtree and then the files into it. self._create_tree(root, dir_data) self._create_files(root, file_data) def _create_tree(self, root, directory_paths): """Create a directory tree from the given paths The tree is created from `root` and each given path in `directory_paths` will be created. """ # Directories are sorted by depth so they are created in the # right order bsep = bytes(os.path.sep, 'utf8') dir_names = sorted( directory_paths, key=lambda x: len(x.split(bsep))) for dir_name in dir_names: os.makedirs(os.path.join(root, dir_name)) def _create_files(self, root, file_datas): """Create the files according to their status. """ # Then create the files for file_data in file_datas: path = os.path.join(root, file_data['name']) status = file_data['status'] perms = file_data['perms'] if status == 'absent': self._create_file_absent(path) elif status == 'hidden': self._create_file_hidden(path) else: content = self._get_file_content(file_data['sha1']) self._create_file(path, content, perms) def _create_file(self, path, content, perms=0o100644): """Create the given file and fill it with content. """ if perms not in (0o100644, 0o100755, 0o120000): logging.warning('File {} has invalid permission {}, ' 'defaulting to 644.'.format(path, perms)) perms = 0o100644 if perms == 0o120000: # Symbolic link os.symlink(content, path) else: with open(path, 'wb') as f: f.write(content) os.chmod(path, perms & 0o777) def _get_file_content(self, obj_id): """Get the content of the given file. """ content = list(self.storage.content_get([obj_id]))[0]['data'] return content def _create_file_absent(self, path): """Create a file that indicates a skipped content Create the given file but fill it with a specific content to indicate that the content have not been retrieved by the software heritage archive due to its size. """ self._create_file(self, SKIPPED_MESSAGE) def _create_file_hidden(self, path): """Create a file that indicates an hidden content Create the given file but fill it with a specific content to indicate that the content could not be retrieved due to privacy policy. """ self._create_file(self, HIDDEN_MESSAGE) def _create_bundle_content(self, path, hex_dir_id): """Create a bundle from the given directory Args: path: location of the directory to package. hex_dir_id: hex representation of the directory id Returns: bytes that represent the compressed directory as a bundle. """ return get_tar_bytes(path.decode(), hex_dir_id) diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py index 0b89a53..1e1e682 100644 --- a/swh/vault/cooking_tasks.py +++ b/swh/vault/cooking_tasks.py @@ -1,27 +1,18 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task -from swh.model import hashutil -from ..cache import VaultCache -from ..cookers import COOKER_TYPES -from ... import get_storage +from swh.vault.cookers import COOKER_TYPES class SWHCookingTask(Task): """Main task which archives a contents batch. """ task_queue = 'swh_storage_vault_cooking' - def run(self, obj_type, hex_id, storage_args, cache_args): - # Initialize elements - storage = get_storage(**storage_args) - cache = VaultCache(**cache_args) - # Initialize cooker - obj_id = hashutil.hash_to_bytes(hex_id) - cooker = COOKER_TYPES[obj_type](storage, cache, obj_type, obj_id) - # Perform the cooking + def run(self, config, obj_type, obj_id): + cooker = COOKER_TYPES[obj_type](config, obj_type, obj_id) cooker.cook()