Changeset View
Changeset View
Standalone View
Standalone View
swh/scrubber/storage_checker.py
# Copyright (C) 2021-2022 The Software Heritage developers | # Copyright (C) 2021-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
"""Reads all objects in a swh-storage instance and recomputes their checksums.""" | """Reads all objects in a swh-storage instance and recomputes their checksums.""" | ||||
import collections | import collections | ||||
import contextlib | import contextlib | ||||
import dataclasses | import dataclasses | ||||
import datetime | import datetime | ||||
import logging | import logging | ||||
from typing import Iterable, Optional, Tuple, Union | from typing import Iterable, Optional, Tuple, Union | ||||
import psycopg2 | |||||
import tenacity | |||||
from swh.core.statsd import Statsd | from swh.core.statsd import Statsd | ||||
from swh.journal.serializers import value_to_kafka | from swh.journal.serializers import value_to_kafka | ||||
from swh.model import swhids | from swh.model import swhids | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
ObjectType, | ObjectType, | ||||
Release, | Release, | ||||
▲ Show 20 Lines • Show All 100 Lines • ▼ Show 20 Lines | def statsd(self) -> Statsd: | ||||
) | ) | ||||
return self._statsd | return self._statsd | ||||
def run(self): | def run(self): | ||||
"""Runs on all objects of ``object_type`` and with id between | """Runs on all objects of ``object_type`` and with id between | ||||
``start_object`` and ``end_object``. | ``start_object`` and ``end_object``. | ||||
""" | """ | ||||
if isinstance(self.storage, PostgresqlStorage): | if isinstance(self.storage, PostgresqlStorage): | ||||
with storage_db(self.storage) as db: | return self._check_postgresql() | ||||
return self._check_postgresql(db) | |||||
else: | else: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
f"StorageChecker(storage={self.storage!r}).check_storage()" | f"StorageChecker(storage={self.storage!r}).check_storage()" | ||||
) | ) | ||||
def _check_postgresql(self, db): | def _check_postgresql(self): | ||||
object_type = getattr(swhids.ObjectType, self.object_type.upper()) | object_type = getattr(swhids.ObjectType, self.object_type.upper()) | ||||
for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( | for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( | ||||
self.start_object, self.end_object | self.start_object, self.end_object | ||||
): | ): | ||||
(range_start_swhid, range_end_swhid) = _get_inclusive_range_swhids( | (range_start_swhid, range_end_swhid) = _get_inclusive_range_swhids( | ||||
range_start, range_end, object_type | range_start, range_end, object_type | ||||
) | ) | ||||
Show All 20 Lines | def _check_postgresql(self): | ||||
logger.debug( | logger.debug( | ||||
"Processing %s range %s to %s", | "Processing %s range %s to %s", | ||||
self.object_type, | self.object_type, | ||||
backfill._format_range_bound(range_start), | backfill._format_range_bound(range_start), | ||||
backfill._format_range_bound(range_end), | backfill._format_range_bound(range_end), | ||||
) | ) | ||||
self._check_postgresql_range(object_type, range_start, range_end) | |||||
self.db.checked_range_upsert( | |||||
self.datastore_info(), | |||||
range_start_swhid, | |||||
range_end_swhid, | |||||
start_time, | |||||
) | |||||
@tenacity.retry( | |||||
retry=tenacity.retry_if_exception_type(psycopg2.OperationalError), | |||||
wait=tenacity.wait_random_exponential(min=10, max=180), | |||||
) | |||||
def _check_postgresql_range( | |||||
self, object_type: swhids.ObjectType, range_start, range_end | |||||
) -> None: | |||||
assert isinstance( | |||||
self.storage, PostgresqlStorage | |||||
), f"_check_postgresql_range called with self.storage={self.storage!r}" | |||||
with storage_db(self.storage) as db: | |||||
objects = backfill.fetch( | objects = backfill.fetch( | ||||
db, self.object_type, start=range_start, end=range_end | db, self.object_type, start=range_start, end=range_end | ||||
) | ) | ||||
objects = list(objects) | objects = list(objects) | ||||
with self.statsd().timed( | with self.statsd().timed( | ||||
"batch_duration_seconds", tags={"operation": "check_hashes"} | "batch_duration_seconds", tags={"operation": "check_hashes"} | ||||
): | ): | ||||
self.check_object_hashes(objects) | self.check_object_hashes(objects) | ||||
with self.statsd().timed( | with self.statsd().timed( | ||||
"batch_duration_seconds", tags={"operation": "check_references"} | "batch_duration_seconds", tags={"operation": "check_references"} | ||||
): | ): | ||||
self.check_object_references(objects) | self.check_object_references(objects) | ||||
self.db.checked_range_upsert( | |||||
self.datastore_info(), | |||||
range_start_swhid, | |||||
range_end_swhid, | |||||
start_time, | |||||
) | |||||
def check_object_hashes(self, objects: Iterable[ScrubbableObject]): | def check_object_hashes(self, objects: Iterable[ScrubbableObject]): | ||||
"""Recomputes hashes, and reports mismatches.""" | """Recomputes hashes, and reports mismatches.""" | ||||
count = 0 | count = 0 | ||||
for object_ in objects: | for object_ in objects: | ||||
if isinstance(object_, Content): | if isinstance(object_, Content): | ||||
# TODO | # TODO | ||||
continue | continue | ||||
real_id = object_.compute_hash() | real_id = object_.compute_hash() | ||||
▲ Show 20 Lines • Show All 148 Lines • Show Last 20 Lines |