diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql index 43d8c9f..f41311f 100644 --- a/sql/swh-vault-schema.sql +++ b/sql/swh-vault-schema.sql @@ -1,43 +1,43 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; insert into dbversion (version, release, description) values (1, now(), 'Initial version'); create domain obj_hash as bytea; create type cook_type as enum ('directory', 'revision_gitfast'); comment on type cook_type is 'Type of the requested bundle'; create type cook_status as enum ('new', 'pending', 'done'); comment on type cook_status is 'Status of the cooking'; create table vault_bundle ( id bigserial primary key, type cook_type not null, -- requested cooking type object_id obj_hash not null, -- requested object ID task_uuid uuid not null, -- celery UUID of the cooking task task_status cook_status not null default 'new', -- status of the task - permanent boolean not null default false, -- bundles never deleted + sticky boolean not null default false, -- bundle cannot expire ts_created timestamptz not null default now(), -- timestamp of creation ts_done timestamptz, -- timestamp of the cooking result ts_last_access timestamptz not null default now(), -- last access progress_msg text, -- progress message unique(type, object_id) ); create table vault_notif_email ( id bigserial primary key, email text not null, -- e-mail to notify bundle_id bigint not null references vault_bundle(id) ); diff --git a/swh/vault/backend.py b/swh/vault/backend.py index b45b31a..a085a97 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,289 +1,290 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import smtplib import celery import psycopg2 import psycopg2.extras from functools import wraps from email.mime.text import MIMEText from swh.model import hashutil from swh.scheduler.utils import get_task from swh.vault.cache import VaultCache from swh.vault.cookers import get_cooker from swh.vault.cooking_tasks import SWHCookingTask # noqa cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' NOTIF_EMAIL_FROM = ('"Software Heritage Vault" ' '') NOTIF_EMAIL_SUBJECT = ("Bundle ready: {obj_type} {short_id}") NOTIF_EMAIL_BODY = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ # TODO: Imported from swh.scheduler.backend. Factorization needed. def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False # TODO: I don't like using None, it's confusing for the user. how about # a NEW_CURSOR object()? if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped # TODO: This has to be factorized with other database base classes and helpers # (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) # The three first methods are imported from swh.scheduler.backend. class VaultBackend: """ Backend for the Software Heritage vault. """ def __init__(self, config): self.config = config self.cache = VaultCache(**self.config['cache']) self.db = None self.reconnect() self.smtp_server = smtplib.SMTP('localhost') def reconnect(self): if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['vault_db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def close(self): self.db.close() def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() @autocommit def task_info(self, obj_type, obj_id, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' - SELECT id, type, object_id, task_uuid, task_status, permanent, + SELECT id, type, object_id, task_uuid, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) res = cursor.fetchone() if res: res['object_id'] = bytes(res['object_id']) return res def _send_task(task_uuid, args): task = get_task(cooking_task_name) task.apply_async(args, task_id=task_uuid) @autocommit - def create_task(self, obj_type, obj_id, permanent=False, cursor=None): + def create_task(self, obj_type, obj_id, sticky=False, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) args = [self.config, obj_type, obj_id] CookerCls = get_cooker(obj_type) cooker = CookerCls(*args) cooker.check_exists() task_uuid = celery.uuid() cursor.execute(''' - INSERT INTO vault_bundle (type, object_id, task_uuid, permanent) + INSERT INTO vault_bundle (type, object_id, task_uuid, sticky) VALUES (%s, %s, %s, %s)''', - (obj_type, obj_id, task_uuid, permanent)) + (obj_type, obj_id, task_uuid, sticky)) self.commit() self._send_task(task_uuid, args) @autocommit def add_notif_email(self, obj_type, obj_id, email, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))''', (email, obj_type, obj_id)) @autocommit - def cook_request(self, obj_type, obj_id, *, permanent=False, + def cook_request(self, obj_type, obj_id, *, sticky=False, email=None, cursor=None): info = self.task_info(obj_type, obj_id) if info is None: - self.create_task(obj_type, obj_id, permanent) + self.create_task(obj_type, obj_id, sticky) if email is not None: if info is not None and info['task_status'] == 'done': self.send_notification(None, email, obj_type, obj_id) else: self.add_notif_email(obj_type, obj_id, email) info = self.task_info(obj_type, obj_id) return info @autocommit def is_available(self, obj_type, obj_id, cursor=None): info = self.task_info(obj_type, obj_id, cursor=cursor) return (info is not None and info['task_status'] == 'done' and self.cache.is_cached(obj_type, obj_id)) @autocommit def fetch(self, obj_type, obj_id, cursor=None): if not self.is_available(obj_type, obj_id, cursor=cursor): return None self.update_access_ts(obj_type, obj_id, cursor=cursor) return self.cache.get(obj_type, obj_id) @autocommit def update_access_ts(self, obj_type, obj_id, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) @autocommit def set_status(self, obj_type, obj_id, status, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) req = (''' UPDATE vault_bundle SET task_status = %s ''' + (''', ts_done = NOW() ''' if status == 'done' else '') + '''WHERE type = %s AND object_id = %s''') cursor.execute(req, (status, obj_type, obj_id)) @autocommit def set_progress(self, obj_type, obj_id, progress, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND object_id = %s''', (progress, obj_type, obj_id)) @autocommit def send_all_notifications(self, obj_type, obj_id, cursor=None): obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' SELECT vault_notif_email.id AS id, email FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''', (obj_type, obj_id)) for d in cursor: self.send_notification(d['id'], d['email'], obj_type, obj_id) @autocommit def send_notification(self, n_id, email, obj_type, obj_id, cursor=None): hex_id = hashutil.hash_to_hex(obj_id) short_id = hex_id[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/' 'raw'.format(obj_type, hex_id)) text = NOTIF_EMAIL_BODY.strip() text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) msg = MIMEText(text) msg['Subject'] = (NOTIF_EMAIL_SUBJECT .format(obj_type=obj_type, short_id=short_id)) msg['From'] = NOTIF_EMAIL_FROM msg['To'] = email self.smtp_server.send_message(msg) if n_id is not None: cursor.execute(''' DELETE FROM vault_notif_email WHERE id = %s''', (n_id,)) @autocommit def _cache_expire(self, cond, *args, cursor=None): + """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT cursor.execute(''' DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle - WHERE permanent = false + WHERE sticky = false {} ) RETURNING type, object_id '''.format(cond), args) for d in cursor: self.cache.delete(d['type'], bytes(d['object_id'])) @autocommit - def cache_expire_count(self, n=1, by='last_access', cursor=None): + def cache_expire_oldest(self, n=1, by='last_access', cursor=None): assert by in ('created', 'done', 'last_access') filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n) return self._cache_expire(filter) @autocommit def cache_expire_until(self, date, by='last_access', cursor=None): assert by in ('created', 'done', 'last_access') filter = '''AND ts_{} <= %s'''.format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py index b0068f7..38f9fcd 100644 --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -1,290 +1,290 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime import psycopg2 import unittest from unittest.mock import patch from swh.core.tests.db_testing import DbTestFixture from swh.model import hashutil from swh.storage.tests.storage_testing import StorageTestFixture from swh.vault.tests.vault_testing import VaultTestFixture class BaseTestBackend(VaultTestFixture, StorageTestFixture, DbTestFixture): @contextlib.contextmanager def mock_cooking(self): with patch.object(self.vault_backend, '_send_task') as mt: with patch('swh.vault.backend.get_cooker') as mg: mcc = unittest.mock.MagicMock() mc = unittest.mock.MagicMock() mg.return_value = mcc mcc.return_value = mc mc.check_exists.return_value = True yield {'send_task': mt, 'get_cooker': mg, 'cooker_cls': mcc, 'cooker': mc} def assertTimestampAlmostNow(self, ts, tolerance_secs=1.0): now = datetime.datetime.now(datetime.timezone.utc) creation_delta_secs = (ts - now).total_seconds() self.assertLess(creation_delta_secs, tolerance_secs) def hash_content(self, content): obj_id = hashutil.hash_data(content)['sha1'] return content, obj_id - def fake_cook(self, obj_type, result_content, permanent=False): + def fake_cook(self, obj_type, result_content, sticky=False): content, obj_id = self.hash_content(result_content) with self.mock_cooking(): - self.vault_backend.create_task(obj_type, obj_id, permanent) + self.vault_backend.create_task(obj_type, obj_id, sticky) self.vault_backend.cache.add(obj_type, obj_id, b'content') self.vault_backend.set_status(obj_type, obj_id, 'done') return obj_id, content TEST_TYPE = 'revision_gitfast' TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc' TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?" " \N{ASTONISHED FACE} ") TEST_EMAIL = 'ouiche@example.com' class TestBackend(BaseTestBackend, unittest.TestCase): def test_create_task_simple(self): with self.mock_cooking() as m: self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) m['get_cooker'].assert_called_once_with(TEST_TYPE) args = m['cooker_cls'].call_args[0] self.assertEqual(args[0], self.vault_backend.config) self.assertEqual(args[1], TEST_TYPE) self.assertEqual(args[2], TEST_OBJ_ID) self.assertEqual(m['cooker'].check_exists.call_count, 1) self.assertEqual(m['send_task'].call_count, 1) args = m['send_task'].call_args[0][1] self.assertEqual(args[0], self.vault_backend.config) self.assertEqual(args[1], TEST_TYPE) self.assertEqual(args[2], TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['object_id'], TEST_OBJ_ID) self.assertEqual(info['type'], TEST_TYPE) self.assertEqual(str(info['task_uuid']), m['send_task'].call_args[0][0]) self.assertEqual(info['task_status'], 'new') self.assertTimestampAlmostNow(info['ts_created']) self.assertEqual(info['ts_done'], None) self.assertEqual(info['progress_msg'], None) def test_create_fail_duplicate_task(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) with self.assertRaises(psycopg2.IntegrityError): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_fail_nonexisting_object(self): with self.mock_cooking() as m: m['cooker'].check_exists.side_effect = ValueError('Nothing here.') with self.assertRaises(ValueError): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_set_progress(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['progress_msg'], None) self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, TEST_PROGRESS) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['progress_msg'], TEST_PROGRESS) def test_create_set_status(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'new') self.assertEqual(info['ts_done'], None) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending') info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'pending') self.assertEqual(info['ts_done'], None) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'done') self.assertTimestampAlmostNow(info['ts_done']) def test_create_update_access_ts(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_1 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_1) self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_2 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_2) self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_3 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_3) self.assertLess(access_ts_1, access_ts_2) self.assertLess(access_ts_2, access_ts_3) def test_cook_request_idempotent(self): with self.mock_cooking(): info1 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) info2 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) info3 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info1, info2) self.assertEqual(info1, info3) def test_cook_email_pending_done(self): with self.mock_cooking(), \ patch.object(self.vault_backend, 'add_notif_email') as madd, \ patch.object(self.vault_backend, 'send_notification') as msend: self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) msend.assert_not_called() madd.reset_mock() msend.reset_mock() self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_OBJ_ID) madd.assert_not_called() def test_send_all_emails(self): with self.mock_cooking(): emails = ('a@example.com', 'billg@example.com', 'test+42@example.org') for email in emails: self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=email) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') with patch.object(self.vault_backend, 'smtp_server') as m: self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) sent_emails = {k[0][0] for k in m.send_message.call_args_list} self.assertEqual({k['To'] for k in sent_emails}, set(emails)) for e in sent_emails: self.assertIn('info@softwareheritage.org', e['From']) self.assertIn(TEST_TYPE, e['Subject']) self.assertIn(TEST_HEX_ID[:5], e['Subject']) self.assertIn(TEST_TYPE, str(e)) self.assertIn('https://archive.softwareheritage.org/', str(e)) self.assertIn(TEST_HEX_ID[:5], str(e)) self.assertIn('--\x20\n', str(e)) # Well-formated signature!!! # Check that the entries have been deleted and recalling the # function does not re-send the e-mails m.reset_mock() self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) m.assert_not_called() def test_available(self): self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) self.vault_backend.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content') self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') self.assertTrue(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) def test_fetch(self): self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID), None) obj_id, content = self.fake_cook(TEST_TYPE, b'content') info = self.vault_backend.task_info(TEST_TYPE, obj_id) access_ts_before = info['ts_last_access'] self.assertEqual(self.vault_backend.fetch(TEST_TYPE, obj_id), b'content') info = self.vault_backend.task_info(TEST_TYPE, obj_id) access_ts_after = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_after) self.assertLess(access_ts_before, access_ts_after) - def test_cache_expire_count(self): + def test_cache_expire_oldest(self): r = range(1, 10) inserted = {} for i in r: - permanent = (i == 5) + sticky = (i == 5) content = b'content%s' % str(i).encode() - obj_id, content = self.fake_cook(TEST_TYPE, content, permanent) + obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) - self.vault_backend.cache_expire_count(n=4) + self.vault_backend.cache_expire_oldest(n=4) should_be_still_here = {2, 3, 5, 8, 9} for i in r: self.assertEqual(self.vault_backend.is_available( TEST_TYPE, inserted[i][0]), i in should_be_still_here) def test_cache_expire_until(self): r = range(1, 10) inserted = {} for i in r: - permanent = (i == 5) + sticky = (i == 5) content = b'content%s' % str(i).encode() - obj_id, content = self.fake_cook(TEST_TYPE, content, permanent) + obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) if i == 7: cutoff_date = datetime.datetime.now() self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) self.vault_backend.cache_expire_until(date=cutoff_date) should_be_still_here = {2, 3, 5, 8, 9} for i in r: self.assertEqual(self.vault_backend.is_available( TEST_TYPE, inserted[i][0]), i in should_be_still_here)