diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql
--- a/sql/swh-vault-schema.sql
+++ b/sql/swh-vault-schema.sql
@@ -31,12 +31,29 @@
   ts_last_access timestamptz not null default now(), -- last access
-  progress_msg text, -- progress message
-
-  unique(type, object_id)
+  progress_msg text -- progress message
 );
+create unique index concurrently vault_bundle_type_object
+  on vault_bundle (type, object_id);
+create index concurrently vault_bundle_task_id
+  on vault_bundle (task_id);
 
 create table vault_notif_email (
   id bigserial primary key,
 
   email text not null, -- e-mail to notify
   bundle_id bigint not null references vault_bundle(id)
 );
+create index concurrently vault_notif_email_bundle
+  on vault_notif_email (bundle_id);
+create index concurrently vault_notif_email_email
+  on vault_notif_email (email);
+
+create table vault_batch (
+  id bigserial primary key
+);
+
+create table vault_batch_bundle (
+  batch_id bigint not null references vault_batch(id),
+  bundle_id bigint not null references vault_bundle(id)
+);
+create unique index concurrently vault_batch_bundle_pkey
+  on vault_batch_bundle (batch_id, bundle_id);
diff --git a/sql/upgrades/002.sql b/sql/upgrades/002.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/002.sql
@@ -0,0 +1,28 @@
+-- SWH DB schema upgrade
+-- from_version: 001
+-- to_version: 002
+-- description: Add batches and various indexes
+
+insert into dbversion(version, release, description)
+  values(2, now(), 'Add batches and various indexes');
+
+create unique index concurrently if not exists vault_bundle_type_object
+  on vault_bundle (type, object_id);
+create index concurrently if not exists vault_bundle_task_id
+  on vault_bundle (task_id);
+
+create index concurrently if not exists vault_notif_email_bundle
+  on vault_notif_email (bundle_id);
+create index concurrently if not exists vault_notif_email_email
+  on vault_notif_email (email);
+
+create table if not exists vault_batch (
+  id bigserial primary key
+);
+
+create table if not exists vault_batch_bundle (
+  batch_id bigint not null references vault_batch(id),
+  bundle_id bigint not null references vault_bundle(id)
+);
+create unique index concurrently if not exists vault_batch_bundle_pkey
+  on vault_batch_bundle (batch_id, bundle_id);
diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -58,3 +58,11 @@
         hex_id = hashutil.hash_to_hex(obj_id)
         return self.post('send_notif/{}/{}'
                          .format(obj_type, hex_id), data=None)
+
+    # Batch endpoints
+
+    def batch_cook(self, batch):
+        return self.post('batch_cook', data=batch)
+
+    def batch_progress(self, batch_id):
+        return self.get('batch_progress/{}'.format(batch_id))
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -3,9 +3,10 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import asyncio
 import aiohttp.web
+import asyncio
 import click
+import collections
 
 from swh.core import config
 from swh.core.api_async import (SWHRemoteAPI,
@@ -141,6 +142,32 @@
     return encode_data(True)  # FIXME: success value?
 
 
+# Batch endpoints
+
+@asyncio.coroutine
+def batch_cook(request):
+    batch = yield from decode_request(request)
+    for obj_type, obj_id in batch:
+        if obj_type not in COOKER_TYPES:
+            raise aiohttp.web.HTTPNotFound
+    batch_id = request.app['backend'].batch_cook(batch)
+    return encode_data({'id': batch_id})
+
+
+@asyncio.coroutine
+def batch_progress(request):
+    batch_id = request.match_info['batch_id']
+    bundles = request.app['backend'].batch_info(batch_id)
+    if not bundles:
+        raise aiohttp.web.HTTPNotFound
+    bundles = [user_info(bundle) for bundle in bundles]
+    counter = collections.Counter(b['status'] for b in bundles)
+    res = {'bundles': bundles, 'total': len(bundles),
+           **{k: 0 for k in ('new', 'pending', 'done', 'failed')},
+           **dict(counter)}
+    return encode_data(res)
+
+
 # Web server
 
 def make_app(config, **kwargs):
@@ -161,6 +188,10 @@
     app.router.add_route('POST', '/put_bundle/{type}/{id}', put_bundle)
     app.router.add_route('POST', '/send_notif/{type}/{id}', send_notif)
 
+    # Endpoints for batch requests
+    app.router.add_route('POST', '/batch_cook', batch_cook)
+    app.router.add_route('GET', '/batch_progress/{batch_id}', batch_progress)
+
     app['backend'] = VaultBackend(config)
     return app
 
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -3,9 +3,9 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import smtplib
 import psycopg2
 import psycopg2.extras
+import smtplib
 
 from functools import wraps
 from email.mime.text import MIMEText
@@ -65,6 +65,11 @@
     pass
 
 
+def batch_to_bytes(batch):
+    return [(obj_type, hashutil.hash_to_bytes(obj_id))
+            for obj_type, obj_id in batch]
+
+
 # TODO: Imported from swh.scheduler.backend. Factorization needed.
 def autocommit(fn):
     @wraps(fn)
@@ -236,6 +241,42 @@
         return info
 
     @autocommit
+    def batch_cook(self, batch, cursor=None):
+        """Cook a batch of bundles and return the cooking id."""
+        cursor.execute('''INSERT INTO vault_batch (id) VALUES (DEFAULT)
+                          RETURNING id''')
+        batch_id = cursor.fetchone()['id']
+        batch = batch_to_bytes(batch)
+
+        # Ideally, if we start to do a lot of batch inserts and performance
+        # becomes an issue, we should be able to rewrite all the following
+        # function calls to work on batches. It requires a significant amount
+        # of work (using UPDATE FROM to update task_id, using DELETE with tuple
+        # unpacking, etc), so we're doing a simple loop in the meantime.
+        for obj_type, obj_id in batch:
+            info = self.cook_request(obj_type, obj_id)
+            cursor.execute('''INSERT INTO vault_batch_bundle
+                                  (batch_id, bundle_id)
+                              VALUES (%s, %s)''', (batch_id, info['id']))
+        return batch_id
+
+    @autocommit
+    def batch_info(self, batch_id, cursor=None):
+        """Fetch information about a batch of bundles"""
+        cursor.execute('''
+            SELECT vault_bundle.id as id,
+                   type, object_id, task_id, task_status, sticky,
+                   ts_created, ts_done, ts_last_access, progress_msg
+            FROM vault_batch_bundle
+            LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id
+            WHERE batch_id = %s''', (batch_id,))
+        res = cursor.fetchall()
+        if res:
+            for d in res:
+                d['object_id'] = bytes(d['object_id'])
+        return res
+
+    @autocommit
     def is_available(self, obj_type, obj_id, cursor=None):
         """Check whether a bundle is available for retrieval"""
         info = self.task_info(obj_type, obj_id, cursor=cursor)
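
For context, here is a minimal usage sketch of the batch API this patch introduces, driven through the new client methods. It assumes the client class in swh/vault/api/client.py is constructed with the vault server's base URL and that cooking types and hex object ids are passed as for the existing single-bundle endpoints; the class name, URL, cooking types and ids below are illustrative, not taken from this patch.

# Minimal usage sketch (not part of the patch). The client class name
# `RemoteVaultClient`, the server URL, the cooking types and the hex object
# ids are illustrative assumptions; only batch_cook() and batch_progress()
# come from the diff above.
from swh.vault.api.client import RemoteVaultClient

client = RemoteVaultClient('http://localhost:5005/')

# Submit several cooking requests in one call; the batch_cook endpoint
# answers with {'id': <batch_id>}.
batch = [
    ('directory', '17d2b26bddee2b0c3f36c0fd1e5d7e5bdc09f029'),
    ('revision_gitfast', '4a4b9771542143cf070386f86b4b92d42966bdbc'),
]
batch_id = client.batch_cook(batch)['id']

# Poll the batch: the reply carries the bundle list, the total, and one
# counter per status ('new', 'pending', 'done', 'failed').
progress = client.batch_progress(batch_id)
print('{done}/{total} bundles done'.format(**progress))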