diff --git a/swh/vault/backend.py b/swh/vault/backend.py --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -173,25 +173,39 @@ added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]['id'] + # TODO: remove once the scheduler handles priority tasks + def _send_batch_task(self, args): + """Send a cooking task to the celery scheduler""" + task = create_oneshot_task_dict('swh-vault-batch-cooking', *args) + added_tasks = self.scheduler.create_tasks([task]) + return added_tasks[0]['id'] + @autocommit - def create_task(self, obj_type, obj_id, sticky=False, cursor=None): + def create_task(self, obj_type, obj_id, sticky=False, batch=False, + cursor=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) hex_id = hashutil.hash_to_hex(obj_id) args = [obj_type, hex_id] - backend_storage_config = {'storage': self.config['storage']} - cooker_class = get_cooker(obj_type) - cooker = cooker_class(*args, override_cfg=backend_storage_config) - if not cooker.check_exists(): - raise NotFoundExc("Object {} was not found.".format(hex_id)) + # Don't check all the elements of the batch locally to avoid rtt + if not batch: + backend_storage_config = {'storage': self.config['storage']} + cooker_class = get_cooker(obj_type) + cooker = cooker_class(*args, override_cfg=backend_storage_config) + if not cooker.check_exists(): + raise NotFoundExc("Object {} was not found.".format(hex_id)) cursor.execute(''' INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) self.commit() - task_id = self._send_task(args) + # TODO: change once the scheduler handles priority tasks + if batch: + task_id = self._send_batch_task(args) + else: + task_id = self._send_task(args) cursor.execute(''' UPDATE vault_bundle @@ -254,7 +268,7 @@ # of work (using UPDATE FROM to update task_id, using DELETE with tuple # unpacking, etc), so we're doing a simple loop in the meantime. for obj_type, obj_id in batch: - info = self.cook_request(obj_type, obj_id) + info = self.cook_request(obj_type, obj_id, batch=True) cursor.execute('''INSERT INTO vault_batch_bundle (batch_id, bundle_id) VALUES (%s, %s)''', (batch_id, info['id'])) diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py --- a/swh/vault/cooking_tasks.py +++ b/swh/vault/cooking_tasks.py @@ -15,3 +15,14 @@ def run_task(self, obj_type, obj_id): cooker = get_cooker(obj_type)(obj_type, obj_id) cooker.cook() + + +# TODO: remove once the scheduler handles priority tasks +class SWHBatchCookingTask(Task): + """Temporary task for the batch queue.""" + + task_queue = 'swh_vault_batch_cooking' + + def run_task(self, obj_type, obj_id): + cooker = get_cooker(obj_type)(obj_type, obj_id) + cooker.cook()