diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql --- a/sql/swh-vault-schema.sql +++ b/sql/swh-vault-schema.sql @@ -22,9 +22,7 @@ type cook_type not null, -- requested cooking type object_id obj_hash not null, -- requested object ID - task_uuid uuid not null, -- celery UUID of the cooking task task_status cook_status not null default 'new', -- status of the task - sticky boolean not null default false, -- bundle cannot expire ts_created timestamptz not null default now(), -- timestamp of creation diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -61,7 +61,7 @@ def user_info(task_info): - return {'task_uuid': str(task_info['task_uuid']), + return {'id': task_info['id'], 'status': task_info['task_status'], 'progress_message': task_info['progress_msg'], 'obj_type': task_info['type'], diff --git a/swh/vault/backend.py b/swh/vault/backend.py --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -4,7 +4,6 @@ # See top-level LICENSE file for more information import smtplib -import celery import psycopg2 import psycopg2.extras @@ -12,7 +11,8 @@ from email.mime.text import MIMEText from swh.model import hashutil -from swh.scheduler.utils import get_task +from swh.scheduler.backend import SchedulerBackend +from swh.scheduler.utils import create_oneshot_task_dict from swh.vault.cache import VaultCache from swh.vault.cookers import get_cooker from swh.vault.cooking_tasks import SWHCookingTask # noqa @@ -80,6 +80,7 @@ self.db = None self.reconnect() self.smtp_server = smtplib.SMTP('localhost', 25) + self.scheduler = SchedulerBackend() def reconnect(self): """Reconnect to the database.""" @@ -126,7 +127,7 @@ """Fetch information from a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' - SELECT id, type, object_id, task_uuid, task_status, sticky, + SELECT id, type, object_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) @@ -135,29 +136,28 @@ res['object_id'] = bytes(res['object_id']) return res - @staticmethod - def _send_task(task_uuid, args): + def _send_task(self, args): """Send a cooking task to the celery scheduler""" - task = get_task(cooking_task_name) - task.apply_async(args, task_id=task_uuid) + task = create_oneshot_task_dict('swh-vault-cooking', *args) + self.scheduler.create_tasks([task]) @autocommit def create_task(self, obj_type, obj_id, sticky=False, cursor=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) - args = [obj_type, obj_id] + hex_id = hashutil.hash_to_hex(obj_id) + args = [obj_type, hex_id] + cooker_class = get_cooker(obj_type) cooker = cooker_class(*args) cooker.check_exists() - task_uuid = celery.uuid() cursor.execute(''' - INSERT INTO vault_bundle (type, object_id, task_uuid, sticky) - VALUES (%s, %s, %s, %s)''', - (obj_type, obj_id, task_uuid, sticky)) + INSERT INTO vault_bundle (type, object_id, sticky) + VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) self.commit() - self._send_task(task_uuid, args) + self._send_task(args) @autocommit def add_notif_email(self, obj_type, obj_id, email, cursor=None): diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -68,15 +68,13 @@ self.assertEqual(m['cooker'].check_exists.call_count, 1) self.assertEqual(m['send_task'].call_count, 1) - args = m['send_task'].call_args[0][1] + args = m['send_task'].call_args[0][0] self.assertEqual(args[0], TEST_TYPE) self.assertEqual(args[1], TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['object_id'], TEST_OBJ_ID) self.assertEqual(info['type'], TEST_TYPE) - self.assertEqual(str(info['task_uuid']), - m['send_task'].call_args[0][0]) self.assertEqual(info['task_status'], 'new') self.assertTimestampAlmostNow(info['ts_created'])