diff --git a/swh/vault/backend.py b/swh/vault/backend.py
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -3,9 +3,9 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import psycopg2
-import psycopg2.extras
 import smtplib
+import psycopg2
+from psycopg2.extras import RealDictCursor, execute_values
 
 from functools import wraps
 from email.mime.text import MIMEText
@@ -118,7 +118,7 @@
         if not self.db or self.db.closed:
             self.db = psycopg2.connect(
                 dsn=self.config['db'],
-                cursor_factory=psycopg2.extras.RealDictCursor,
+                cursor_factory=RealDictCursor,
             )
 
     def close(self):
@@ -173,39 +173,25 @@
         added_tasks = self.scheduler.create_tasks([task])
         return added_tasks[0]['id']
 
-    # TODO: remove once the scheduler handles priority tasks
-    def _send_batch_task(self, args):
-        """Send a cooking task to the celery scheduler"""
-        task = create_oneshot_task_dict('swh-vault-batch-cooking', *args)
-        added_tasks = self.scheduler.create_tasks([task])
-        return added_tasks[0]['id']
-
     @autocommit
-    def create_task(self, obj_type, obj_id, sticky=False, batch=False,
-                    cursor=None):
+    def create_task(self, obj_type, obj_id, sticky=False, cursor=None):
         """Create and send a cooking task"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         hex_id = hashutil.hash_to_hex(obj_id)
         args = [obj_type, hex_id]
 
-        # Don't check all the elements of the batch locally to avoid rtt
-        if not batch:
-            backend_storage_config = {'storage': self.config['storage']}
-            cooker_class = get_cooker(obj_type)
-            cooker = cooker_class(*args, override_cfg=backend_storage_config)
-            if not cooker.check_exists():
-                raise NotFoundExc("Object {} was not found.".format(hex_id))
+        backend_storage_config = {'storage': self.config['storage']}
+        cooker_class = get_cooker(obj_type)
+        cooker = cooker_class(*args, override_cfg=backend_storage_config)
+        if not cooker.check_exists():
+            raise NotFoundExc("Object {} was not found.".format(hex_id))
 
         cursor.execute('''
             INSERT INTO vault_bundle (type, object_id, sticky)
             VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky))
         self.commit()
 
-        # TODO: change once the scheduler handles priority tasks
-        if batch:
-            task_id = self._send_batch_task(args)
-        else:
-            task_id = self._send_task(args)
+        task_id = self._send_task(args)
 
         cursor.execute('''
             UPDATE vault_bundle
@@ -223,7 +209,7 @@
                        (email, obj_type, obj_id))
 
     @autocommit
-    def cook_request(self, obj_type, obj_id, *, sticky=False, batch=False,
+    def cook_request(self, obj_type, obj_id, *, sticky=False,
                      email=None, cursor=None):
         """Main entry point for cooking requests. This starts a cooking task if
             needed, and add the given e-mail to the notify list"""
@@ -240,7 +226,7 @@
 
         # If there's no bundle entry, create the task.
         if info is None:
-            self.create_task(obj_type, obj_id, sticky, batch=batch)
+            self.create_task(obj_type, obj_id, sticky)
 
         if email is not None:
             # If the task is already done, send the email directly
@@ -257,22 +243,72 @@
     @autocommit
     def batch_cook(self, batch, cursor=None):
         """Cook a batch of bundles and returns the cooking id."""
-        cursor.execute('''INSERT INTO vault_batch (id) VALUES (DEFAULT)
-                      RETURNING id''')
+        cursor.execute('''
+            INSERT INTO vault_batch (id)
+            VALUES (DEFAULT)
+            RETURNING id''')
         batch_id = cursor.fetchone()['id']
         batch = batch_to_bytes(batch)
 
-        # Ideally, if we start to do a lot of batch inserts and performance
-        # becomes an issue, we should be able to rewrite all the following
-        # function calls to work on batches. It requires a significant amount
-        # of work (using UPDATE FROM to update task_id, using DELETE with tuple
-        # unpacking, etc), so we're doing a simple loop in the meantime.
-        for obj_type, obj_id in batch:
-            info = self.cook_request(obj_type, obj_id, batch=True)
-            cursor.execute('''INSERT INTO vault_batch_bundle
-                              (batch_id, bundle_id)
-                              VALUES (%s, %s) ON CONFLICT DO NOTHING''',
-                           (batch_id, info['id']))
+        # An empty batch would render as "IN ()", which is invalid SQL
+        if not batch:
+            return batch_id
+
+        # Delete the failed bundles of the batch so they get cooked again;
+        # pending or done bundles must be kept as they are.
+        cursor.execute('''
+            DELETE FROM vault_bundle
+            WHERE task_status = 'failed'
+              AND (type, object_id) IN %s''', (tuple(batch),))
+
+        # Insert the missing bundles; existing ones are left untouched
+        execute_values(cursor, '''
+            INSERT INTO vault_bundle (type, object_id)
+            VALUES %s ON CONFLICT DO NOTHING''', batch)
+
+        # Fetch every bundle of the batch, new or pre-existing. RETURNING
+        # can't be used above: ON CONFLICT DO NOTHING skips existing rows,
+        # and execute_values only keeps the results of its last page.
+        cursor.execute('''
+            SELECT id, type, object_id, task_id FROM vault_bundle
+            WHERE (type, object_id) IN %s''', (tuple(batch),))
+        bundles = cursor.fetchall()
+
+        # Insert the batch-bundle entries
+        batch_id_bundle_ids = [(batch_id, row['id']) for row in bundles]
+        execute_values(cursor, '''
+            INSERT INTO vault_batch_bundle (batch_id, bundle_id)
+            VALUES %s ON CONFLICT DO NOTHING''',
+                       batch_id_bundle_ids)
+        self.commit()
+
+        # Only schedule the bundles that don't have a cooking task yet
+        batch_new = [(row['type'], bytes(row['object_id']))
+                     for row in bundles if row['task_id'] is None]
+
+        # Send the tasks
+        args_batch = [(obj_type, hashutil.hash_to_hex(obj_id))
+                      for obj_type, obj_id in batch_new]
+        # TODO: change once the scheduler handles priority tasks
+        tasks = [create_oneshot_task_dict('swh-vault-batch-cooking', *args)
+                 for args in args_batch]
+
+        added_tasks = self.scheduler.create_tasks(tasks)
+        tasks_ids_bundle_ids = zip([task['id'] for task in added_tasks],
+                                   batch_new)
+        tasks_ids_bundle_ids = [(task_id, obj_type, obj_id)
+                                for task_id, (obj_type, obj_id)
+                                in tasks_ids_bundle_ids]
+
+        # Update the task ids
+        execute_values(cursor, '''
+            UPDATE vault_bundle
+            SET task_id = s_task_id
+            FROM (SELECT * FROM (VALUES %s)
+                  AS sub (s_task_id, s_type, s_object_id)
+                 ) AS sub
+            WHERE type = s_type::cook_type AND object_id = s_object_id ''',
+                       tasks_ids_bundle_ids)
         return batch_id
 
     @autocommit