diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql
--- a/sql/swh-vault-schema.sql
+++ b/sql/swh-vault-schema.sql
@@ -25,6 +25,8 @@
   task_uuid uuid not null, -- celery UUID of the cooking task
   task_status cook_status not null default 'new', -- status of the task
+  permanent boolean not null default false, -- bundles never deleted
+
   ts_created timestamptz not null default now(), -- timestamp of creation
   ts_done timestamptz, -- timestamp of the cooking result
   ts_last_access timestamptz not null default now(), -- last access
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -123,7 +123,7 @@
     def task_info(self, obj_type, obj_id, cursor=None):
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
-            SELECT id, type, object_id, task_uuid, task_status,
+            SELECT id, type, object_id, task_uuid, task_status, permanent,
                    ts_created, ts_done, ts_last_access, progress_msg
             FROM vault_bundle
             WHERE type = %s AND object_id = %s''', (obj_type, obj_id))
@@ -137,7 +137,7 @@
         task.apply_async(args, task_id=task_uuid)
 
     @autocommit
-    def create_task(self, obj_type, obj_id, cursor=None):
+    def create_task(self, obj_type, obj_id, permanent=False, cursor=None):
         obj_id = hashutil.hash_to_bytes(obj_id)
         args = [self.config, obj_type, obj_id]
         CookerCls = get_cooker(obj_type)
@@ -146,8 +146,9 @@
         task_uuid = celery.uuid()
         cursor.execute('''
-            INSERT INTO vault_bundle (type, object_id, task_uuid)
-            VALUES (%s, %s, %s)''', (obj_type, obj_id, task_uuid))
+            INSERT INTO vault_bundle (type, object_id, task_uuid, permanent)
+            VALUES (%s, %s, %s, %s)''',
+            (obj_type, obj_id, task_uuid, permanent))
         self.commit()
         self._send_task(task_uuid, args)
@@ -162,10 +163,11 @@
                        (email, obj_type, obj_id))
 
     @autocommit
-    def cook_request(self, obj_type, obj_id, email=None, cursor=None):
+    def cook_request(self, obj_type, obj_id, *, permanent=False,
+                     email=None, cursor=None):
         info = self.task_info(obj_type, obj_id)
         if info is None:
-            self.create_task(obj_type, obj_id)
+            self.create_task(obj_type, obj_id, permanent)
         if email is not None:
             if info is not None and info['task_status'] == 'done':
                 self.send_notification(None, email, obj_type, obj_id)
@@ -256,3 +258,32 @@
         cursor.execute('''
             DELETE FROM vault_notif_email
             WHERE id = %s''', (n_id,))
+
+    @autocommit
+    def _cache_expire(self, cond, *args, cursor=None):
+        # Embedded SELECT query to be able to use ORDER BY and LIMIT
+        cursor.execute('''
+            DELETE FROM vault_bundle
+            WHERE ctid IN (
+                SELECT ctid
+                FROM vault_bundle
+                WHERE permanent = false
+                {}
+            )
+            RETURNING type, object_id
+            '''.format(cond), args)
+
+        for d in cursor:
+            self.cache.delete(d['type'], bytes(d['object_id']))
+
+    @autocommit
+    def cache_expire_count(self, n=1, by='last_access', cursor=None):
+        assert by in ('created', 'done', 'last_access')
+        filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n)
+        return self._cache_expire(filter)
+
+    @autocommit
+    def cache_expire_until(self, date, by='last_access', cursor=None):
+        assert by in ('created', 'done', 'last_access')
+        filter = '''AND ts_{} <= %s'''.format(by)
+        return self._cache_expire(filter, date)
diff --git a/swh/vault/cache.py b/swh/vault/cache.py
--- a/swh/vault/cache.py
+++ b/swh/vault/cache.py
@@ -31,6 +31,10 @@
         storage = self._get_storage(obj_type)
         return storage.get(hashutil.hash_to_bytes(obj_id))
 
+    def delete(self, obj_type, obj_id):
+        storage = self._get_storage(obj_type)
+        return storage.delete(hashutil.hash_to_bytes(obj_id))
+
     def add_stream(self, obj_type, obj_id, content_iter):
         storage = self._get_storage(obj_type)
         return storage.add_stream(content_iter, obj_id)
@@ -55,7 +59,7 @@
             if not os.path.isdir(fp):
                 os.makedirs(fp, DIR_MODE, exist_ok=True)
-            self.storages[obj_type] = get_objstorage(
-                'pathslicing', {'root': fp, 'slicing': '0:1/0:5'})
+            conf = {'root': fp, 'slicing': '0:1/0:5', 'allow_delete': True}
+            self.storages[obj_type] = get_objstorage('pathslicing', conf)
         return self.storages[obj_type]
diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py
--- a/swh/vault/tests/test_backend.py
+++ b/swh/vault/tests/test_backend.py
@@ -37,6 +37,10 @@
         creation_delta_secs = (ts - now).total_seconds()
         self.assertLess(creation_delta_secs, tolerance_secs)
 
+    def hash_content(self, content):
+        obj_id = hashutil.hash_data(content)['sha1']
+        return content, obj_id
+
 
 TEST_TYPE = 'revision_gitfast'
 TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc'
@@ -160,7 +164,8 @@
             madd.reset_mock()
             msend.reset_mock()
-            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
+            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
+                                            email=TEST_EMAIL)
             madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
             msend.assert_not_called()
@@ -168,7 +173,8 @@
             msend.reset_mock()
             self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
-            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
+            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
+                                            email=TEST_EMAIL)
             msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_OBJ_ID)
             madd.assert_not_called()
@@ -179,7 +185,8 @@
                   'billg@example.com', 'test+42@example.org')
         for email in emails:
-            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email)
+            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
+                                            email=email)
         self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
@@ -237,3 +244,47 @@
         self.assertTimestampAlmostNow(access_ts_after)
         self.assertLess(access_ts_before, access_ts_after)
+
+    def test_cache_expire_count(self):
+        r = range(1, 10)
+        inserted = {}
+        for i in r:
+            content, obj_id = self.hash_content(b'content%s' % str(i).encode())
+            inserted[i] = (obj_id, content)
+            with self.mock_cooking():
+                permanent = (i == 5)
+                self.vault_backend.create_task(TEST_TYPE, obj_id, permanent)
+            self.vault_backend.cache.add(TEST_TYPE, obj_id, content)
+            self.vault_backend.set_status(TEST_TYPE, obj_id, 'done')
+
+        self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0])
+        self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0])
+        self.vault_backend.cache_expire_count(n=4)
+
+        should_be_still_here = {2, 3, 5, 8, 9}
+        for i in r:
+            self.assertEqual(self.vault_backend.is_available(
+                TEST_TYPE, inserted[i][0]), i in should_be_still_here)
+
+    def test_cache_expire_until(self):
+        r = range(1, 10)
+        inserted = {}
+        for i in r:
+            content, obj_id = self.hash_content(b'content%s' % str(i).encode())
+            inserted[i] = (obj_id, content)
+            with self.mock_cooking():
+                permanent = (i == 5)
+                self.vault_backend.create_task(TEST_TYPE, obj_id, permanent)
+            self.vault_backend.cache.add(TEST_TYPE, obj_id, content)
+            self.vault_backend.set_status(TEST_TYPE, obj_id, 'done')
+            if i == 7:
+                cutoff_date = datetime.datetime.now()
+
+        self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0])
+        self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0])
+        self.vault_backend.cache_expire_until(date=cutoff_date)
+
+        should_be_still_here = {2, 3, 5, 8, 9}
+        for i in r:
+            self.assertEqual(self.vault_backend.is_available(
+                TEST_TYPE, inserted[i][0]), i in should_be_still_here)
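Usage sketch (not part of the diff): a minimal illustration of how the two new expiration methods could be driven from a periodic cleanup job. It assumes `backend` is an already-configured swh.vault.backend.VaultBackend instance; the count and cutoff values below are arbitrary examples.

    import datetime

    # Evict the 100 least recently accessed bundles; rows flagged
    # permanent = true are never selected by _cache_expire().
    backend.cache_expire_count(n=100, by='last_access')

    # Or evict every non-permanent bundle whose last access is older than
    # an arbitrary 30-day cutoff ('created' and 'done' are the other
    # accepted values for `by`).
    cutoff = datetime.datetime.now() - datetime.timedelta(days=30)
    backend.cache_expire_until(cutoff, by='last_access')

Bundles cooked with cook_request(obj_type, obj_id, permanent=True) are excluded from both policies.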