diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql
--- a/sql/swh-vault-schema.sql
+++ b/sql/swh-vault-schema.sql
@@ -25,6 +25,8 @@
 
   task_uuid uuid not null,  -- celery UUID of the cooking task
   task_status cook_status not null default 'new',  -- status of the task
+  sticky boolean not null default false,  -- bundle cannot expire
+
   ts_created timestamptz not null default now(),  -- timestamp of creation
   ts_done timestamptz,  -- timestamp of the cooking result
   ts_last_access timestamptz not null default now(),  -- last access
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -82,6 +82,7 @@
         self.smtp_server = smtplib.SMTP('localhost')
 
     def reconnect(self):
+        """Reconnect to the database."""
         if not self.db or self.db.closed:
             self.db = psycopg2.connect(
                 dsn=self.config['vault_db'],
@@ -89,6 +90,7 @@
             )
 
     def close(self):
+        """Close the underlying database connection."""
         self.db.close()
 
     def cursor(self):
@@ -121,9 +123,10 @@
 
     @autocommit
     def task_info(self, obj_type, obj_id, cursor=None):
+        """Fetch information about a bundle"""
        obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
-            SELECT id, type, object_id, task_uuid, task_status,
+            SELECT id, type, object_id, task_uuid, task_status, sticky,
                    ts_created, ts_done, ts_last_access, progress_msg
             FROM vault_bundle
             WHERE type = %s AND object_id = %s''', (obj_type, obj_id))
@@ -133,11 +136,13 @@
         return res
 
     def _send_task(task_uuid, args):
+        """Send a cooking task to the celery scheduler"""
         task = get_task(cooking_task_name)
         task.apply_async(args, task_id=task_uuid)
 
     @autocommit
-    def create_task(self, obj_type, obj_id, cursor=None):
+    def create_task(self, obj_type, obj_id, sticky=False, cursor=None):
+        """Create and send a cooking task"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         args = [self.config, obj_type, obj_id]
         CookerCls = get_cooker(obj_type)
@@ -146,14 +151,16 @@
 
         task_uuid = celery.uuid()
         cursor.execute('''
-            INSERT INTO vault_bundle (type, object_id, task_uuid)
-            VALUES (%s, %s, %s)''', (obj_type, obj_id, task_uuid))
+            INSERT INTO vault_bundle (type, object_id, task_uuid, sticky)
+            VALUES (%s, %s, %s, %s)''',
+                       (obj_type, obj_id, task_uuid, sticky))
 
         self.commit()
         self._send_task(task_uuid, args)
 
     @autocommit
     def add_notif_email(self, obj_type, obj_id, email, cursor=None):
+        """Add an e-mail address to notify when a given bundle is ready"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             INSERT INTO vault_notif_email (email, bundle_id)
@@ -162,10 +169,13 @@
                 (email, obj_type, obj_id))
 
     @autocommit
-    def cook_request(self, obj_type, obj_id, email=None, cursor=None):
+    def cook_request(self, obj_type, obj_id, *, sticky=False,
+                     email=None, cursor=None):
+        """Main entry point for cooking requests. This starts a cooking
+        task if needed, and adds the given e-mail to the notification list"""
         info = self.task_info(obj_type, obj_id)
         if info is None:
-            self.create_task(obj_type, obj_id)
+            self.create_task(obj_type, obj_id, sticky)
         if email is not None:
             if info is not None and info['task_status'] == 'done':
                 self.send_notification(None, email, obj_type, obj_id)
@@ -176,6 +186,7 @@
 
     @autocommit
     def is_available(self, obj_type, obj_id, cursor=None):
+        """Check whether a bundle is available for retrieval"""
         info = self.task_info(obj_type, obj_id, cursor=cursor)
         return (info is not None
                 and info['task_status'] == 'done'
@@ -183,6 +194,7 @@
 
     @autocommit
     def fetch(self, obj_type, obj_id, cursor=None):
+        """Retrieve a bundle from the cache"""
         if not self.is_available(obj_type, obj_id, cursor=cursor):
             return None
         self.update_access_ts(obj_type, obj_id, cursor=cursor)
@@ -190,6 +202,7 @@
 
     @autocommit
     def update_access_ts(self, obj_type, obj_id, cursor=None):
+        """Update the last access timestamp of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             UPDATE vault_bundle
@@ -199,6 +212,7 @@
 
     @autocommit
     def set_status(self, obj_type, obj_id, status, cursor=None):
+        """Set the cooking status of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         req = ('''
                UPDATE vault_bundle
@@ -209,6 +223,7 @@
 
     @autocommit
     def set_progress(self, obj_type, obj_id, progress, cursor=None):
+        """Set the cooking progress of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             UPDATE vault_bundle
@@ -218,6 +233,7 @@
 
     @autocommit
     def send_all_notifications(self, obj_type, obj_id, cursor=None):
+        """Send all the e-mails in the notification list of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             SELECT vault_notif_email.id AS id, email
@@ -230,6 +246,7 @@
 
     @autocommit
     def send_notification(self, n_id, email, obj_type, obj_id, cursor=None):
+        """Send the notification of a bundle to a specific e-mail"""
         hex_id = hashutil.hash_to_hex(obj_id)
         short_id = hex_id[:7]
 
@@ -256,3 +273,35 @@
         cursor.execute('''
             DELETE FROM vault_notif_email
             WHERE id = %s''', (n_id,))
+
+    @autocommit
+    def _cache_expire(self, cond, *args, cursor=None):
+        """Low-level expiration method, used by cache_expire_* methods"""
+        # Embedded SELECT query to be able to use ORDER BY and LIMIT
+        cursor.execute('''
+            DELETE FROM vault_bundle
+            WHERE ctid IN (
+                SELECT ctid
+                FROM vault_bundle
+                WHERE sticky = false
+                {}
+            )
+            RETURNING type, object_id
+            '''.format(cond), args)
+
+        for d in cursor:
+            self.cache.delete(d['type'], bytes(d['object_id']))
+
+    @autocommit
+    def cache_expire_oldest(self, n=1, by='last_access', cursor=None):
+        """Expire the `n` oldest bundles"""
+        assert by in ('created', 'done', 'last_access')
+        filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n)
+        return self._cache_expire(filter)
+
+    @autocommit
+    def cache_expire_until(self, date, by='last_access', cursor=None):
+        """Expire all the bundles older than a certain date"""
+        assert by in ('created', 'done', 'last_access')
+        filter = '''AND ts_{} <= %s'''.format(by)
+        return self._cache_expire(filter, date)
diff --git a/swh/vault/cache.py b/swh/vault/cache.py
--- a/swh/vault/cache.py
+++ b/swh/vault/cache.py
@@ -31,6 +31,10 @@
         storage = self._get_storage(obj_type)
         return storage.get(hashutil.hash_to_bytes(obj_id))
 
+    def delete(self, obj_type, obj_id):
+        storage = self._get_storage(obj_type)
+        return storage.delete(hashutil.hash_to_bytes(obj_id))
+
     def add_stream(self, obj_type, obj_id, content_iter):
         storage = self._get_storage(obj_type)
         return storage.add_stream(content_iter, obj_id)
@@ -55,7 +59,7 @@
             if not os.path.isdir(fp):
                 os.makedirs(fp, DIR_MODE, exist_ok=True)
 
-            self.storages[obj_type] = get_objstorage(
-                'pathslicing', {'root': fp, 'slicing': '0:1/0:5'})
+            conf = {'root': fp, 'slicing': '0:1/0:5', 'allow_delete': True}
+            self.storages[obj_type] = get_objstorage('pathslicing', conf)
 
         return self.storages[obj_type]
diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py
--- a/swh/vault/tests/test_backend.py
+++ b/swh/vault/tests/test_backend.py
@@ -37,6 +37,18 @@
         creation_delta_secs = (ts - now).total_seconds()
         self.assertLess(creation_delta_secs, tolerance_secs)
 
+    def hash_content(self, content):
+        obj_id = hashutil.hash_data(content)['sha1']
+        return content, obj_id
+
+    def fake_cook(self, obj_type, result_content, sticky=False):
+        content, obj_id = self.hash_content(result_content)
+        with self.mock_cooking():
+            self.vault_backend.create_task(obj_type, obj_id, sticky)
+        self.vault_backend.cache.add(obj_type, obj_id, b'content')
+        self.vault_backend.set_status(obj_type, obj_id, 'done')
+        return obj_id, content
+
 
 TEST_TYPE = 'revision_gitfast'
 TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc'
@@ -160,7 +172,8 @@
 
             madd.reset_mock()
             msend.reset_mock()
-            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
+            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
+                                            email=TEST_EMAIL)
             madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
             msend.assert_not_called()
 
@@ -168,7 +181,8 @@
             msend.reset_mock()
 
             self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
-            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
+            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
+                                            email=TEST_EMAIL)
             msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE,
                                           TEST_OBJ_ID)
             madd.assert_not_called()
@@ -179,7 +193,8 @@
                       'billg@example.com',
                       'test+42@example.org')
             for email in emails:
-                self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email)
+                self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
+                                                email=email)
 
         self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
 
@@ -221,19 +236,55 @@
     def test_fetch(self):
         self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID),
                          None)
-        with self.mock_cooking():
-            self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
-        self.vault_backend.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content')
-        self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
+        obj_id, content = self.fake_cook(TEST_TYPE, b'content')
 
-        info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
+        info = self.vault_backend.task_info(TEST_TYPE, obj_id)
         access_ts_before = info['ts_last_access']
 
-        self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID),
+        self.assertEqual(self.vault_backend.fetch(TEST_TYPE, obj_id),
                          b'content')
 
-        info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
+        info = self.vault_backend.task_info(TEST_TYPE, obj_id)
         access_ts_after = info['ts_last_access']
         self.assertTimestampAlmostNow(access_ts_after)
         self.assertLess(access_ts_before, access_ts_after)
+
+    def test_cache_expire_oldest(self):
+        r = range(1, 10)
+        inserted = {}
+        for i in r:
+            sticky = (i == 5)
+            content = b'content%s' % str(i).encode()
+            obj_id, content = self.fake_cook(TEST_TYPE, content, sticky)
+            inserted[i] = (obj_id, content)
+
+        self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0])
+        self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0])
+        self.vault_backend.cache_expire_oldest(n=4)
+
+        should_be_still_here = {2, 3, 5, 8, 9}
+        for i in r:
+            self.assertEqual(self.vault_backend.is_available(
+                TEST_TYPE, inserted[i][0]), i in should_be_still_here)
+
+    def test_cache_expire_until(self):
+        r = range(1, 10)
+        inserted = {}
+        for i in r:
+            sticky = (i == 5)
+            content = b'content%s' % str(i).encode()
+            obj_id, content = self.fake_cook(TEST_TYPE, content, sticky)
+            inserted[i] = (obj_id, content)
+
+            if i == 7:
+                cutoff_date = datetime.datetime.now()
+
+        self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0])
+        self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0])
+        self.vault_backend.cache_expire_until(date=cutoff_date)
+
+        should_be_still_here = {2, 3, 5, 8, 9}
+        for i in r:
+            self.assertEqual(self.vault_backend.is_available(
+                TEST_TYPE, inserted[i][0]), i in should_be_still_here)
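
Illustrative usage (not part of the patch): a minimal sketch of a periodic cleanup job driving the two expiration entry points added above. The cache_expire_until() and cache_expire_oldest() calls come from this diff; the VaultBackend class name, its constructor signature and the config layout are assumptions, since they are not shown here.

    import datetime

    from swh.vault.backend import VaultBackend

    # Hypothetical configuration; a real deployment would load this
    # from its usual configuration file.
    config = {
        'vault_db': 'dbname=swh-vault',
        'cache': {'root': '/srv/softwareheritage/vault-cache'},
    }

    backend = VaultBackend(config)  # assumed constructor signature

    # Expire every non-sticky bundle that has not been accessed for two weeks.
    cutoff = datetime.datetime.now() - datetime.timedelta(weeks=2)
    backend.cache_expire_until(cutoff, by='last_access')

    # Additionally drop the four bundles with the oldest creation timestamp;
    # sticky bundles are never removed by either method.
    backend.cache_expire_oldest(n=4, by='created')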