Page MenuHomeSoftware Heritage

D208.id688.diff
No OneTemporary

D208.id688.diff

diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql
new file mode 100644
--- /dev/null
+++ b/sql/swh-vault-schema.sql
@@ -0,0 +1,41 @@
+-- Schema versioning table: one row per schema revision.
+create table dbversion
+(
+ version int primary key, -- schema revision number
+ release timestamptz not null, -- release timestamp of the revision
+ description text not null -- free-text description of the revision
+);
+comment on table dbversion is 'Schema update tracking';
+-- Register this file as revision 1, stamped with the time of insertion.
+insert into dbversion (version, release, description)
+ values (1, now(), 'Initial version');
+
+-- Object identifiers are stored as raw bytes (bytea).
+create domain obj_hash as bytea;
+
+-- Kinds of bundle that can be requested.
+create type cook_type as enum ('directory', 'revision_gitfast');
+comment on type cook_type is 'Type of the requested bundle';
+
+-- Possible states of a cooking task; 'new' is the default for a fresh row
+-- in vault_bundle.
+create type cook_status as enum ('new', 'pending', 'done');
+comment on type cook_status is 'Status of the cooking';
+
+-- One row per requested bundle, tracking its cooking task and access times.
+create table vault_bundle (
+  id bigserial primary key,
+
+  type cook_type not null,  -- requested cooking type
+  object_id obj_hash not null,  -- requested object ID
+
+  task_uuid uuid not null,  -- celery UUID of the cooking task
+  task_status cook_status not null default 'new',  -- status of the task
+
+  ts_created timestamptz not null default now(),  -- timestamp of creation
+  ts_done timestamptz,  -- timestamp of the cooking result
+  ts_last_access timestamptz not null default now(),  -- last access
+
+  progress_msg text,  -- progress message
+
+  -- At most one bundle per (type, object) pair. This is the last element of
+  -- the list, so it must not be followed by a comma.
+  unique(type, object_id)
+);
+
+-- E-mail addresses registered for notification about a given bundle.
+create table vault_notif_email (
+ id bigserial primary key,
+ email text not null, -- e-mail to notify
+ bundle_id bigint not null references vault_bundle(id) -- bundle concerned
+);
diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -14,14 +14,11 @@
def __init__(self, base_url):
super().__init__(api_exception=StorageAPIError, url=base_url)
- def ls(self, obj_type):
- return self.get('vault/{}/'.format(obj_type))
-
def fetch(self, obj_type, obj_id):
- return self.get('vault/{}/{}/'.format(obj_type,
+ return self.get('fetch/{}/{}/'.format(obj_type,
hashutil.hash_to_hex(obj_id)))
def cook(self, obj_type, obj_id):
- return self.post('vault/{}/{}/'.format(obj_type,
- hashutil.hash_to_hex(obj_id)),
+ return self.post('cook/{}/{}/'.format(obj_type,
+ hashutil.hash_to_hex(obj_id)),
data={})
diff --git a/swh/vault/api/cooking_tasks.py b/swh/vault/api/cooking_tasks.py
deleted file mode 100644
--- a/swh/vault/api/cooking_tasks.py
+++ /dev/null
@@ -1,27 +0,0 @@
-# Copyright (C) 2016-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from swh.scheduler.task import Task
-from swh.model import hashutil
-from ..cache import VaultCache
-from ..cookers import COOKER_TYPES
-from ... import get_storage
-
-
-class SWHCookingTask(Task):
- """Main task which archives a contents batch.
-
- """
- task_queue = 'swh_storage_vault_cooking'
-
- def run(self, type, hex_id, storage_args, cache_args):
- # Initialize elements
- storage = get_storage(**storage_args)
- cache = VaultCache(**cache_args)
- # Initialize cooker
- obj_id = hashutil.hash_to_bytes(hex_id)
- cooker = COOKER_TYPES[type](storage, cache, obj_id)
- # Perform the cooking
- cooker.cook()
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -3,20 +3,15 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import asyncio
+import aiohttp.web
import click
-import re
-from flask import abort, g
-from werkzeug.routing import BaseConverter
-from swh.core import config
-from swh.core.api import (SWHServerAPIApp, error_handler,
- encode_data_server as encode_data)
-from swh.scheduler.utils import get_task
-from swh.storage.vault.api.cooking_tasks import SWHCookingTask # noqa
-from swh.storage.vault.cache import VaultCache
-from swh.storage.vault.cookers import COOKER_TYPES
-
-cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask'
+from swh.core import config
+from swh.core.api_async import (SWHRemoteAPI,
+ encode_data_server as encode_data)
+from swh.vault.cookers import COOKER_TYPES
+from swh.vault.backend import VaultBackend
DEFAULT_CONFIG = {
@@ -30,56 +25,49 @@
},
},
}),
- 'cache': ('dict', {'root': '/tmp/vaultcache'})
+ 'cache': ('dict', {'root': '/tmp/vaultcache'}),
+ 'vault_db': ('str', 'dbname=swh-vault')
}
-class CookerConverter(BaseConverter):
- def __init__(self, url_map, *items):
- super().__init__(url_map)
- types = [re.escape(c) for c in COOKER_TYPES]
- self.regex = '({})'.format('|'.join(types))
-
+@asyncio.coroutine
+def index(request):
+    """Landing page: answer with a plain-text banner naming the service."""
+    # `text=` accepts a str and takes care of the encoding; `body=` is meant
+    # for raw bytes.
+    return aiohttp.web.Response(text="SWH Vault API server")
-app = SWHServerAPIApp(__name__)
-app.url_map.converters['cooker'] = CookerConverter
+@asyncio.coroutine
+def vault_fetch(request):
+    """Return the cooked bundle named by the `type` and `id` URL segments.
+
+    Raises:
+        aiohttp.web.HTTPNotFound: if the backend has no such bundle available.
+    """
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
-@app.errorhandler(Exception)
-def my_error_handler(exception):
- return error_handler(exception, encode_data)
+    # VaultBackend.fetch() already checks availability and returns None when
+    # the bundle cannot be served. Asking once avoids a second round of
+    # queries and never encodes a None body as a successful response.
+    bundle = request.app['backend'].fetch(obj_type, obj_id)
+    if bundle is None:
+        raise aiohttp.web.HTTPNotFound
+    return encode_data(bundle)
-@app.before_request
-def before_request():
- g.cache = VaultCache(**app.config['cache'])
+@asyncio.coroutine
+def vault_cook(request):
+    """Request the cooking of the bundle named by the URL segments.
+
+    An optional `email` query parameter is forwarded to the backend so the
+    requester can be notified. Answers 201 with the URL of the bundle.
+
+    Raises:
+        aiohttp.web.HTTPNotFound: if `type` is not a known cooker type.
+    """
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
+    # aiohttp exposes the query string as `request.query`; `request.args` is
+    # the Flask spelling and does not exist on aiohttp requests.
+    email = request.query.get('email')
-@app.route('/')
-def index():
- return 'SWH vault API server'
+    if obj_type not in COOKER_TYPES:
+        raise aiohttp.web.HTTPNotFound
+    request.app['backend'].cook_request(obj_type, obj_id, email)
-@app.route('/vault/<cooker:type>/', methods=['GET'])
-def vault_ls(type):
- return encode_data(list(
- g.cache.ls(type)
- ))
-
-
-@app.route('/vault/<cooker:type>/<id>/', methods=['GET'])
-def vault_fetch(type, id):
- if not g.cache.is_cached(type, id):
- abort(404)
- return encode_data(g.cache.get(type, id))
+    # Return url to get the content and 201 CREATED. Bundles are served by
+    # the `fetch` endpoint; the former `/vault/...` route no longer exists.
+    return encode_data('/fetch/{}/{}/'.format(obj_type, obj_id), status=201)
-@app.route('/vault/<cooker:type>/<id>/', methods=['POST'])
-def vault_cook(type, id):
- task = get_task(cooking_task_name)
- task.delay(type, id, app.config['storage'], app.config['cache'])
- # Return url to get the content and 201 CREATED
- return encode_data('/vault/%s/%s/' % (type, id)), 201
+def make_app(config, **kwargs):
+    """Build the vault API application.
+
+    Args:
+        config: configuration dict handed to VaultBackend.
+        **kwargs: forwarded to the SWHRemoteAPI constructor.
+
+    Returns:
+        the application, with its routes registered and the backend stored
+        under the 'backend' key.
+    """
+    app = SWHRemoteAPI(**kwargs)
+    app.router.add_route('GET', '/', index)
+    # The client requests 'fetch/<type>/<id>/' and 'cook/<type>/<id>/' with a
+    # trailing slash, and aiohttp matches paths strictly, so the routes must
+    # end with a slash too.
+    app.router.add_route('GET', '/fetch/{type}/{id}/', vault_fetch)
+    app.router.add_route('POST', '/cook/{type}/{id}/', vault_cook)
+    app['backend'] = VaultBackend(config)
+    return app
@click.command()
@@ -90,8 +78,8 @@
@click.option('--debug/--nodebug', default=True,
help="Indicates if the server should run in debug mode")
def launch(config_path, host, port, debug):
- app.config.update(config.read(config_path, DEFAULT_CONFIG))
- app.run(host, port=int(port), debug=bool(debug))
+ app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug))
+ aiohttp.web.run_app(app, host=host, port=int(port))
if __name__ == '__main__':
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/backend.py
@@ -0,0 +1,211 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import textwrap
+import smtplib
+import celery
+import psycopg2
+import psycopg2.extras
+
+from functools import wraps
+from email.mime.text import MIMEText
+
+from swh.model import hashutil
+from swh.scheduler.utils import get_task
+# The vault code lives in the `swh.vault` package (this module is
+# swh/vault/backend.py and the task is defined in swh/vault/cooking_tasks.py);
+# the `swh.storage.vault` paths are stale.
+# NOTE(review): swh.vault.cookers.base imports VaultBackend from this module,
+# so these imports may form a cycle -- confirm the import order works.
+from swh.vault.cache import VaultCache
+from swh.vault.cookers import COOKER_TYPES
+from swh.vault.cooking_tasks import SWHCookingTask  # noqa
+
+# Dotted name passed to get_task() to obtain the cooking task.
+cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask'
+
+
+# TODO: Imported from swh.scheduler.backend. Factorization needed.
+def autocommit(fn):
+    """Decorate a backend method so that it can manage its own transaction.
+
+    When the caller passes no `cursor` keyword (or a falsy one), a cursor is
+    obtained from `self.cursor()`, and `self.commit()` is called once the
+    method returns, or `self.rollback()` if it raises. When the caller passes
+    a cursor, neither commit nor rollback is issued here.
+    """
+    @wraps(fn)
+    def wrapped(self, *args, **kwargs):
+        # TODO: I don't like using None, it's confusing for the user. how about
+        # a NEW_CURSOR object()?
+        owns_transaction = not kwargs.get('cursor')
+        if owns_transaction:
+            kwargs['cursor'] = self.cursor()
+
+        try:
+            ret = fn(self, *args, **kwargs)
+        except BaseException:
+            # Roll back only a transaction we opened; always re-raise.
+            if owns_transaction:
+                self.rollback()
+            raise
+
+        if owns_transaction:
+            self.commit()
+        return ret
+
+    return wrapped
+
+
+# TODO: This has to be factorized with other database base classes and helpers
+# (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...)
+# reconnect(), cursor(), commit() and rollback() mirror
+# swh.scheduler.backend.
+class VaultBackend:
+    """Backend for the Software Heritage vault.
+
+    Records cooking requests in the vault database, dispatches cooking tasks,
+    serves cooked bundles from the cache and e-mails requesters.
+
+    Methods decorated with `@autocommit` take an optional `cursor` keyword:
+    when it is omitted, the method runs in its own transaction.
+
+    NOTE(review): `obj_id` is handed to the database, to the cache and to
+    `hashutil.hash_to_hex` as-is; confirm that every caller passes the same
+    representation (raw bytes vs. hexadecimal string).
+    """
+    def __init__(self, config):
+        """Open the cache, the database connection and the SMTP connection.
+
+        Args:
+            config: dict providing 'cache' (keyword arguments for VaultCache)
+                and 'vault_db' (database connection string).
+        """
+        self.config = config
+        self.cache = VaultCache(**self.config['cache'])
+        self.db = None
+        self.reconnect()
+        self.smtp_server = smtplib.SMTP('localhost')
+
+    def reconnect(self):
+        """(Re)open the database connection if it is missing or closed."""
+        if not self.db or self.db.closed:
+            self.db = psycopg2.connect(
+                dsn=self.config['vault_db'],
+                cursor_factory=psycopg2.extras.RealDictCursor,
+            )
+
+    def cursor(self):
+        """Return a fresh cursor on the database, with auto-reconnection in case
+        of failure"""
+        cur = None
+
+        # Get a fresh cursor and reconnect at most three times
+        tries = 0
+        while True:
+            tries += 1
+            try:
+                cur = self.db.cursor()
+                cur.execute('select 1')
+                break
+            except psycopg2.OperationalError:
+                if tries < 3:
+                    self.reconnect()
+                else:
+                    raise
+        return cur
+
+    def commit(self):
+        """Commit the current transaction (required by @autocommit)."""
+        self.db.commit()
+
+    def rollback(self):
+        """Roll back the current transaction (required by @autocommit)."""
+        self.db.rollback()
+
+    @autocommit
+    def task_info(self, obj_type, obj_id, cursor=None):
+        """Return the vault_bundle row of a bundle as a dict, or None."""
+        # psycopg2's execute() returns None: results are read from the cursor.
+        cursor.execute('''
+            SELECT id, type, object_id, task_uuid, task_status,
+                   ts_created, ts_done
+            FROM vault_bundle
+            WHERE type = %s AND object_id = %s''', (obj_type, obj_id))
+        return cursor.fetchone()
+
+    @autocommit
+    def create_task(self, obj_type, obj_id, cursor=None):
+        """Record a new bundle and dispatch the task that will cook it."""
+        assert obj_type in COOKER_TYPES
+
+        task_uuid = celery.uuid()
+        cursor.execute('''
+            INSERT INTO vault_bundle (type, object_id, task_uuid)
+            VALUES (%s, %s, %s)''', (obj_type, obj_id, task_uuid))
+
+        # NOTE(review): the task is dispatched before the INSERT is committed;
+        # confirm a worker cannot start before the row becomes visible.
+        args = [self.config, obj_type, obj_id]
+        task = get_task(cooking_task_name)
+        task.apply_async(args, task_id=task_uuid)
+
+    @autocommit
+    def add_notif_email(self, obj_type, obj_id, email, cursor=None):
+        """Register `email` for notification about the given bundle."""
+        cursor.execute('''
+            INSERT INTO vault_notif_email (email, bundle_id)
+            VALUES (%s, (SELECT id FROM vault_bundle
+                         WHERE type = %s AND object_id = %s))''',
+                       (email, obj_type, obj_id))
+
+    @autocommit
+    def cook_request(self, obj_type, obj_id, email=None, cursor=None):
+        """Handle a cooking request, creating the task if it is unknown.
+
+        When `email` is given, it is notified right away if the bundle is
+        already done, and registered for a later notification otherwise.
+        """
+        # Share one cursor so that the whole request is a single transaction.
+        info = self.task_info(obj_type, obj_id, cursor=cursor)
+        if info is None:
+            self.create_task(obj_type, obj_id, cursor=cursor)
+        if email is not None:
+            # The status column is named `task_status`.
+            if info is not None and info['task_status'] == 'done':
+                self.send_notification(None, email, obj_type, obj_id,
+                                       cursor=cursor)
+            else:
+                self.add_notif_email(obj_type, obj_id, email, cursor=cursor)
+
+    @autocommit
+    def is_available(self, obj_type, obj_id, cursor=None):
+        """Tell whether the bundle is recorded as done and is in the cache."""
+        info = self.task_info(obj_type, obj_id, cursor=cursor)
+        return (info is not None
+                and info['task_status'] == 'done'
+                and self.cache.is_cached(obj_type, obj_id))
+
+    @autocommit
+    def fetch(self, obj_type, obj_id, cursor=None):
+        """Return the bundle from the cache, or None if it is not available.
+
+        A successful fetch refreshes the bundle's last access timestamp.
+        """
+        if not self.is_available(obj_type, obj_id, cursor=cursor):
+            return None
+        self.update_access_ts(obj_type, obj_id, cursor=cursor)
+        return self.cache.get(obj_type, obj_id)
+
+    @autocommit
+    def update_access_ts(self, obj_type, obj_id, cursor=None):
+        """Set the bundle's last access timestamp to the current time."""
+        cursor.execute('''
+            UPDATE vault_bundle
+            SET ts_last_access = NOW()
+            WHERE type = %s AND object_id = %s''',
+                       (obj_type, obj_id))
+
+    @autocommit
+    def set_status(self, obj_type, obj_id, status, cursor=None):
+        """Set the task status; 'done' also stamps the completion time."""
+        # Assignments in a SET clause are separated by commas, and each SQL
+        # fragment must be separated from the next one by whitespace.
+        assignments = 'task_status = %s'
+        if status == 'done':
+            assignments += ', ts_done = NOW()'
+        req = ('''
+            UPDATE vault_bundle
+            SET ''' + assignments + '''
+            WHERE type = %s AND object_id = %s''')
+        cursor.execute(req, (status, obj_type, obj_id))
+
+    @autocommit
+    def set_progress(self, obj_type, obj_id, progress, cursor=None):
+        """Store a progress message for the bundle."""
+        # The column is named `progress_msg`.
+        cursor.execute('''
+            UPDATE vault_bundle
+            SET progress_msg = %s
+            WHERE type = %s AND object_id = %s''',
+                       (progress, obj_type, obj_id))
+
+    @autocommit
+    def send_all_notifications(self, obj_type, obj_id, cursor=None):
+        """Send every notification registered for the given bundle."""
+        # `id` exists in both tables, so it has to be qualified. An inner
+        # join yields no row for a bundle without any registered e-mail.
+        cursor.execute('''
+            SELECT vault_notif_email.id AS id, email
+            FROM vault_notif_email
+            INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
+            WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
+                       (obj_type, obj_id))
+        # Rows are dicts (RealDictCursor) and execute() returns None, so the
+        # rows are read from the cursor and accessed by column name.
+        for row in cursor.fetchall():
+            # No cursor is passed on: each notification is committed as soon
+            # as it is sent, so a later failure does not cause a resend.
+            self.send_notification(row['id'], row['email'], obj_type, obj_id)
+
+    @autocommit
+    def send_notification(self, n_id, email, obj_type, obj_id, cursor=None):
+        """E-mail `email` that the bundle is ready.
+
+        When `n_id` is not None, the vault_notif_email row with that id is
+        deleted once the message has been sent.
+        """
+        hex_id = hashutil.hash_to_hex(obj_id)
+        text = textwrap.dedent("""\
+            You have requested a bundle of type `{obj_type}` for the
+            object `{hex_id}` from the Software Heritage Archive.
+
+            The bundle you requested is now available for download at the
+            following address:
+
+            {url}
+
+            Please keep in mind that this link might expire at some point,
+            in which case you will need to request the bundle again.
+            """)
+        text = text.format(obj_type=obj_type, hex_id=hex_id, url='URL_TODO')
+        # MIMEText needs a str, whereas textwrap.wrap() returns a list of
+        # lines and merges paragraphs: re-flow each paragraph separately,
+        # without ever cutting through a long word such as the URL.
+        text = '\n\n'.join(
+            textwrap.fill(paragraph, 72,
+                          break_long_words=False, break_on_hyphens=False)
+            for paragraph in text.strip().split('\n\n'))
+        msg = MIMEText(text)
+        msg['Subject'] = ("The `{obj_type}` bundle of `{hex_id}` is ready"
+                          .format(obj_type=obj_type, hex_id=hex_id))
+        msg['From'] = 'vault@softwareheritage.org'
+        msg['To'] = email
+
+        self.smtp_server.send_message(msg)
+
+        if n_id is not None:
+            cursor.execute('''
+                DELETE FROM vault_notif_email
+                WHERE id = %s''', (n_id,))
diff --git a/swh/vault/conf.yaml b/swh/vault/conf.yaml
deleted file mode 100644
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -14,6 +14,8 @@
from pathlib import Path
from swh.model import hashutil
+from swh.storage import get_storage
+from swh.vault.backend import VaultBackend
def get_tar_bytes(path, arcname=None):
@@ -41,13 +43,10 @@
To define a new cooker, inherit from this class and override:
- CACHE_TYPE_KEY: key to use for the bundle to reference in cache
- def cook(): cook the object into a bundle
- - def notify_bundle_ready(notif_data): notify the
- bundle is ready.
-
"""
CACHE_TYPE_KEY = None
- def __init__(self, storage, cache, obj_id):
+ def __init__(self, config, obj_type, obj_id):
"""Initialize the cooker.
The type of the object represented by the id depends on the
@@ -59,9 +58,10 @@
cache: the cache where to store the bundle
obj_id: id of the object to be cooked into a bundle.
"""
- self.storage = storage
- self.cache = cache
- self.obj_id = obj_id
+ self.storage = get_storage(**config['storage'])
+ self.backend = VaultBackend(config)
+ self.obj_type = obj_type
+ self.obj_id = hashutil.hash_to_bytes(obj_id)
@abc.abstractmethod
def prepare_bundle(self):
@@ -74,25 +74,23 @@
def cook(self):
"""Cook the requested object into a bundle
"""
+ self.backend.set_status(self.obj_type, self.obj_id, 'pending')
content_iter = self.prepare_bundle()
- # Cache the bundle
self.update_cache(content_iter)
- # Make a notification that the bundle have been cooked
- # NOT YET IMPLEMENTED see TODO in function.
- self.notify_bundle_ready(
- notif_data='Bundle %s ready' % hashutil.hash_to_hex(self.obj_id))
+ self.backend.set_status(self.obj_type, self.obj_id, 'done')
+
+ self.notify_bundle_ready()
def update_cache(self, content_iter):
"""Update the cache with id and bundle_content.
"""
- self.cache.add_stream(self.CACHE_TYPE_KEY, self.obj_id, content_iter)
+ self.backend.cache.add_stream(self.CACHE_TYPE_KEY,
+ self.obj_id, content_iter)
- def notify_bundle_ready(self, notif_data):
- # TODO plug this method with the notification method once
- # done.
- pass
+ def notify_bundle_ready(self):
+ self.backend.send_all_notifications(self.obj_type, self.obj_id)
class DirectoryBuilder:
diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/cooking_tasks.py
@@ -0,0 +1,18 @@
+# Copyright (C) 2016-2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.scheduler.task import Task
+from swh.vault.cookers import COOKER_TYPES
+
+
+class SWHCookingTask(Task):
+    """Task which cooks one bundle with the cooker matching its type.
+
+    """
+    task_queue = 'swh_storage_vault_cooking'
+
+    def run(self, config, obj_type, obj_id):
+        """Instantiate the cooker registered for `obj_type` and run it.
+
+        `config`, `obj_type` and `obj_id` are passed unchanged to the cooker
+        constructor.
+        """
+        cooker_class = COOKER_TYPES[obj_type]
+        cooker_class(config, obj_type, obj_id).cook()

File Metadata

Mime Type
text/plain
Expires
Thu, Jan 30, 9:44 AM (19 h, 30 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223064

Event Timeline