diff --git a/PKG-INFO b/PKG-INFO
index 645fbef..367a189 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.0
Name: swh.vault
-Version: 0.0.12
+Version: 0.0.13
Summary: Software Heritage vault
Home-page: https://forge.softwareheritage.org/diffusion/DVAU/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/docs/getting-started.rst b/docs/getting-started.rst
index 9239d96..49d7760 100644
--- a/docs/getting-started.rst
+++ b/docs/getting-started.rst
@@ -1,62 +1,62 @@
.. _vault-primer:

Getting started
===============

The Vault is a service in charge of reconstructing parts of the archive as
self-contained bundles that can then be imported locally, for instance in a
Git repository. This is basically where you can do a ``git clone`` of a
repository stored in Software Heritage.

The Vault is asynchronous: you first make a request to prepare the bundle you
need, then a second request to fetch the bundle once the Vault has finished
reconstituting it.

Example: retrieving a directory
-------------------------------

First, ask the Vault to prepare your bundle:

.. code:: shell

-    curl -X POST https://archive.softwareheritage.org/1/vault/directory/:dir_id/
+    curl -X POST https://archive.softwareheritage.org/api/1/vault/directory/:dir_id/

where ``:dir_id`` is a :py:func:`directory identifier`.

This initial request and all subsequent requests to this endpoint will return
some JSON data containing information about the progress of bundle creation:

.. code:: json

    {
        "id": 42,
        "fetch_url": "/api/1/vault/directory/:dir_id/raw/",
        "obj_id": ":dir_id",
        "obj_type": "directory",
        "progress_message": "Creating tarball...",
        "status": "pending"
    }

Once the status is ``done``, you can fetch the bundle at the address given in
the ``fetch_url`` field.

.. code:: shell

-    curl -o bundle.tar.gz https://archive.softwareheritage.org/1/vault/directory/:dir_id/raw
+    curl -o bundle.tar.gz https://archive.softwareheritage.org/api/1/vault/directory/:dir_id/raw
    tar xaf bundle.tar.gz

E-mail notifications
--------------------

You can also ask to be notified by e-mail once the bundle you requested is
ready, by giving an ``email`` POST parameter:

.. code:: shell

    curl -X POST -d 'email=example@example.com' \
-        https://archive.softwareheritage.org/1/vault/directory/:dir_id/
+        https://archive.softwareheritage.org/api/1/vault/directory/:dir_id/

API reference
~~~~~~~~~~~~~

For a more exhaustive overview of the Vault API, see the :ref:`vault-api-ref`.
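The cook/poll/fetch cycle documented above can be scripted. A minimal polling
sketch, assuming only the ``requests`` library and the endpoints shown in the
documentation hunk (the directory id, sleep interval and lack of error
handling are illustrative):

.. code:: python

    import time

    import requests

    API = 'https://archive.softwareheritage.org/api/1'
    dir_id = '0123456789abcdef0123456789abcdef01234567'  # hypothetical id

    # Ask the Vault to prepare (cook) the bundle.
    progress = requests.post('{}/vault/directory/{}/'.format(API, dir_id)).json()

    # Poll the same endpoint until cooking is done (or has failed).
    while progress['status'] not in ('done', 'failed'):
        time.sleep(10)
        progress = requests.get('{}/vault/directory/{}/'.format(API, dir_id)).json()

    if progress['status'] == 'done':
        # fetch_url is relative to the archive root, as in the JSON example above.
        bundle = requests.get('https://archive.softwareheritage.org'
                              + progress['fetch_url'])
        with open('bundle.tar.gz', 'wb') as f:
            f.write(bundle.content)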
diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql
index 3f5640e..8e874e7 100644
--- a/sql/swh-vault-schema.sql
+++ b/sql/swh-vault-schema.sql
@@ -1,42 +1,59 @@
create table dbversion (
    version     int primary key,
    release     timestamptz not null,
    description text not null
);
comment on table dbversion is 'Schema update tracking';
insert into dbversion (version, release, description)
    values (1, now(), 'Initial version');

create domain obj_hash as bytea;

create type cook_type as enum ('directory', 'revision_gitfast');
comment on type cook_type is 'Type of the requested bundle';

create type cook_status as enum ('new', 'pending', 'done', 'failed');
comment on type cook_status is 'Status of the cooking';

create table vault_bundle (
    id bigserial primary key,
    type cook_type not null,          -- requested cooking type
    object_id obj_hash not null,      -- requested object ID
    task_id integer,                  -- scheduler task id
    task_status cook_status not null default 'new',      -- status of the task
    sticky boolean not null default false,                -- bundle cannot expire
    ts_created timestamptz not null default now(),        -- timestamp of creation
    ts_done timestamptz,                                  -- timestamp of the cooking result
    ts_last_access timestamptz not null default now(),    -- last access
    progress_msg text,                -- progress message
-
-    unique(type, object_id)
);
+create unique index concurrently vault_bundle_type_object
+  on vault_bundle (type, object_id);
+create index concurrently vault_bundle_task_id
+  on vault_bundle (task_id);

create table vault_notif_email (
    id bigserial primary key,
    email text not null,              -- e-mail to notify
    bundle_id bigint not null references vault_bundle(id)
);
+create index concurrently vault_notif_email_bundle
+  on vault_notif_email (bundle_id);
+create index concurrently vault_notif_email_email
+  on vault_notif_email (email);
+
+create table vault_batch (
+  id bigserial primary key
+);
+
+create table vault_batch_bundle (
+  batch_id bigint not null references vault_batch(id),
+  bundle_id bigint not null references vault_bundle(id)
+);
+create unique index concurrently vault_batch_bundle_pkey
+  on vault_batch_bundle (batch_id, bundle_id);
diff --git a/sql/upgrades/002.sql b/sql/upgrades/002.sql
new file mode 100644
index 0000000..42c1da8
--- /dev/null
+++ b/sql/upgrades/002.sql
@@ -0,0 +1,28 @@
+-- SWH DB schema upgrade
+-- from_version: 001
+-- to_version: 002
+-- description: Add batches and various indexes
+
+insert into dbversion(version, release, description)
+    values(2, now(), 'Add batches and various indexes');
+
+create unique index concurrently if not exists vault_bundle_type_object
+  on vault_bundle (type, object_id);
+create index concurrently if not exists vault_bundle_task_id
+  on vault_bundle (task_id);
+
+create index concurrently if not exists vault_notif_email_bundle
+  on vault_notif_email (bundle_id);
+create index concurrently if not exists vault_notif_email_email
+  on vault_notif_email (email);
+
+create table if not exists vault_batch (
+  id bigserial primary key
+);
+
+create table if not exists vault_batch_bundle (
+  batch_id bigint not null references vault_batch(id),
+  bundle_id bigint not null references vault_bundle(id)
+);
+create unique index concurrently if not exists vault_batch_bundle_pkey
+  on vault_batch_bundle (batch_id, bundle_id);
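The two new tables link one ``vault_batch`` row to many ``vault_bundle`` rows
through ``vault_batch_bundle``. A minimal sketch of the per-batch status
aggregation this layout enables, assuming a local
``softwareheritage-vault-dev`` database and the ``psycopg2`` driver (the
connection string and batch id are illustrative):

.. code:: python

    import psycopg2

    db = psycopg2.connect('dbname=softwareheritage-vault-dev')
    cur = db.cursor()

    batch_id = 1  # hypothetical batch

    # Count the bundles of a batch per cooking status, via the new join table.
    cur.execute('''
        SELECT task_status, count(*)
          FROM vault_batch_bundle
          JOIN vault_bundle ON vault_bundle.id = bundle_id
         WHERE batch_id = %s
         GROUP BY task_status''', (batch_id,))
    print(dict(cur.fetchall()))  # e.g. {'done': 3, 'pending': 1}
    db.close()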
diff --git a/swh.vault.egg-info/PKG-INFO b/swh.vault.egg-info/PKG-INFO
index 645fbef..367a189 100644
--- a/swh.vault.egg-info/PKG-INFO
+++ b/swh.vault.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.0
Name: swh.vault
-Version: 0.0.12
+Version: 0.0.13
Summary: Software Heritage vault
Home-page: https://forge.softwareheritage.org/diffusion/DVAU/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/swh.vault.egg-info/SOURCES.txt b/swh.vault.egg-info/SOURCES.txt
index df4a40e..357f62a 100644
--- a/swh.vault.egg-info/SOURCES.txt
+++ b/swh.vault.egg-info/SOURCES.txt
@@ -1,49 +1,50 @@
.gitignore
AUTHORS
LICENSE
MANIFEST.in
Makefile
requirements-swh.txt
requirements.txt
setup.py
version.txt
debian/changelog
debian/compat
debian/control
debian/copyright
debian/rules
debian/source/format
docs/.gitignore
docs/Makefile
docs/api.rst
docs/conf.py
docs/getting-started.rst
docs/index.rst
docs/_static/.placeholder
docs/_templates/.placeholder
sql/swh-vault-schema.sql
+sql/upgrades/002.sql
swh/__init__.py
swh.vault.egg-info/PKG-INFO
swh.vault.egg-info/SOURCES.txt
swh.vault.egg-info/dependency_links.txt
swh.vault.egg-info/not-zip-safe
swh.vault.egg-info/requires.txt
swh.vault.egg-info/top_level.txt
swh/vault/__init__.py
swh/vault/backend.py
swh/vault/cache.py
swh/vault/cooking_tasks.py
swh/vault/to_disk.py
swh/vault/api/__init__.py
swh/vault/api/client.py
swh/vault/api/server.py
swh/vault/cookers/__init__.py
swh/vault/cookers/base.py
swh/vault/cookers/directory.py
swh/vault/cookers/revision_flat.py
swh/vault/cookers/revision_gitfast.py
swh/vault/tests/test_backend.py
swh/vault/tests/test_cache.py
swh/vault/tests/test_cookers.py
swh/vault/tests/test_cookers_base.py
swh/vault/tests/vault_testing.py
\ No newline at end of file
diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
index db63db9..138df1d 100644
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -1,60 +1,68 @@
# Copyright (C) 2016-2017  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from swh.model import hashutil
from swh.core.api import SWHRemoteAPI


class VaultAPIError(Exception):
    """Vault API Error"""
    def __str__(self):
        return ('An unexpected error occurred in the Vault backend: {}'
                .format(self.args))


class RemoteVaultClient(SWHRemoteAPI):
    """Client to the Software Heritage vault cache."""

    def __init__(self, base_url):
        super().__init__(api_exception=VaultAPIError, url=base_url)

    # Web API endpoints

    def fetch(self, obj_type, obj_id):
        hex_id = hashutil.hash_to_hex(obj_id)
        return self.get('fetch/{}/{}'.format(obj_type, hex_id))

    def cook(self, obj_type, obj_id, email=None):
        hex_id = hashutil.hash_to_hex(obj_id)
        return self.post('cook/{}/{}'.format(obj_type, hex_id), data={},
                         params=({'email': email} if email else None))

    def progress(self, obj_type, obj_id):
        hex_id = hashutil.hash_to_hex(obj_id)
        return self.get('progress/{}/{}'.format(obj_type, hex_id))

    # Cookers endpoints

    def set_progress(self, obj_type, obj_id, progress):
        hex_id = hashutil.hash_to_hex(obj_id)
        return self.post('set_progress/{}/{}'.format(obj_type, hex_id),
                         data=progress)

    def set_status(self, obj_type, obj_id, status):
        hex_id = hashutil.hash_to_hex(obj_id)
        return self.post('set_status/{}/{}'.format(obj_type, hex_id),
                         data=status)

    # TODO: handle streaming properly
    def put_bundle(self, obj_type, obj_id, bundle):
        hex_id = hashutil.hash_to_hex(obj_id)
        return self.post('put_bundle/{}/{}'.format(obj_type, hex_id),
                         data=bundle)

    def send_notif(self, obj_type, obj_id):
        hex_id = hashutil.hash_to_hex(obj_id)
        return self.post('send_notif/{}/{}'.format(obj_type, hex_id),
                         data=None)
+
+    # Batch endpoints
+
+    def batch_cook(self, batch):
+        return self.post('batch_cook', data=batch)
+
+    def batch_progress(self, batch_id):
+        return self.get('batch_progress/{}'.format(batch_id))
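The two new client methods mirror the server's batch endpoints added below. A
minimal usage sketch, assuming a vault API server reachable on localhost (the
URL, port and object ids are illustrative):

.. code:: python

    from swh.vault.api.client import RemoteVaultClient

    client = RemoteVaultClient('http://localhost:5005/')

    # A batch is a list of (object type, object id) pairs.
    batch = [
        ('directory', '0123456789abcdef0123456789abcdef01234567'),
        ('revision_gitfast', '89abcdef0123456789abcdef0123456789abcdef'),
    ]

    batch_id = client.batch_cook(batch)['id']

    # Later: per-status counters plus one entry per bundle.
    print(client.batch_progress(batch_id))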
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
index e2c419a..78684f4 100644
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -1,186 +1,217 @@
# Copyright (C) 2016  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

-import asyncio
import aiohttp.web
+import asyncio
import click
+import collections

from swh.core import config
from swh.core.api_async import (SWHRemoteAPI,
                                encode_data_server as encode_data,
                                decode_request)
from swh.model import hashutil
from swh.vault.cookers import COOKER_TYPES
from swh.vault.backend import VaultBackend, NotFoundExc

DEFAULT_CONFIG_PATH = 'vault/server'
DEFAULT_CONFIG = {
    'storage': ('dict', {
        'cls': 'local',
        'args': {
            'db': 'dbname=softwareheritage-dev',
            'objstorage': {
                'cls': 'pathslicing',
                'args': {
                    'root': '/srv/softwareheritage/objects',
                    'slicing': '0:2/2:4/4:6',
                },
            },
        },
    }),
    'cache': ('dict', {
        'cls': 'pathslicing',
        'args': {
            'root': '/srv/softwareheritage/vault',
            'slicing': '0:1/1:5',
        },
    }),
    'client_max_size': ('int', 1024 ** 3),
    'db': ('str', 'dbname=softwareheritage-vault-dev'),
    'scheduling_db': ('str', 'dbname=softwareheritage-scheduler-dev'),
}


@asyncio.coroutine
def index(request):
    return aiohttp.web.Response(body="SWH Vault API server")


# Web API endpoints

@asyncio.coroutine
def vault_fetch(request):
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']

    if not request.app['backend'].is_available(obj_type, obj_id):
        raise aiohttp.web.HTTPNotFound

    return encode_data(request.app['backend'].fetch(obj_type, obj_id))


def user_info(task_info):
    return {'id': task_info['id'],
            'status': task_info['task_status'],
            'progress_message': task_info['progress_msg'],
            'obj_type': task_info['type'],
            'obj_id': hashutil.hash_to_hex(task_info['object_id'])}


@asyncio.coroutine
def vault_cook(request):
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']
    email = request.query.get('email')
    sticky = request.query.get('sticky') in ('true', '1')

    if obj_type not in COOKER_TYPES:
        raise aiohttp.web.HTTPNotFound

    try:
        info = request.app['backend'].cook_request(obj_type, obj_id,
                                                   email=email, sticky=sticky)
    except NotFoundExc:
        raise aiohttp.web.HTTPNotFound

    # TODO: return 201 status (Created) once the api supports it
    return encode_data(user_info(info))


@asyncio.coroutine
def vault_progress(request):
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']

    info = request.app['backend'].task_info(obj_type, obj_id)
    if not info:
        raise aiohttp.web.HTTPNotFound

    return encode_data(user_info(info))


# Cookers endpoints

@asyncio.coroutine
def set_progress(request):
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']
    progress = yield from decode_request(request)
    request.app['backend'].set_progress(obj_type, obj_id, progress)
    return encode_data(True)  # FIXME: success value?


@asyncio.coroutine
def set_status(request):
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']
    status = yield from decode_request(request)
    request.app['backend'].set_status(obj_type, obj_id, status)
    return encode_data(True)  # FIXME: success value?
@asyncio.coroutine
def put_bundle(request):
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']

    # TODO: handle streaming properly
    content = yield from decode_request(request)
    request.app['backend'].cache.add(obj_type, obj_id, content)
    return encode_data(True)  # FIXME: success value?


@asyncio.coroutine
def send_notif(request):
    obj_type = request.match_info['type']
    obj_id = request.match_info['id']
    request.app['backend'].send_all_notifications(obj_type, obj_id)
    return encode_data(True)  # FIXME: success value?


+# Batch endpoints
+
+@asyncio.coroutine
+def batch_cook(request):
+    batch = yield from decode_request(request)
+    for obj_type, obj_id in batch:
+        if obj_type not in COOKER_TYPES:
+            raise aiohttp.web.HTTPNotFound
+    batch_id = request.app['backend'].batch_cook(batch)
+    return encode_data({'id': batch_id})
+
+
+@asyncio.coroutine
+def batch_progress(request):
+    batch_id = request.match_info['batch_id']
+    bundles = request.app['backend'].batch_info(batch_id)
+    if not bundles:
+        raise aiohttp.web.HTTPNotFound
+    bundles = [user_info(bundle) for bundle in bundles]
+    counter = collections.Counter(b['status'] for b in bundles)
+    res = {'bundles': bundles, 'total': len(bundles),
+           **{k: 0 for k in ('new', 'pending', 'done', 'failed')},
+           **dict(counter)}
+    return encode_data(res)
+
+
# Web server

def make_app(config, **kwargs):
    if 'client_max_size' in config:
        kwargs['client_max_size'] = config['client_max_size']

    app = SWHRemoteAPI(**kwargs)
    app.router.add_route('GET', '/', index)

    # Endpoints used by the web API
    app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch)
    app.router.add_route('POST', '/cook/{type}/{id}', vault_cook)
    app.router.add_route('GET', '/progress/{type}/{id}', vault_progress)

    # Endpoints used by the Cookers
    app.router.add_route('POST', '/set_progress/{type}/{id}', set_progress)
    app.router.add_route('POST', '/set_status/{type}/{id}', set_status)
    app.router.add_route('POST', '/put_bundle/{type}/{id}', put_bundle)
    app.router.add_route('POST', '/send_notif/{type}/{id}', send_notif)

+    # Endpoints for batch requests
+    app.router.add_route('POST', '/batch_cook', batch_cook)
+    app.router.add_route('GET', '/batch_progress/{batch_id}', batch_progress)
+
    app['backend'] = VaultBackend(config)
    return app


def make_app_from_configfile(config_path=DEFAULT_CONFIG_PATH, **kwargs):
    cfg = config.load_named_config(config_path, DEFAULT_CONFIG)
    return make_app(cfg, **kwargs)


@click.command()
@click.argument('config-path', required=1)
@click.option('--host', default='0.0.0.0', help="Host to run the server")
@click.option('--port', default=5005, type=click.INT,
              help="Binding port of the server")
@click.option('--debug/--nodebug', default=True,
              help="Indicates if the server should run in debug mode")
def launch(config_path, host, port, debug):
    app = make_app(config.read(config_path, DEFAULT_CONFIG),
                   debug=bool(debug))
    aiohttp.web.run_app(app, host=host, port=int(port))


if __name__ == '__main__':
    launch()
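For reference, the aggregation done by ``batch_progress`` above merges fixed
zero counters with a ``collections.Counter`` of the bundle statuses. A
standalone sketch of the resulting payload shape (the bundle entries are made
up):

.. code:: python

    import collections

    bundles = [
        {'id': 1, 'status': 'done', 'obj_type': 'directory', 'obj_id': 'aa...'},
        {'id': 2, 'status': 'pending', 'obj_type': 'directory', 'obj_id': 'bb...'},
    ]
    counter = collections.Counter(b['status'] for b in bundles)
    res = {'bundles': bundles, 'total': len(bundles),
           **{k: 0 for k in ('new', 'pending', 'done', 'failed')},
           **dict(counter)}
    print(res['total'], res['done'], res['pending'], res['new'], res['failed'])
    # 2 1 1 0 0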
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
index 09f0768..4a0a041 100644
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -1,385 +1,426 @@
# Copyright (C) 2017  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

-import smtplib
import psycopg2
import psycopg2.extras
+import smtplib

from functools import wraps
from email.mime.text import MIMEText

from swh.model import hashutil
from swh.scheduler.backend import SchedulerBackend
from swh.scheduler.utils import create_oneshot_task_dict
from swh.vault.cache import VaultCache
from swh.vault.cookers import get_cooker
from swh.vault.cooking_tasks import SWHCookingTask  # noqa

cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask'

NOTIF_EMAIL_FROM = ('"Software Heritage Vault" '
                    '')
NOTIF_EMAIL_SUBJECT_SUCCESS = ("Bundle ready: {obj_type} {short_id}")
NOTIF_EMAIL_SUBJECT_FAILURE = ("Bundle failed: {obj_type} {short_id}")

NOTIF_EMAIL_BODY_SUCCESS = """
You have requested the following bundle from the Software Heritage
Vault:

Object Type: {obj_type}
Object ID: {hex_id}

This bundle is now available for download at the following address:

{url}

Please keep in mind that this link might expire at some point, in which
case you will need to request the bundle again.

--\x20
The Software Heritage Developers
"""

NOTIF_EMAIL_BODY_FAILURE = """
You have requested the following bundle from the Software Heritage
Vault:

Object Type: {obj_type}
Object ID: {hex_id}

This bundle could not be cooked for the following reason:

{progress_msg}

We apologize for the inconvenience.

--\x20
The Software Heritage Developers
"""


class NotFoundExc(Exception):
    """Bundle was not found."""
    pass


+def batch_to_bytes(batch):
+    return [(obj_type, hashutil.hash_to_bytes(obj_id))
+            for obj_type, obj_id in batch]
+
+
# TODO: Imported from swh.scheduler.backend. Factorization needed.
def autocommit(fn):
    @wraps(fn)
    def wrapped(self, *args, **kwargs):
        autocommit = False
        # TODO: I don't like using None, it's confusing for the user. how about
        # a NEW_CURSOR object()?
        if 'cursor' not in kwargs or not kwargs['cursor']:
            autocommit = True
            kwargs['cursor'] = self.cursor()

        try:
            ret = fn(self, *args, **kwargs)
        except:
            if autocommit:
                self.rollback()
            raise

        if autocommit:
            self.commit()

        return ret

    return wrapped


# TODO: This has to be factorized with other database base classes and helpers
# (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...)
# The three first methods are imported from swh.scheduler.backend.
class VaultBackend:
    """
    Backend for the Software Heritage vault.
    """
    def __init__(self, config):
        self.config = config
        self.cache = VaultCache(self.config['cache'])
        self.db = None
        self.reconnect()
        self.smtp_server = smtplib.SMTP('localhost', 25)
        if self.config['scheduling_db'] is not None:
            self.scheduler = SchedulerBackend(
                scheduling_db=self.config['scheduling_db'])

    def reconnect(self):
        """Reconnect to the database."""
        if not self.db or self.db.closed:
            self.db = psycopg2.connect(
                dsn=self.config['db'],
                cursor_factory=psycopg2.extras.RealDictCursor,
            )

    def close(self):
        """Close the underlying database connection."""
        self.db.close()

    def cursor(self):
        """Return a fresh cursor on the database, with auto-reconnection in
        case of failure"""
        cur = None

        # Get a fresh cursor and reconnect at most three times
        tries = 0
        while True:
            tries += 1
            try:
                cur = self.db.cursor()
                cur.execute('select 1')
                break
            except psycopg2.OperationalError:
                if tries < 3:
                    self.reconnect()
                else:
                    raise
        return cur

    def commit(self):
        """Commit a transaction"""
        self.db.commit()

    def rollback(self):
        """Rollback a transaction"""
        self.db.rollback()

    @autocommit
    def task_info(self, obj_type, obj_id, cursor=None):
        """Fetch information from a bundle"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        cursor.execute('''
            SELECT id, type, object_id, task_id, task_status, sticky,
                   ts_created, ts_done, ts_last_access, progress_msg
            FROM vault_bundle
            WHERE type = %s AND object_id = %s''', (obj_type, obj_id))
        res = cursor.fetchone()
        if res:
            res['object_id'] = bytes(res['object_id'])
        return res

    def _send_task(self, args):
        """Send a cooking task to the celery scheduler"""
        task = create_oneshot_task_dict('swh-vault-cooking', *args)
        added_tasks = self.scheduler.create_tasks([task])
        return added_tasks[0]['id']

    @autocommit
    def create_task(self, obj_type, obj_id, sticky=False, cursor=None):
        """Create and send a cooking task"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        hex_id = hashutil.hash_to_hex(obj_id)
        args = [obj_type, hex_id]

        backend_storage_config = {'storage': self.config['storage']}
        cooker_class = get_cooker(obj_type)
        cooker = cooker_class(*args, override_cfg=backend_storage_config)
        if not cooker.check_exists():
            raise NotFoundExc("Object {} was not found.".format(hex_id))

        cursor.execute('''
            INSERT INTO vault_bundle (type, object_id, sticky)
            VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky))
        self.commit()
        task_id = self._send_task(args)
        cursor.execute('''
            UPDATE vault_bundle
            SET task_id = %s
            WHERE type = %s AND object_id = %s''',
                       (task_id, obj_type, obj_id))

    @autocommit
    def add_notif_email(self, obj_type, obj_id, email, cursor=None):
        """Add an e-mail address to notify when a given bundle is ready"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        cursor.execute('''
            INSERT INTO vault_notif_email (email, bundle_id)
            VALUES (%s, (SELECT id FROM vault_bundle
                         WHERE type = %s AND object_id = %s))''',
                       (email, obj_type, obj_id))

    @autocommit
    def cook_request(self, obj_type, obj_id, *, sticky=False, email=None,
                     cursor=None):
        """Main entry point for cooking requests. This starts a cooking task
        if needed, and add the given e-mail to the notify list"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        info = self.task_info(obj_type, obj_id)

        # If there's a failed bundle entry, delete it first.
        if info is not None and info['task_status'] == 'failed':
            cursor.execute('''DELETE FROM vault_bundle
                              WHERE type = %s AND object_id = %s''',
                           (obj_type, obj_id))
            self.commit()
            info = None

        # If there's no bundle entry, create the task.
        if info is None:
            self.create_task(obj_type, obj_id, sticky)

        if email is not None:
            # If the task is already done, send the email directly
            if info is not None and info['task_status'] == 'done':
                self.send_notification(None, email, obj_type, obj_id,
                                       info['task_status'])
            # Else, add it to the notification queue
            else:
                self.add_notif_email(obj_type, obj_id, email)

        info = self.task_info(obj_type, obj_id)
        return info

+    @autocommit
+    def batch_cook(self, batch, cursor=None):
+        """Cook a batch of bundles and returns the cooking id."""
+        cursor.execute('''INSERT INTO vault_batch (id) VALUES (DEFAULT)
+                          RETURNING id''')
+        batch_id = cursor.fetchone()['id']
+        batch = batch_to_bytes(batch)
+
+        # Ideally, if we start to do a lot of batch inserts and performance
+        # becomes an issue, we should be able to rewrite all the following
+        # function calls to work on batches. It requires a significant amount
+        # of work (using UPDATE FROM to update task_id, using DELETE with
+        # tuple unpacking, etc), so we're doing a simple loop in the meantime.
+        for obj_type, obj_id in batch:
+            info = self.cook_request(obj_type, obj_id)
+            cursor.execute('''INSERT INTO vault_batch_bundle
+                              (batch_id, bundle_id) VALUES (%s, %s)''',
+                           (batch_id, info['id']))
+        return batch_id
+
+    @autocommit
+    def batch_info(self, batch_id, cursor=None):
+        """Fetch information from a batch of bundles"""
+        cursor.execute('''
+            SELECT vault_bundle.id as id,
+                   type, object_id, task_id, task_status, sticky,
+                   ts_created, ts_done, ts_last_access, progress_msg
+            FROM vault_batch_bundle
+            LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id
+            WHERE batch_id = %s''', (batch_id,))
+        res = cursor.fetchall()
+        if res:
+            for d in res:
+                d['object_id'] = bytes(d['object_id'])
+        return res
+
    @autocommit
    def is_available(self, obj_type, obj_id, cursor=None):
        """Check whether a bundle is available for retrieval"""
        info = self.task_info(obj_type, obj_id, cursor=cursor)
        return (info is not None
                and info['task_status'] == 'done'
                and self.cache.is_cached(obj_type, obj_id))

    @autocommit
    def fetch(self, obj_type, obj_id, cursor=None):
        """Retrieve a bundle from the cache"""
        if not self.is_available(obj_type, obj_id, cursor=cursor):
            return None
        self.update_access_ts(obj_type, obj_id, cursor=cursor)
        return self.cache.get(obj_type, obj_id)

    @autocommit
    def update_access_ts(self, obj_type, obj_id, cursor=None):
        """Update the last access timestamp of a bundle"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        cursor.execute('''
            UPDATE vault_bundle
            SET ts_last_access = NOW()
            WHERE type = %s AND object_id = %s''',
                       (obj_type, obj_id))

    @autocommit
    def set_status(self, obj_type, obj_id, status, cursor=None):
        """Set the cooking status of a bundle"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        req = ('''
               UPDATE vault_bundle
               SET task_status = %s '''
               + (''', ts_done = NOW() ''' if status == 'done' else '')
               + '''WHERE type = %s AND object_id = %s''')
        cursor.execute(req, (status, obj_type, obj_id))

    @autocommit
    def set_progress(self, obj_type, obj_id, progress, cursor=None):
        """Set the cooking progress of a bundle"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        cursor.execute('''
            UPDATE vault_bundle
            SET progress_msg = %s
            WHERE type = %s AND object_id = %s''',
                       (progress, obj_type, obj_id))

    @autocommit
    def send_all_notifications(self, obj_type, obj_id, cursor=None):
        """Send all the e-mails in the notification list of a bundle"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        cursor.execute('''
            SELECT vault_notif_email.id AS id, email, task_status, progress_msg
            FROM vault_notif_email
            INNER JOIN vault_bundle ON
                bundle_id = vault_bundle.id
            WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
                       (obj_type, obj_id))
        for d in cursor:
            self.send_notification(d['id'], d['email'], obj_type, obj_id,
                                   status=d['task_status'],
                                   progress_msg=d['progress_msg'])

    @autocommit
    def send_notification(self, n_id, email, obj_type, obj_id, status,
                          progress_msg=None, cursor=None):
        """Send the notification of a bundle to a specific e-mail"""
        hex_id = hashutil.hash_to_hex(obj_id)
        short_id = hex_id[:7]

        # TODO: instead of hardcoding this, we should probably:
        # * add a "fetch_url" field in the vault_notif_email table
        # * generate the url with flask.url_for() on the web-ui side
        # * send this url as part of the cook request and store it in
        #   the table
        # * use this url for the notification e-mail
        url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/'
               'raw'.format(obj_type, hex_id))

        if status == 'done':
            text = NOTIF_EMAIL_BODY_SUCCESS.strip()
            text = text.format(obj_type=obj_type, hex_id=hex_id, url=url)
            msg = MIMEText(text)
            msg['Subject'] = (NOTIF_EMAIL_SUBJECT_SUCCESS
                              .format(obj_type=obj_type, short_id=short_id))
        elif status == 'failed':
            text = NOTIF_EMAIL_BODY_FAILURE.strip()
            text = text.format(obj_type=obj_type, hex_id=hex_id,
                               progress_msg=progress_msg)
            msg = MIMEText(text)
            msg['Subject'] = (NOTIF_EMAIL_SUBJECT_FAILURE
                              .format(obj_type=obj_type, short_id=short_id))
        else:
            raise RuntimeError("send_notification called on a '{}' bundle"
                               .format(status))

        msg['From'] = NOTIF_EMAIL_FROM
        msg['To'] = email

        self._smtp_send(msg)

        if n_id is not None:
            cursor.execute('''
                DELETE FROM vault_notif_email
                WHERE id = %s''', (n_id,))

    def _smtp_send(self, msg):
        # Reconnect if needed
        try:
            status = self.smtp_server.noop()[0]
        except:  # smtplib.SMTPServerDisconnected
            status = -1
        if status != 250:
            self.smtp_server.connect()

        # Send the message
        self.smtp_server.send_message(msg)

    @autocommit
    def _cache_expire(self, cond, *args, cursor=None):
        """Low-level expiration method, used by cache_expire_* methods"""
        # Embedded SELECT query to be able to use ORDER BY and LIMIT
        cursor.execute('''
            DELETE FROM vault_bundle
            WHERE ctid IN (
                SELECT ctid
                FROM vault_bundle
                WHERE sticky = false
                {}
            )
            RETURNING type, object_id
            '''.format(cond), args)

        for d in cursor:
            self.cache.delete(d['type'], bytes(d['object_id']))

    @autocommit
    def cache_expire_oldest(self, n=1, by='last_access', cursor=None):
        """Expire the `n` oldest bundles"""
        assert by in ('created', 'done', 'last_access')
        filter = '''ORDER BY ts_{}
                    LIMIT {}'''.format(by, n)
        return self._cache_expire(filter)

    @autocommit
    def cache_expire_until(self, date, by='last_access', cursor=None):
        """Expire all the bundles until a certain date"""
        assert by in ('created', 'done', 'last_access')
        filter = '''AND ts_{} <= %s'''.format(by)
        return self._cache_expire(filter, date)
diff --git a/version.txt b/version.txt
index e520c20..7be315d 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.12-0-g1c6f861
\ No newline at end of file
+v0.0.13-0-g5d84e26
\ No newline at end of file
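The expiration helpers at the end of backend.py are what a periodic cleanup
job would call. A minimal maintenance sketch, assuming a vault configuration
dict shaped like DEFAULT_CONFIG in server.py and a reachable database, SMTP
server and scheduler (database names, the abridged storage config and the
30-day cutoff are illustrative):

.. code:: python

    import datetime

    from swh.vault.backend import VaultBackend

    config = {
        'db': 'dbname=softwareheritage-vault-dev',
        'cache': {'cls': 'pathslicing',
                  'args': {'root': '/srv/softwareheritage/vault',
                           'slicing': '0:1/1:5'}},
        'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
        'storage': {'cls': 'local', 'args': {}},  # abridged, see DEFAULT_CONFIG
    }

    backend = VaultBackend(config)

    # Drop non-sticky bundles not accessed for 30 days, removing both the
    # database rows and the cached bundle files.
    cutoff = (datetime.datetime.now(tz=datetime.timezone.utc)
              - datetime.timedelta(days=30))
    backend.cache_expire_until(cutoff, by='last_access')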