D208: [DNR] add database backend
D208.id688.diff
diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql
new file mode 100644
--- /dev/null
+++ b/sql/swh-vault-schema.sql
@@ -0,0 +1,41 @@
+create table dbversion
+(
+ version int primary key,
+ release timestamptz not null,
+ description text not null
+);
+comment on table dbversion is 'Schema update tracking';
+insert into dbversion (version, release, description)
+ values (1, now(), 'Initial version');
+
+create domain obj_hash as bytea;
+
+create type cook_type as enum ('directory', 'revision_gitfast');
+comment on type cook_type is 'Type of the requested bundle';
+
+create type cook_status as enum ('new', 'pending', 'done');
+comment on type cook_status is 'Status of the cooking';
+
+create table vault_bundle (
+ id bigserial primary key,
+
+ type cook_type not null, -- requested cooking type
+ object_id obj_hash not null, -- requested object ID
+
+ task_uuid uuid not null, -- celery UUID of the cooking task
+ task_status cook_status not null default 'new', -- status of the task
+
+ ts_created timestamptz not null default now(), -- timestamp of creation
+ ts_done timestamptz, -- timestamp of the cooking result
+ ts_last_access timestamptz not null default now(), -- last access
+
+ progress_msg text, -- progress message
+
+ unique(type, object_id)
+);
+
+create table vault_notif_email (
+ id bigserial primary key,
+ email text not null, -- e-mail to notify
+ bundle_id bigint not null references vault_bundle(id)
+);
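
As a quick sanity check of the new schema, here is a minimal sketch of a cook-request row being inserted and read back with psycopg2; the DSN and the all-zero object id are placeholders, not values taken from this diff.

    import uuid
    import psycopg2

    # Placeholder connection string and object id, for illustration only.
    conn = psycopg2.connect('dbname=swh-vault')
    obj_id = bytes.fromhex('00' * 20)
    with conn, conn.cursor() as cur:
        # (type, object_id) is unique: a second request for the same bundle
        # must reuse this row rather than insert a new one.
        cur.execute('''INSERT INTO vault_bundle (type, object_id, task_uuid)
                       VALUES (%s, %s, %s)''',
                    ('directory', obj_id, str(uuid.uuid4())))
        cur.execute('''SELECT task_status, ts_created FROM vault_bundle
                       WHERE type = %s AND object_id = %s''',
                    ('directory', obj_id))
        print(cur.fetchone())   # task_status defaults to 'new'
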
diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -14,14 +14,11 @@
def __init__(self, base_url):
super().__init__(api_exception=StorageAPIError, url=base_url)
- def ls(self, obj_type):
- return self.get('vault/{}/'.format(obj_type))
-
def fetch(self, obj_type, obj_id):
- return self.get('vault/{}/{}/'.format(obj_type,
+ return self.get('fetch/{}/{}/'.format(obj_type,
hashutil.hash_to_hex(obj_id)))
def cook(self, obj_type, obj_id):
- return self.post('vault/{}/{}/'.format(obj_type,
- hashutil.hash_to_hex(obj_id)),
+ return self.post('cook/{}/{}/'.format(obj_type,
+ hashutil.hash_to_hex(obj_id)),
data={})
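
For orientation, a sketch of how the renamed endpoints would be driven from the Python client; the class name RemoteVaultClient, the base URL and the object id are assumptions here, since the class definition falls outside this hunk.

    from swh.model import hashutil
    from swh.vault.api.client import RemoteVaultClient  # assumed class name

    client = RemoteVaultClient('http://localhost:5005/')    # assumed base URL
    obj_id = hashutil.hash_to_bytes('00' * 20)              # placeholder id
    client.cook('directory', obj_id)            # POST cook/directory/<hex id>/
    bundle = client.fetch('directory', obj_id)  # GET  fetch/directory/<hex id>/
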
diff --git a/swh/vault/api/cooking_tasks.py b/swh/vault/api/cooking_tasks.py
deleted file mode 100644
--- a/swh/vault/api/cooking_tasks.py
+++ /dev/null
@@ -1,27 +0,0 @@
-# Copyright (C) 2016-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from swh.scheduler.task import Task
-from swh.model import hashutil
-from ..cache import VaultCache
-from ..cookers import COOKER_TYPES
-from ... import get_storage
-
-
-class SWHCookingTask(Task):
- """Main task which archives a contents batch.
-
- """
- task_queue = 'swh_storage_vault_cooking'
-
- def run(self, type, hex_id, storage_args, cache_args):
- # Initialize elements
- storage = get_storage(**storage_args)
- cache = VaultCache(**cache_args)
- # Initialize cooker
- obj_id = hashutil.hash_to_bytes(hex_id)
- cooker = COOKER_TYPES[type](storage, cache, obj_id)
- # Perform the cooking
- cooker.cook()
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -3,20 +3,15 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import asyncio
+import aiohttp.web
import click
-import re
-from flask import abort, g
-from werkzeug.routing import BaseConverter
-from swh.core import config
-from swh.core.api import (SWHServerAPIApp, error_handler,
- encode_data_server as encode_data)
-from swh.scheduler.utils import get_task
-from swh.storage.vault.api.cooking_tasks import SWHCookingTask # noqa
-from swh.storage.vault.cache import VaultCache
-from swh.storage.vault.cookers import COOKER_TYPES
-
-cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask'
+from swh.core import config
+from swh.core.api_async import (SWHRemoteAPI,
+ encode_data_server as encode_data)
+from swh.vault.cookers import COOKER_TYPES
+from swh.vault.backend import VaultBackend
DEFAULT_CONFIG = {
@@ -30,56 +25,49 @@
},
},
}),
- 'cache': ('dict', {'root': '/tmp/vaultcache'})
+ 'cache': ('dict', {'root': '/tmp/vaultcache'}),
+ 'vault_db': ('str', 'dbname=swh-vault')
}
-class CookerConverter(BaseConverter):
- def __init__(self, url_map, *items):
- super().__init__(url_map)
- types = [re.escape(c) for c in COOKER_TYPES]
- self.regex = '({})'.format('|'.join(types))
-
+@asyncio.coroutine
+def index(request):
+ return aiohttp.web.Response(body=b"SWH Vault API server")
-app = SWHServerAPIApp(__name__)
-app.url_map.converters['cooker'] = CookerConverter
+@asyncio.coroutine
+def vault_fetch(request):
+ obj_type = request.match_info['type']
+ obj_id = request.match_info['id']
-@app.errorhandler(Exception)
-def my_error_handler(exception):
- return error_handler(exception, encode_data)
+ if not request.app['backend'].is_available(obj_type, obj_id):
+ raise aiohttp.web.HTTPNotFound
+ return encode_data(request.app['backend'].fetch(obj_type, obj_id))
-@app.before_request
-def before_request():
- g.cache = VaultCache(**app.config['cache'])
+@asyncio.coroutine
+def vault_cook(request):
+ obj_type = request.match_info['type']
+ obj_id = request.match_info['id']
+ email = request.query.get('email')
-@app.route('/')
-def index():
- return 'SWH vault API server'
+ if obj_type not in COOKER_TYPES:
+ raise aiohttp.web.HTTPNotFound
+ request.app['backend'].cook_request(obj_type, obj_id, email)
-@app.route('/vault/<cooker:type>/', methods=['GET'])
-def vault_ls(type):
- return encode_data(list(
- g.cache.ls(type)
- ))
-
-
-@app.route('/vault/<cooker:type>/<id>/', methods=['GET'])
-def vault_fetch(type, id):
- if not g.cache.is_cached(type, id):
- abort(404)
- return encode_data(g.cache.get(type, id))
+ # Return url to get the content and 201 CREATED
+ return encode_data('/fetch/{}/{}/'.format(obj_type, obj_id), status=201)
-@app.route('/vault/<cooker:type>/<id>/', methods=['POST'])
-def vault_cook(type, id):
- task = get_task(cooking_task_name)
- task.delay(type, id, app.config['storage'], app.config['cache'])
- # Return url to get the content and 201 CREATED
- return encode_data('/vault/%s/%s/' % (type, id)), 201
+def make_app(config, **kwargs):
+ app = SWHRemoteAPI(**kwargs)
+ app.router.add_route('GET', '/', index)
+ app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch)
+ app.router.add_route('POST', '/cook/{type}/{id}', vault_cook)
+ app['backend'] = VaultBackend(config)
+ return app
@click.command()
@@ -90,8 +78,8 @@
@click.option('--debug/--nodebug', default=True,
help="Indicates if the server should run in debug mode")
def launch(config_path, host, port, debug):
- app.config.update(config.read(config_path, DEFAULT_CONFIG))
- app.run(host, port=int(port), debug=bool(debug))
+ app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug))
+ aiohttp.web.run_app(app, host=host, port=int(port))
if __name__ == '__main__':
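
To make the new routing concrete, a sketch of the request flow against a running server; the host, port and object id are placeholders, and `requests` is used purely for illustration.

    import requests

    hex_id = '00' * 20   # placeholder object id (hex)
    # Ask the vault to cook a directory bundle; the handler replies
    # 201 CREATED with a URL where the bundle can later be retrieved.
    requests.post('http://localhost:5005/cook/directory/{}'.format(hex_id))
    # Once cooking is done, the bundle can be downloaded from the fetch
    # endpoint; a 404 means it is not (yet) available.
    resp = requests.get('http://localhost:5005/fetch/directory/{}'.format(hex_id))
    if resp.status_code == 200:
        bundle = resp.content
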
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/backend.py
@@ -0,0 +1,211 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import textwrap
+import smtplib
+import celery
+import psycopg2
+import psycopg2.extras
+
+from functools import wraps
+from email.mime.text import MIMEText
+
+from swh.model import hashutil
+from swh.scheduler.utils import get_task
+from swh.vault.cache import VaultCache
+from swh.vault.cookers import COOKER_TYPES
+from swh.vault.cooking_tasks import SWHCookingTask # noqa
+
+cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask'
+
+
+# TODO: Imported from swh.scheduler.backend. Factorization needed.
+def autocommit(fn):
+ @wraps(fn)
+ def wrapped(self, *args, **kwargs):
+ autocommit = False
+ # TODO: I don't like using None, it's confusing for the user. how about
+ # a NEW_CURSOR object()?
+ if 'cursor' not in kwargs or not kwargs['cursor']:
+ autocommit = True
+ kwargs['cursor'] = self.cursor()
+
+ try:
+ ret = fn(self, *args, **kwargs)
+ except:
+ if autocommit:
+ self.rollback()
+ raise
+
+ if autocommit:
+ self.commit()
+
+ return ret
+
+ return wrapped
+
+
+# TODO: This has to be factorized with other database base classes and helpers
+# (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...)
+# The three first methods are imported from swh.scheduler.backend.
+class VaultBackend:
+ """
+ Backend for the Software Heritage vault.
+ """
+ def __init__(self, config):
+ self.config = config
+ self.cache = VaultCache(**self.config['cache'])
+ self.db = None
+ self.reconnect()
+ self.smtp_server = smtplib.SMTP('localhost')
+
+ def reconnect(self):
+ if not self.db or self.db.closed:
+ self.db = psycopg2.connect(
+ dsn=self.config['vault_db'],
+ cursor_factory=psycopg2.extras.RealDictCursor,
+ )
+
+ def cursor(self):
+ """Return a fresh cursor on the database, with auto-reconnection in case
+ of failure"""
+ cur = None
+
+ # Get a fresh cursor and reconnect at most three times
+ tries = 0
+ while True:
+ tries += 1
+ try:
+ cur = self.db.cursor()
+ cur.execute('select 1')
+ break
+ except psycopg2.OperationalError:
+ if tries < 3:
+ self.reconnect()
+ else:
+ raise
+ return cur
+
+ @autocommit
+ def task_info(self, obj_type, obj_id, cursor=None):
+ cursor.execute('''
+ SELECT id, type, object_id, task_uuid, task_status,
+ ts_created, ts_done
+ FROM vault_bundle
+ WHERE type = %s AND object_id = %s''', (obj_type, obj_id))
+ return cursor.fetchone()
+
+ @autocommit
+ def create_task(self, obj_type, obj_id, cursor=None):
+ assert obj_type in COOKER_TYPES
+
+ task_uuid = celery.uuid()
+ cursor.execute('''
+ INSERT INTO vault_bundle (type, object_id, task_uuid)
+ VALUES (%s, %s, %s)''', (obj_type, obj_id, task_uuid))
+
+ args = [self.config, obj_type, obj_id]
+ task = get_task(cooking_task_name)
+ task.apply_async(args, task_id=task_uuid)
+
+ @autocommit
+ def add_notif_email(self, obj_type, obj_id, email, cursor=None):
+ cursor.execute('''
+ INSERT INTO vault_notif_email (email, bundle_id)
+ VALUES (%s, (SELECT id FROM vault_bundle
+ WHERE type = %s AND object_id = %s))''',
+ (email, obj_type, obj_id))
+
+ @autocommit
+ def cook_request(self, obj_type, obj_id, email=None, cursor=None):
+ info = self.task_info(obj_type, obj_id)
+ if info is None:
+ self.create_task(obj_type, obj_id)
+ if email is not None:
+ if info is not None and info['task_status'] == 'done':
+ self.send_notification(None, email, obj_type, obj_id)
+ else:
+ self.add_notif_email(obj_type, obj_id, email)
+
+ @autocommit
+ def is_available(self, obj_type, obj_id, cursor=None):
+ info = self.task_info(obj_type, obj_id, cursor=cursor)
+ return (info is not None
+ and info['task_status'] == 'done'
+ and self.cache.is_cached(obj_type, obj_id))
+
+ @autocommit
+ def fetch(self, obj_type, obj_id, cursor=None):
+ if not self.is_available(obj_type, obj_id, cursor=cursor):
+ return None
+ self.update_access_ts(obj_type, obj_id, cursor=cursor)
+ return self.cache.get(obj_type, obj_id)
+
+ @autocommit
+ def update_access_ts(self, obj_type, obj_id, cursor=None):
+ cursor.execute('''
+ UPDATE vault_bundle
+ SET ts_last_access = NOW()
+ WHERE type = %s AND object_id = %s''',
+ (obj_type, obj_id))
+
+ @autocommit
+ def set_status(self, obj_type, obj_id, status, cursor=None):
+ req = ('''
+ UPDATE vault_bundle
+ SET task_status = %s'''
+ + (''', ts_done = NOW()''' if status == 'done' else '')
+ + ''' WHERE type = %s AND object_id = %s''')
+ cursor.execute(req, (status, obj_type, obj_id))
+
+ @autocommit
+ def set_progress(self, obj_type, obj_id, progress, cursor=None):
+ cursor.execute('''
+ UPDATE vault_bundle
+ SET progress_msg = %s
+ WHERE type = %s AND object_id = %s''',
+ (progress, obj_type, obj_id))
+
+ @autocommit
+ def send_all_notifications(self, obj_type, obj_id, cursor=None):
+ cursor.execute('''
+ SELECT vault_notif_email.id AS id, email
+ FROM vault_notif_email
+ INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
+ WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
+ (obj_type, obj_id))
+ for row in cursor.fetchall():
+ self.send_notification(row['id'], row['email'], obj_type, obj_id)
+
+ @autocommit
+ def send_notification(self, n_id, email, obj_type, obj_id, cursor=None):
+ hex_id = hashutil.hash_to_hex(obj_id)
+ text = """
+ You have requested a bundle of type `{obj_type}` for the
+ object `{hex_id}` from the Software Heritage Archive.
+
+ The bundle you requested is now available for download at the
+ following address:
+
+ {url}
+
+ Please keep in mind that this link might expire at some point,
+ in which case you will need to request the bundle again.
+ """
+ text = text.format(obj_type=obj_type, hex_id=hex_id, url='URL_TODO')
+ text = textwrap.dedent(text)
+ text = textwrap.fill(text, 72)
+ msg = MIMEText(text)
+ msg['Subject'] = ("The `{obj_type}` bundle of `{hex_id}` is ready"
+ .format(obj_type=obj_type, hex_id=hex_id))
+ msg['From'] = 'vault@softwareheritage.org'
+ msg['To'] = email
+
+ self.smtp_server.send_message(msg)
+
+ if n_id is not None:
+ cursor.execute('''
+ DELETE FROM vault_notif_email
+ WHERE id = %s''', (n_id,))
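
A sketch of the cook-request path through the new backend, assuming a reachable vault database, cache and local SMTP server; the configuration values, e-mail address and object id below are placeholders, and the 'storage' entry merely assumes the layout used elsewhere in swh.

    from swh.vault.backend import VaultBackend

    config = {
        'vault_db': 'dbname=swh-vault',              # placeholder DSN
        'cache': {'root': '/tmp/vaultcache'},
        'storage': {'cls': 'remote',                 # assumed storage layout
                    'args': {'url': 'http://localhost:5002/'}},
    }
    backend = VaultBackend(config)
    obj_id = bytes.fromhex('00' * 20)                # placeholder object id
    # Record the request, schedule the cooking task and remember the address
    # to notify once the bundle is ready.
    backend.cook_request('directory', obj_id, email='user@example.org')
    if backend.is_available('directory', obj_id):    # True once cooking is done
        bundle = backend.fetch('directory', obj_id)
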
diff --git a/swh/vault/conf.yaml b/swh/vault/conf.yaml
deleted file mode 100644
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -14,6 +14,8 @@
from pathlib import Path
from swh.model import hashutil
+from swh.storage import get_storage
+from swh.vault.backend import VaultBackend
def get_tar_bytes(path, arcname=None):
@@ -41,13 +43,10 @@
To define a new cooker, inherit from this class and override:
- CACHE_TYPE_KEY: key to use for the bundle to reference in cache
- def cook(): cook the object into a bundle
- - def notify_bundle_ready(notif_data): notify the
- bundle is ready.
-
"""
CACHE_TYPE_KEY = None
- def __init__(self, storage, cache, obj_id):
+ def __init__(self, config, obj_type, obj_id):
"""Initialize the cooker.
The type of the object represented by the id depends on the
@@ -59,9 +58,10 @@
cache: the cache where to store the bundle
obj_id: id of the object to be cooked into a bundle.
"""
- self.storage = storage
- self.cache = cache
- self.obj_id = obj_id
+ self.storage = get_storage(**config['storage'])
+ self.backend = VaultBackend(config)
+ self.obj_type = obj_type
+ self.obj_id = hashutil.hash_to_bytes(obj_id)
@abc.abstractmethod
def prepare_bundle(self):
@@ -74,25 +74,23 @@
def cook(self):
"""Cook the requested object into a bundle
"""
+ self.backend.set_status(self.obj_type, self.obj_id, 'pending')
content_iter = self.prepare_bundle()
- # Cache the bundle
self.update_cache(content_iter)
- # Make a notification that the bundle have been cooked
- # NOT YET IMPLEMENTED see TODO in function.
- self.notify_bundle_ready(
- notif_data='Bundle %s ready' % hashutil.hash_to_hex(self.obj_id))
+ self.backend.set_status(self.obj_type, self.obj_id, 'done')
+
+ self.notify_bundle_ready()
def update_cache(self, content_iter):
"""Update the cache with id and bundle_content.
"""
- self.cache.add_stream(self.CACHE_TYPE_KEY, self.obj_id, content_iter)
+ self.backend.cache.add_stream(self.CACHE_TYPE_KEY,
+ self.obj_id, content_iter)
- def notify_bundle_ready(self, notif_data):
- # TODO plug this method with the notification method once
- # done.
- pass
+ def notify_bundle_ready(self):
+ self.backend.send_all_notifications(self.obj_type, self.obj_id)
class DirectoryBuilder:
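
As a sketch of the revised cooker interface, here is what a minimal subclass could look like; the base class name BaseVaultCooker and the example cache key are assumptions, since the class definition is outside this hunk.

    from swh.vault.cookers.base import BaseVaultCooker  # assumed base class name


    class TrivialCooker(BaseVaultCooker):                # hypothetical example
        CACHE_TYPE_KEY = 'trivial'                       # illustrative cache key

        def prepare_bundle(self):
            # Yield the bundle content as chunks of bytes; cook() in the base
            # class handles status updates, caching and notifications.
            yield b'placeholder bundle content'
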
diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/cooking_tasks.py
@@ -0,0 +1,18 @@
+# Copyright (C) 2016-2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.scheduler.task import Task
+from swh.vault.cookers import COOKER_TYPES
+
+
+class SWHCookingTask(Task):
+ """Main task which archives a contents batch.
+
+ """
+ task_queue = 'swh_storage_vault_cooking'
+
+ def run(self, config, obj_type, obj_id):
+ cooker = COOKER_TYPES[obj_type](config, obj_type, obj_id)
+ cooker.cook()
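
Finally, a sketch of how this task is expected to be dispatched, mirroring VaultBackend.create_task(); the configuration and object id are placeholders, and the task name follows the module path of the new file.

    import celery
    from swh.scheduler.utils import get_task

    config = {'vault_db': 'dbname=swh-vault',        # placeholder config
              'cache': {'root': '/tmp/vaultcache'}}
    task = get_task('swh.vault.cooking_tasks.SWHCookingTask')
    # The worker will run COOKER_TYPES['directory'](config, 'directory', <id>)
    # and call cook(); the id here is a placeholder hex string.
    task.apply_async([config, 'directory', '00' * 20], task_id=celery.uuid())
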