diff --git a/PKG-INFO b/PKG-INFO index 03be118..733f6d1 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.vault -Version: 0.0.16 +Version: 0.0.17 Summary: Software Heritage vault Home-page: https://forge.softwareheritage.org/diffusion/DVAU/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql index b7dad47..1d477f1 100644 --- a/sql/swh-vault-schema.sql +++ b/sql/swh-vault-schema.sql @@ -1,59 +1,59 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; insert into dbversion (version, release, description) values (1, now(), 'Initial version'); create domain obj_hash as bytea; create type cook_type as enum ('directory', 'revision_gitfast'); comment on type cook_type is 'Type of the requested bundle'; create type cook_status as enum ('new', 'pending', 'done', 'failed'); comment on type cook_status is 'Status of the cooking'; create table vault_bundle ( id bigserial primary key, type cook_type not null, -- requested cooking type object_id obj_hash not null, -- requested object ID task_id integer, -- scheduler task id task_status cook_status not null default 'new', -- status of the task sticky boolean not null default false, -- bundle cannot expire ts_created timestamptz not null default now(), -- timestamp of creation ts_done timestamptz, -- timestamp of the cooking result ts_last_access timestamptz not null default now(), -- last access progress_msg text -- progress message ); create unique index concurrently vault_bundle_type_object on vault_bundle (type, object_id); create index concurrently vault_bundle_task_id on vault_bundle (task_id); create table vault_notif_email ( id bigserial primary key, email text not null, -- e-mail to notify - bundle_id bigint not null references vault_bundle(id) + bundle_id bigint not null references vault_bundle(id) on delete cascade ); create index concurrently vault_notif_email_bundle on vault_notif_email (bundle_id); create index concurrently vault_notif_email_email on vault_notif_email (email); create table vault_batch ( id bigserial primary key ); create table vault_batch_bundle ( - batch_id bigint not null references vault_batch(id), - bundle_id bigint not null references vault_bundle(id) + batch_id bigint not null references vault_batch(id) on delete cascade, + bundle_id bigint not null references vault_bundle(id) on delete cascade ); create unique index concurrently vault_batch_bundle_pkey on vault_batch_bundle (batch_id, bundle_id); diff --git a/sql/upgrades/003.sql b/sql/upgrades/003.sql new file mode 100644 index 0000000..309bf3a --- /dev/null +++ b/sql/upgrades/003.sql @@ -0,0 +1,28 @@ +-- SWH DB schema upgrade +-- from_version: 002 +-- to_version: 003 +-- description: Add delete cascade conditions + +insert into dbversion(version, release, description) + values(3, now(), 'Add delete cascade conditions'); + +alter table vault_notif_email + drop constraint "vault_notif_email_bundle_id_fkey", + add constraint "vault_notif_email_bundle_id_fkey" + foreign key(bundle_id) + references vault_bundle + on delete cascade; + +alter table vault_batch_bundle + drop constraint "vault_batch_bundle_bundle_id_fkey", + add constraint "vault_batch_bundle_bundle_id_fkey" + foreign key(bundle_id) + references vault_bundle + on delete cascade; + +alter table vault_batch_bundle + drop constraint "vault_batch_bundle_batch_id_fkey", + add constraint "vault_batch_bundle_batch_id_fkey" + foreign key(batch_id) + references vault_batch + on delete cascade; diff --git a/swh.vault.egg-info/PKG-INFO b/swh.vault.egg-info/PKG-INFO index 03be118..733f6d1 100644 --- a/swh.vault.egg-info/PKG-INFO +++ b/swh.vault.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.vault -Version: 0.0.16 +Version: 0.0.17 Summary: Software Heritage vault Home-page: https://forge.softwareheritage.org/diffusion/DVAU/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.vault.egg-info/SOURCES.txt b/swh.vault.egg-info/SOURCES.txt index 357f62a..57166d7 100644 --- a/swh.vault.egg-info/SOURCES.txt +++ b/swh.vault.egg-info/SOURCES.txt @@ -1,50 +1,51 @@ .gitignore AUTHORS LICENSE MANIFEST.in Makefile requirements-swh.txt requirements.txt setup.py version.txt debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/.gitignore docs/Makefile docs/api.rst docs/conf.py docs/getting-started.rst docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder sql/swh-vault-schema.sql sql/upgrades/002.sql +sql/upgrades/003.sql swh/__init__.py swh.vault.egg-info/PKG-INFO swh.vault.egg-info/SOURCES.txt swh.vault.egg-info/dependency_links.txt swh.vault.egg-info/not-zip-safe swh.vault.egg-info/requires.txt swh.vault.egg-info/top_level.txt swh/vault/__init__.py swh/vault/backend.py swh/vault/cache.py swh/vault/cooking_tasks.py swh/vault/to_disk.py swh/vault/api/__init__.py swh/vault/api/client.py swh/vault/api/server.py swh/vault/cookers/__init__.py swh/vault/cookers/base.py swh/vault/cookers/directory.py swh/vault/cookers/revision_flat.py swh/vault/cookers/revision_gitfast.py swh/vault/tests/test_backend.py swh/vault/tests/test_cache.py swh/vault/tests/test_cookers.py swh/vault/tests/test_cookers_base.py swh/vault/tests/vault_testing.py \ No newline at end of file diff --git a/swh/vault/backend.py b/swh/vault/backend.py index 1eac7a8..12200fc 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,441 +1,461 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import psycopg2 -import psycopg2.extras import smtplib +import psycopg2 +from psycopg2.extras import RealDictCursor from functools import wraps from email.mime.text import MIMEText from swh.model import hashutil from swh.scheduler.backend import SchedulerBackend from swh.scheduler.utils import create_oneshot_task_dict from swh.vault.cache import VaultCache from swh.vault.cookers import get_cooker from swh.vault.cooking_tasks import SWHCookingTask # noqa cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' NOTIF_EMAIL_FROM = ('"Software Heritage Vault" ' '') NOTIF_EMAIL_SUBJECT_SUCCESS = ("Bundle ready: {obj_type} {short_id}") NOTIF_EMAIL_SUBJECT_FAILURE = ("Bundle failed: {obj_type} {short_id}") NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ NOTIF_EMAIL_BODY_FAILURE = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle could not be cooked for the following reason: {progress_msg} We apologize for the inconvenience. --\x20 The Software Heritage Developers """ class NotFoundExc(Exception): """Bundle was not found.""" pass def batch_to_bytes(batch): return [(obj_type, hashutil.hash_to_bytes(obj_id)) for obj_type, obj_id in batch] # TODO: Imported from swh.scheduler.backend. Factorization needed. def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False # TODO: I don't like using None, it's confusing for the user. how about # a NEW_CURSOR object()? if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except BaseException: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped # TODO: This has to be factorized with other database base classes and helpers # (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) # The three first methods are imported from swh.scheduler.backend. class VaultBackend: """ Backend for the Software Heritage vault. """ def __init__(self, config): self.config = config self.cache = VaultCache(self.config['cache']) self.db = None self.reconnect() self.smtp_server = smtplib.SMTP() if self.config['scheduling_db'] is not None: self.scheduler = SchedulerBackend( scheduling_db=self.config['scheduling_db']) def reconnect(self): """Reconnect to the database.""" if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['db'], - cursor_factory=psycopg2.extras.RealDictCursor, + cursor_factory=RealDictCursor, ) def close(self): """Close the underlying database connection.""" self.db.close() def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() @autocommit def task_info(self, obj_type, obj_id, cursor=None): """Fetch information from a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' SELECT id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) res = cursor.fetchone() if res: res['object_id'] = bytes(res['object_id']) return res def _send_task(self, args): """Send a cooking task to the celery scheduler""" task = create_oneshot_task_dict('swh-vault-cooking', *args) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]['id'] - # TODO: remove once the scheduler handles priority tasks - def _send_batch_task(self, args): - """Send a cooking task to the celery scheduler""" - task = create_oneshot_task_dict('swh-vault-batch-cooking', *args) - added_tasks = self.scheduler.create_tasks([task]) - return added_tasks[0]['id'] - @autocommit - def create_task(self, obj_type, obj_id, sticky=False, batch=False, - cursor=None): + def create_task(self, obj_type, obj_id, sticky=False, cursor=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) hex_id = hashutil.hash_to_hex(obj_id) args = [obj_type, hex_id] - # Don't check all the elements of the batch locally to avoid rtt - if not batch: - backend_storage_config = {'storage': self.config['storage']} - cooker_class = get_cooker(obj_type) - cooker = cooker_class(*args, override_cfg=backend_storage_config) - if not cooker.check_exists(): - raise NotFoundExc("Object {} was not found.".format(hex_id)) + backend_storage_config = {'storage': self.config['storage']} + cooker_class = get_cooker(obj_type) + cooker = cooker_class(*args, override_cfg=backend_storage_config) + if not cooker.check_exists(): + raise NotFoundExc("Object {} was not found.".format(hex_id)) cursor.execute(''' INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) self.commit() - # TODO: change once the scheduler handles priority tasks - if batch: - task_id = self._send_batch_task(args) - else: - task_id = self._send_task(args) + task_id = self._send_task(args) cursor.execute(''' UPDATE vault_bundle SET task_id = %s WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id)) @autocommit def add_notif_email(self, obj_type, obj_id, email, cursor=None): """Add an e-mail address to notify when a given bundle is ready""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))''', (email, obj_type, obj_id)) @autocommit - def cook_request(self, obj_type, obj_id, *, sticky=False, batch=False, + def cook_request(self, obj_type, obj_id, *, sticky=False, email=None, cursor=None): """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" obj_id = hashutil.hash_to_bytes(obj_id) info = self.task_info(obj_type, obj_id) # If there's a failed bundle entry, delete it first. if info is not None and info['task_status'] == 'failed': cursor.execute('''DELETE FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) self.commit() info = None # If there's no bundle entry, create the task. if info is None: - self.create_task(obj_type, obj_id, sticky, batch=batch) + self.create_task(obj_type, obj_id, sticky) if email is not None: # If the task is already done, send the email directly if info is not None and info['task_status'] == 'done': self.send_notification(None, email, obj_type, obj_id, info['task_status']) # Else, add it to the notification queue else: self.add_notif_email(obj_type, obj_id, email) info = self.task_info(obj_type, obj_id) return info @autocommit def batch_cook(self, batch, cursor=None): """Cook a batch of bundles and returns the cooking id.""" - cursor.execute('''INSERT INTO vault_batch (id) VALUES (DEFAULT) - RETURNING id''') + # Import execute_values at runtime only, because it requires + # psycopg2 >= 2.7 (only available on postgresql servers) + from psycopg2.extras import execute_values + + cursor.execute(''' + INSERT INTO vault_batch (id) + VALUES (DEFAULT) + RETURNING id''') batch_id = cursor.fetchone()['id'] batch = batch_to_bytes(batch) - # Ideally, if we start to do a lot of batch inserts and performance - # becomes an issue, we should be able to rewrite all the following - # function calls to work on batches. It requires a significant amount - # of work (using UPDATE FROM to update task_id, using DELETE with tuple - # unpacking, etc), so we're doing a simple loop in the meantime. - for obj_type, obj_id in batch: - info = self.cook_request(obj_type, obj_id, batch=True) - cursor.execute('''INSERT INTO vault_batch_bundle - (batch_id, bundle_id) - VALUES (%s, %s) ON CONFLICT DO NOTHING''', - (batch_id, info['id'])) + # Delete all failed bundles from the batch + cursor.execute(''' + DELETE FROM vault_bundle + WHERE (type, object_id) IN %s''', (tuple(batch),)) + + # Insert all the bundles + execute_values(cursor, ''' + INSERT INTO vault_bundle (type, object_id) + VALUES %s ON CONFLICT DO NOTHING RETURNING id''', batch) + + # Insert the batch-bundle entries + bundle_ids = cursor.fetchall() + batch_id_bundle_ids = [(batch_id, row['id']) for row in bundle_ids] + execute_values(cursor, ''' + INSERT INTO vault_batch_bundle (batch_id, bundle_id) + VALUES %s ON CONFLICT DO NOTHING''', + batch_id_bundle_ids) + self.commit() + + # Send the tasks + args_batch = [(obj_type, hashutil.hash_to_hex(obj_id)) + for obj_type, obj_id in batch] + # TODO: change once the scheduler handles priority tasks + tasks = [create_oneshot_task_dict('swh-vault-batch-cooking', *args) + for args in args_batch] + + added_tasks = self.scheduler.create_tasks(tasks) + tasks_ids_bundle_ids = zip([task['id'] for task in added_tasks], batch) + tasks_ids_bundle_ids = [(task_id, obj_type, obj_id) + for task_id, (obj_type, obj_id) + in tasks_ids_bundle_ids] + + # Update the task ids + execute_values(cursor, ''' + UPDATE vault_bundle + SET task_id = s_task_id + FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id) + WHERE type = s_type::cook_type AND object_id = s_object_id ''', + tasks_ids_bundle_ids) return batch_id @autocommit def batch_info(self, batch_id, cursor=None): """Fetch information from a batch of bundles""" cursor.execute(''' SELECT vault_bundle.id as id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_batch_bundle LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id WHERE batch_id = %s''', (batch_id,)) res = cursor.fetchall() if res: for d in res: d['object_id'] = bytes(d['object_id']) return res @autocommit def is_available(self, obj_type, obj_id, cursor=None): """Check whether a bundle is available for retrieval""" info = self.task_info(obj_type, obj_id, cursor=cursor) return (info is not None and info['task_status'] == 'done' and self.cache.is_cached(obj_type, obj_id)) @autocommit def fetch(self, obj_type, obj_id, cursor=None): """Retrieve a bundle from the cache""" if not self.is_available(obj_type, obj_id, cursor=cursor): return None self.update_access_ts(obj_type, obj_id, cursor=cursor) return self.cache.get(obj_type, obj_id) @autocommit def update_access_ts(self, obj_type, obj_id, cursor=None): """Update the last access timestamp of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) @autocommit def set_status(self, obj_type, obj_id, status, cursor=None): """Set the cooking status of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) req = (''' UPDATE vault_bundle SET task_status = %s ''' + (''', ts_done = NOW() ''' if status == 'done' else '') + '''WHERE type = %s AND object_id = %s''') cursor.execute(req, (status, obj_type, obj_id)) @autocommit def set_progress(self, obj_type, obj_id, progress, cursor=None): """Set the cooking progress of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND object_id = %s''', (progress, obj_type, obj_id)) @autocommit def send_all_notifications(self, obj_type, obj_id, cursor=None): """Send all the e-mails in the notification list of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''', (obj_type, obj_id)) for d in cursor: self.send_notification(d['id'], d['email'], obj_type, obj_id, status=d['task_status'], progress_msg=d['progress_msg']) @autocommit def send_notification(self, n_id, email, obj_type, obj_id, status, progress_msg=None, cursor=None): """Send the notification of a bundle to a specific e-mail""" hex_id = hashutil.hash_to_hex(obj_id) short_id = hex_id[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/' 'raw'.format(obj_type, hex_id)) if status == 'done': text = NOTIF_EMAIL_BODY_SUCCESS.strip() text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) msg = MIMEText(text) msg['Subject'] = (NOTIF_EMAIL_SUBJECT_SUCCESS .format(obj_type=obj_type, short_id=short_id)) elif status == 'failed': text = NOTIF_EMAIL_BODY_FAILURE.strip() text = text.format(obj_type=obj_type, hex_id=hex_id, progress_msg=progress_msg) msg = MIMEText(text) msg['Subject'] = (NOTIF_EMAIL_SUBJECT_FAILURE .format(obj_type=obj_type, short_id=short_id)) else: raise RuntimeError("send_notification called on a '{}' bundle" .format(status)) msg['From'] = NOTIF_EMAIL_FROM msg['To'] = email self._smtp_send(msg) if n_id is not None: cursor.execute(''' DELETE FROM vault_notif_email WHERE id = %s''', (n_id,)) def _smtp_send(self, msg): # Reconnect if needed try: status = self.smtp_server.noop()[0] except smtplib.SMTPException: status = -1 if status != 250: self.smtp_server.connect('localhost', 25) # Send the message self.smtp_server.send_message(msg) @autocommit def _cache_expire(self, cond, *args, cursor=None): """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT cursor.execute(''' DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle WHERE sticky = false {} ) RETURNING type, object_id '''.format(cond), args) for d in cursor: self.cache.delete(d['type'], bytes(d['object_id'])) @autocommit def cache_expire_oldest(self, n=1, by='last_access', cursor=None): """Expire the `n` oldest bundles""" assert by in ('created', 'done', 'last_access') filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n) return self._cache_expire(filter) @autocommit def cache_expire_until(self, date, by='last_access', cursor=None): """Expire all the bundles until a certain date""" assert by in ('created', 'done', 'last_access') filter = '''AND ts_{} <= %s'''.format(by) return self._cache_expire(filter, date) diff --git a/version.txt b/version.txt index de6836c..2b20985 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.16-0-g06658e1 \ No newline at end of file +v0.0.17-0-gde4caa1 \ No newline at end of file