diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,3 +3,5 @@
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 
 dulwich
+psycopg2
+tenacity
diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py
--- a/swh/scrubber/storage_checker.py
+++ b/swh/scrubber/storage_checker.py
@@ -12,6 +12,9 @@
 import logging
 from typing import Iterable, Optional, Tuple, Union
 
+import psycopg2
+import tenacity
+
 from swh.core.statsd import Statsd
 from swh.journal.serializers import value_to_kafka
 from swh.model import swhids
@@ -128,14 +131,13 @@
         ``start_object`` and ``end_object``.
         """
         if isinstance(self.storage, PostgresqlStorage):
-            with storage_db(self.storage) as db:
-                return self._check_postgresql(db)
+            return self._check_postgresql()
         else:
             raise NotImplementedError(
                 f"StorageChecker(storage={self.storage!r}).check_storage()"
             )
 
-    def _check_postgresql(self, db):
+    def _check_postgresql(self):
         object_type = getattr(swhids.ObjectType, self.object_type.upper())
         for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type](
             self.start_object, self.end_object
@@ -172,6 +174,32 @@
                 backfill._format_range_bound(range_end),
             )
 
+            self._check_postgresql_range(object_type, range_start, range_end)
+
+            self.db.checked_range_upsert(
+                self.datastore_info(),
+                range_start_swhid,
+                range_end_swhid,
+                start_time,
+            )
+
+    # Retried without a stop condition on psycopg2.OperationalError; each attempt
+    # re-enters storage_db() and re-fetches the whole range. Retries are logged.
+    @tenacity.retry(
+        retry=tenacity.retry_if_exception_type(psycopg2.OperationalError),
+        wait=tenacity.wait_random_exponential(min=10, max=180),
+        before_sleep=tenacity.before_sleep_log(
+            logging.getLogger(__name__), logging.WARNING
+        ),
+    )
+    def _check_postgresql_range(
+        self, object_type: swhids.ObjectType, range_start, range_end
+    ) -> None:
+        assert isinstance(
+            self.storage, PostgresqlStorage
+        ), f"_check_postgresql_range called with self.storage={self.storage!r}"
+
+        with storage_db(self.storage) as db:
             objects = backfill.fetch(
                 db, self.object_type, start=range_start, end=range_end
             )
@@ -186,13 +214,6 @@
             ):
                 self.check_object_references(objects)
 
-            self.db.checked_range_upsert(
-                self.datastore_info(),
-                range_start_swhid,
-                range_end_swhid,
-                start_time,
-            )
-
     def check_object_hashes(self, objects: Iterable[ScrubbableObject]):
         """Recomputes hashes, and reports mismatches."""
         count = 0