Review notes for this patch to swh/vault/backend.py. Text placed before the
first "diff --git" header is ignored by `git apply` and GNU `patch`, so
these notes do not change what the patch does.

NOTE(review): the patch text below has lost its line breaks and
indentation (each hunk is run together on a few physical lines), so it
cannot be applied as-is; regenerate it with `git diff` before use.

NOTE(review): create_task() and cook_request() drop the `batch` keyword.
The only visible caller passing it (batch_cook's
`self.cook_request(obj_type, obj_id, batch=True)`) is removed here, but any
other caller still passing `batch=` will now get a TypeError - confirm
none remain (API layer, tests).

NOTE(review): batch_cook's "Delete all failed bundles from the batch"
statement has no status filter. It deletes every existing vault_bundle row
for the batch's objects, including ones already done or in progress. The
code then re-inserts them and schedules a new cooking task for every
object. The removed loop went through cook_request(), which only created
a task when no bundle existed (`if info is None`). If other tables
reference vault_bundle by foreign key, the DELETE may also fail or
cascade - check the schema.

NOTE(review): the %s placeholders and `cursor.fetchone()['id']` suggest
psycopg2 with a dict-row cursor (confirm). If so, cursor.execute() binds
one sequence element per %s. Passing the whole `batch` list to a single
%s in `IN (%s)` and `VALUES %s` therefore only works for one-element
batches and raises for larger ones. Multi-row VALUES lists need
psycopg2.extras.execute_values() or an explicitly built placeholder list.

NOTE(review): `FROM (SELECT task_id, type, object_id FROM %s) AS sub`
selects FROM a bound parameter, which is not valid SQL for any batch
size. The task ids need to come from a VALUES list, e.g.
`FROM (VALUES ...) AS sub(task_id, type, object_id)`.

NOTE(review): `bundle_ids = cursor.fetchall()` yields row objects. With
the dict-row cursor implied above, each (batch_id, bundle_id) pair holds a
whole row rather than row['id'].

NOTE(review): task ids are matched to objects by zipping `added_tasks`
with `batch`. This assumes scheduler.create_tasks() returns tasks in
input order - confirm that contract. The name tasks_ids_bundle_ids is
also reused for two different shapes, and it holds object keys, not
bundle ids.

diff --git a/swh/vault/backend.py b/swh/vault/backend.py --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -173,39 +173,25 @@ added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]['id'] - # TODO: remove once the scheduler handles priority tasks - def _send_batch_task(self, args): - """Send a cooking task to the celery scheduler""" - task = create_oneshot_task_dict('swh-vault-batch-cooking', *args) - added_tasks = self.scheduler.create_tasks([task]) - return added_tasks[0]['id'] - @autocommit - def create_task(self, obj_type, obj_id, sticky=False, batch=False, - cursor=None): + def create_task(self, obj_type, obj_id, sticky=False, cursor=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) hex_id = hashutil.hash_to_hex(obj_id) args = [obj_type, hex_id] - # Don't check all the elements of the batch locally to avoid rtt - if not batch: - backend_storage_config = {'storage': self.config['storage']} - cooker_class = get_cooker(obj_type) - cooker = cooker_class(*args, override_cfg=backend_storage_config) - if not cooker.check_exists(): - raise NotFoundExc("Object {} was not found.".format(hex_id)) + backend_storage_config = {'storage': self.config['storage']} + cooker_class = get_cooker(obj_type) + cooker = cooker_class(*args, override_cfg=backend_storage_config) + if not cooker.check_exists(): + raise NotFoundExc("Object {} was not found.".format(hex_id)) cursor.execute(''' INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) self.commit() - # TODO: change once the scheduler handles priority tasks - if batch: - task_id = self._send_batch_task(args) - else: - task_id = self._send_task(args) + task_id = self._send_task(args) cursor.execute(''' UPDATE vault_bundle @@ -223,7 +209,7 @@ (email, obj_type, obj_id)) @autocommit - def cook_request(self, obj_type, obj_id, *, sticky=False, batch=False, + def cook_request(self, obj_type, obj_id, *, sticky=False, email=None, 
cursor=None): """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" @@ -240,7 +226,7 @@ # If there's no bundle entry, create the task. if info is None: - self.create_task(obj_type, obj_id, sticky, batch=batch) + self.create_task(obj_type, obj_id, sticky) if email is not None: # If the task is already done, send the email directly @@ -262,17 +248,45 @@ batch_id = cursor.fetchone()['id'] batch = batch_to_bytes(batch) - # Ideally, if we start to do a lot of batch inserts and performance - # becomes an issue, we should be able to rewrite all the following - # function calls to work on batches. It requires a significant amount - # of work (using UPDATE FROM to update task_id, using DELETE with tuple - # unpacking, etc), so we're doing a simple loop in the meantime. - for obj_type, obj_id in batch: - info = self.cook_request(obj_type, obj_id, batch=True) - cursor.execute('''INSERT INTO vault_batch_bundle - (batch_id, bundle_id) - VALUES (%s, %s) ON CONFLICT DO NOTHING''', - (batch_id, info['id'])) + # Delete all failed bundles from the batch + cursor.execute('''DELETE FROM vault_bundle + WHERE (type, object_id) IN (%s)''', batch) + + # Insert all the bundles + cursor.execute(''' + INSERT INTO vault_bundle (type, object_id) + VALUES %s RETURNING id''', batch) + + # Insert the batch-bundle entries + bundle_ids = cursor.fetchall() + batch_id_bundle_ids = [(batch_id, bundle_id) + for bundle_id in bundle_ids] + cursor.execute('''INSERT INTO vault_batch_bundle + (batch_id, bundle_id) + VALUES %s ON CONFLICT DO NOTHING''', + batch_id_bundle_ids) + self.commit() + + # Send the task + args_batch = [(obj_type, hashutil.hash_to_hex(obj_id)) + for obj_type, obj_id in batch] + # TODO: change once the scheduler handles priority tasks + tasks = [create_oneshot_task_dict('swh-vault-batch-cooking', *args) + for args in args_batch] + + added_tasks = self.scheduler.create_tasks(tasks) + tasks_ids_bundle_ids = 
zip([task['id'] for task in added_tasks], batch) + tasks_ids_bundle_ids = [(task_id, obj_type, obj_id) + for task_id, (obj_type, obj_id) + in tasks_ids_bundle_ids] + + # Update the task ids + cursor.execute(''' + UPDATE vault_bundle + SET task_id = sub.task_id + FROM (SELECT task_id, type, object_id FROM %s) AS sub + WHERE type = sub.type AND object_id = sub.object_id ''', + tasks_ids_bundle_ids) return batch_id @autocommit