Changeset View
Changeset View
Standalone View
Standalone View
swh/scrubber/storage_checker.py
| # Copyright (C) 2021-2022 The Software Heritage developers | # Copyright (C) 2021-2022 The Software Heritage developers | ||||
| # See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
| # License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
| # See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
| """Reads all objects in a swh-storage instance and recomputes their checksums.""" | """Reads all objects in a swh-storage instance and recomputes their checksums.""" | ||||
| import collections | import collections | ||||
| import contextlib | import contextlib | ||||
| import dataclasses | import dataclasses | ||||
| import datetime | import datetime | ||||
| import logging | import logging | ||||
| from typing import Iterable, Optional, Tuple, Union | from typing import Iterable, Optional, Tuple, Union | ||||
| import psycopg2 | |||||
| import tenacity | |||||
| from swh.core.statsd import Statsd | from swh.core.statsd import Statsd | ||||
| from swh.journal.serializers import value_to_kafka | from swh.journal.serializers import value_to_kafka | ||||
| from swh.model import swhids | from swh.model import swhids | ||||
| from swh.model.model import ( | from swh.model.model import ( | ||||
| Content, | Content, | ||||
| Directory, | Directory, | ||||
| ObjectType, | ObjectType, | ||||
| Release, | Release, | ||||
| ▲ Show 20 Lines • Show All 100 Lines • ▼ Show 20 Lines | def statsd(self) -> Statsd: | ||||
| ) | ) | ||||
| return self._statsd | return self._statsd | ||||
| def run(self): | def run(self): | ||||
| """Runs on all objects of ``object_type`` and with id between | """Runs on all objects of ``object_type`` and with id between | ||||
| ``start_object`` and ``end_object``. | ``start_object`` and ``end_object``. | ||||
| """ | """ | ||||
| if isinstance(self.storage, PostgresqlStorage): | if isinstance(self.storage, PostgresqlStorage): | ||||
| with storage_db(self.storage) as db: | return self._check_postgresql() | ||||
| return self._check_postgresql(db) | |||||
| else: | else: | ||||
| raise NotImplementedError( | raise NotImplementedError( | ||||
| f"StorageChecker(storage={self.storage!r}).check_storage()" | f"StorageChecker(storage={self.storage!r}).check_storage()" | ||||
| ) | ) | ||||
| def _check_postgresql(self, db): | def _check_postgresql(self): | ||||
| object_type = getattr(swhids.ObjectType, self.object_type.upper()) | object_type = getattr(swhids.ObjectType, self.object_type.upper()) | ||||
| for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( | for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( | ||||
| self.start_object, self.end_object | self.start_object, self.end_object | ||||
| ): | ): | ||||
| (range_start_swhid, range_end_swhid) = _get_inclusive_range_swhids( | (range_start_swhid, range_end_swhid) = _get_inclusive_range_swhids( | ||||
| range_start, range_end, object_type | range_start, range_end, object_type | ||||
| ) | ) | ||||
| Show All 20 Lines | def _check_postgresql(self): | ||||
| logger.debug( | logger.debug( | ||||
| "Processing %s range %s to %s", | "Processing %s range %s to %s", | ||||
| self.object_type, | self.object_type, | ||||
| backfill._format_range_bound(range_start), | backfill._format_range_bound(range_start), | ||||
| backfill._format_range_bound(range_end), | backfill._format_range_bound(range_end), | ||||
| ) | ) | ||||
| self._check_postgresql_range(object_type, range_start, range_end) | |||||
| self.db.checked_range_upsert( | |||||
| self.datastore_info(), | |||||
| range_start_swhid, | |||||
| range_end_swhid, | |||||
| start_time, | |||||
| ) | |||||
| @tenacity.retry( | |||||
| retry=tenacity.retry_if_exception_type(psycopg2.OperationalError), | |||||
| wait=tenacity.wait_random_exponential(min=10, max=180), | |||||
| ) | |||||
| def _check_postgresql_range( | |||||
| self, object_type: swhids.ObjectType, range_start, range_end | |||||
| ) -> None: | |||||
| assert isinstance( | |||||
| self.storage, PostgresqlStorage | |||||
| ), f"_check_postgresql_range called with self.storage={self.storage!r}" | |||||
| with storage_db(self.storage) as db: | |||||
| objects = backfill.fetch( | objects = backfill.fetch( | ||||
| db, self.object_type, start=range_start, end=range_end | db, self.object_type, start=range_start, end=range_end | ||||
| ) | ) | ||||
| objects = list(objects) | objects = list(objects) | ||||
| with self.statsd().timed( | with self.statsd().timed( | ||||
| "batch_duration_seconds", tags={"operation": "check_hashes"} | "batch_duration_seconds", tags={"operation": "check_hashes"} | ||||
| ): | ): | ||||
| self.check_object_hashes(objects) | self.check_object_hashes(objects) | ||||
| with self.statsd().timed( | with self.statsd().timed( | ||||
| "batch_duration_seconds", tags={"operation": "check_references"} | "batch_duration_seconds", tags={"operation": "check_references"} | ||||
| ): | ): | ||||
| self.check_object_references(objects) | self.check_object_references(objects) | ||||
| self.db.checked_range_upsert( | |||||
| self.datastore_info(), | |||||
| range_start_swhid, | |||||
| range_end_swhid, | |||||
| start_time, | |||||
| ) | |||||
| def check_object_hashes(self, objects: Iterable[ScrubbableObject]): | def check_object_hashes(self, objects: Iterable[ScrubbableObject]): | ||||
| """Recomputes hashes, and reports mismatches.""" | """Recomputes hashes, and reports mismatches.""" | ||||
| count = 0 | count = 0 | ||||
| for object_ in objects: | for object_ in objects: | ||||
| if isinstance(object_, Content): | if isinstance(object_, Content): | ||||
| # TODO | # TODO | ||||
| continue | continue | ||||
| real_id = object_.compute_hash() | real_id = object_.compute_hash() | ||||
| ▲ Show 20 Lines • Show All 148 Lines • Show Last 20 Lines | |||||