diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py --- a/swh/scrubber/db.py +++ b/swh/scrubber/db.py @@ -50,32 +50,32 @@ @functools.lru_cache(1000) def datastore_get_or_add(self, datastore: Datastore) -> int: """Creates a datastore if it does not exist, and returns its id.""" - cur = self.cursor() - cur.execute( - """ - WITH inserted AS ( - INSERT INTO datastore (package, class, instance) - VALUES (%(package)s, %(cls)s, %(instance)s) - ON CONFLICT DO NOTHING - RETURNING id - ) - SELECT id - FROM inserted - UNION ( - -- If the datastore already exists, we need to fetch its id + with self.transaction() as cur: + cur.execute( + """ + WITH inserted AS ( + INSERT INTO datastore (package, class, instance) + VALUES (%(package)s, %(cls)s, %(instance)s) + ON CONFLICT DO NOTHING + RETURNING id + ) SELECT id - FROM datastore - WHERE - package=%(package)s - AND class=%(cls)s - AND instance=%(instance)s + FROM inserted + UNION ( + -- If the datastore already exists, we need to fetch its id + SELECT id + FROM datastore + WHERE + package=%(package)s + AND class=%(cls)s + AND instance=%(instance)s + ) + LIMIT 1 + """, + (dataclasses.asdict(datastore)), ) - LIMIT 1 - """, - (dataclasses.asdict(datastore)), - ) - (id_,) = cur.fetchone() - return id_ + (id_,) = cur.fetchone() + return id_ def corrupt_object_add( self, @@ -84,40 +84,40 @@ serialized_object: bytes, ) -> None: datastore_id = self.datastore_get_or_add(datastore) - cur = self.cursor() - cur.execute( - """ - INSERT INTO corrupt_object (id, datastore, object) - VALUES (%s, %s, %s) - ON CONFLICT DO NOTHING - """, - (str(id), datastore_id, serialized_object), - ) + with self.transaction() as cur: + cur.execute( + """ + INSERT INTO corrupt_object (id, datastore, object) + VALUES (%s, %s, %s) + ON CONFLICT DO NOTHING + """, + (str(id), datastore_id, serialized_object), + ) def corrupt_object_iter(self) -> Iterator[CorruptObject]: """Yields all records in the 'corrupt_object' table.""" - cur = self.cursor() - cur.execute( - """ - SELECT - co.id, co.first_occurrence, co.object, - ds.package, ds.class, ds.instance - FROM corrupt_object AS co - INNER JOIN datastore AS ds ON (ds.id=co.datastore) - """ - ) - - for row in cur: - (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row - yield CorruptObject( - id=CoreSWHID.from_string(id), - first_occurrence=first_occurrence, - object_=object_, - datastore=Datastore( - package=ds_package, cls=ds_class, instance=ds_instance - ), + with self.transaction() as cur: + cur.execute( + """ + SELECT + co.id, co.first_occurrence, co.object, + ds.package, ds.class, ds.instance + FROM corrupt_object AS co + INNER JOIN datastore AS ds ON (ds.id=co.datastore) + """ ) + for row in cur: + (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row + yield CorruptObject( + id=CoreSWHID.from_string(id), + first_occurrence=first_occurrence, + object_=object_, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), + ) + def _corrupt_object_list_from_cursor( self, cur: psycopg2.extensions.cursor ) -> List[CorruptObject]: @@ -151,23 +151,23 @@ in_origin: An origin URL. If provided, only returns objects that may be found in the given origin """ - cur = self.cursor() - cur.execute( - """ - SELECT - co.id, co.first_occurrence, co.object, - ds.package, ds.class, ds.instance - FROM corrupt_object AS co - INNER JOIN datastore AS ds ON (ds.id=co.datastore) - WHERE - co.id >= %s - AND co.id <= %s - ORDER BY co.id - LIMIT %s - """, - (str(start_id), str(end_id), limit), - ) - return self._corrupt_object_list_from_cursor(cur) + with self.transaction() as cur: + cur.execute( + """ + SELECT + co.id, co.first_occurrence, co.object, + ds.package, ds.class, ds.instance + FROM corrupt_object AS co + INNER JOIN datastore AS ds ON (ds.id=co.datastore) + WHERE + co.id >= %s + AND co.id <= %s + ORDER BY co.id + LIMIT %s + """, + (str(start_id), str(end_id), limit), + ) + return self._corrupt_object_list_from_cursor(cur) def corrupt_object_grab_by_id( self, @@ -273,24 +273,24 @@ Arguments: after: if given, only returns origins with an URL after this value """ - cur = self.cursor() - cur.execute( - """ - SELECT DISTINCT origin_url - FROM object_origin - WHERE - origin_url > %(after)s - AND object_id IN ( - (SELECT id FROM corrupt_object) - EXCEPT (SELECT id FROM fixed_object) - ) - ORDER BY origin_url - LIMIT %(limit)s - """, - dict(after=after, limit=limit), - ) + with self.transaction() as cur: + cur.execute( + """ + SELECT DISTINCT origin_url + FROM object_origin + WHERE + origin_url > %(after)s + AND object_id IN ( + (SELECT id FROM corrupt_object) + EXCEPT (SELECT id FROM fixed_object) + ) + ORDER BY origin_url + LIMIT %(limit)s + """, + dict(after=after, limit=limit), + ) - return [origin_url for (origin_url,) in cur] + return [origin_url for (origin_url,) in cur] def fixed_object_add( self, cur: psycopg2.extensions.cursor, fixed_objects: List[FixedObject] @@ -309,12 +309,12 @@ ) def fixed_object_iter(self) -> Iterator[FixedObject]: - cur = self.cursor() - cur.execute("SELECT id, object, method, recovery_date FROM fixed_object") - for (id, object_, method, recovery_date) in cur: - yield FixedObject( - id=CoreSWHID.from_string(id), - object_=object_, - method=method, - recovery_date=recovery_date, - ) + with self.transaction() as cur: + cur.execute("SELECT id, object, method, recovery_date FROM fixed_object") + for (id, object_, method, recovery_date) in cur: + yield FixedObject( + id=CoreSWHID.from_string(id), + object_=object_, + method=method, + recovery_date=recovery_date, + )