swh/scrubber/storage_checker.py
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Reads all objects in a swh-storage instance and recomputes their checksums."""

import collections
import contextlib
import dataclasses
import logging
from typing import Iterable, Union

from swh.journal.serializers import value_to_kafka
from swh.model import swhids
from swh.model.model import (
    Content,
    Directory,
    ObjectType,
    Release,
    Revision,
    Snapshot,
    TargetType,
)
from swh.storage import backfill
from swh.storage.interface import StorageInterface
from swh.storage.postgresql.storage import Storage as PostgresqlStorage

from .db import Datastore, ScrubberDb

logger = logging.getLogger(__name__)

ScrubbableObject = Union[Revision, Release, Snapshot, Directory, Content]


@contextlib.contextmanager
def storage_db(storage):
    db = storage.get_db()
    try:
        yield db
    finally:
[… 56 lines not shown; the lines below are inside def _check_postgresql(self, db): …]
                backfill._format_range_bound(range_end),
            )

            objects = backfill.fetch(
                db, self.object_type, start=range_start, end=range_end
            )
            objects = list(objects)

            self.check_object_hashes(objects)
            self.check_object_references(objects)

    def check_object_hashes(self, objects: Iterable[ScrubbableObject]):

anlambert: missing dot at end of sentence.

"""Recomputes hashes, and reports mismatches.""" | |||||
for object_ in objects: | for object_ in objects: | ||||
if isinstance(object_, Content): | |||||
# TODO | |||||
continue | |||||
real_id = object_.compute_hash() | real_id = object_.compute_hash() | ||||
if object_.id != real_id: | if object_.id != real_id: | ||||
self.db.corrupt_object_add( | self.db.corrupt_object_add( | ||||
object_.swhid(), | object_.swhid(), | ||||
self.datastore_info(), | self.datastore_info(), | ||||
value_to_kafka(object_.to_dict()), | value_to_kafka(object_.to_dict()), | ||||
) | ) | ||||
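
A minimal sketch of the detection logic in check_object_hashes(), assuming only that swh.model is installed; the empty directory and its well-known git empty-tree id are illustrative test data, not part of this diff:

# Illustrative only: recompute an object's intrinsic hash and compare it with
# the id it was stored under, which is the comparison made above.
from swh.model.model import Directory

directory = Directory(
    entries=(),
    id=bytes.fromhex("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),  # git's empty-tree id
)
assert directory.compute_hash() == directory.id  # healthy object; a mismatch would be reported as corrupt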

    def check_object_references(self, objects: Iterable[ScrubbableObject]):
        """Check that all objects referenced by these objects exist."""
        cnt_references = collections.defaultdict(set)
        dir_references = collections.defaultdict(set)
        rev_references = collections.defaultdict(set)
        rel_references = collections.defaultdict(set)
        snp_references = collections.defaultdict(set)

        for object_ in objects:
            swhid = object_.swhid()

            if isinstance(object_, Content):
                pass
            elif isinstance(object_, Directory):
                for entry in object_.entries:
                    if entry.type == "file":
                        cnt_references[entry.target].add(swhid)
                    elif entry.type == "dir":
                        dir_references[entry.target].add(swhid)
                    elif entry.type == "rev":
                        # dir->rev holes are not considered a problem because they
                        # happen whenever git submodules point to repositories that
                        # were not loaded yet; ignore them
                        pass
                    else:
                        assert False, entry
            elif isinstance(object_, Revision):
                dir_references[object_.directory].add(swhid)
                for parent in object_.parents:
                    rev_references[parent].add(swhid)
            elif isinstance(object_, Release):
                if object_.target is None:
                    pass
                elif object_.target_type == ObjectType.CONTENT:
                    cnt_references[object_.target].add(swhid)
                elif object_.target_type == ObjectType.DIRECTORY:
                    dir_references[object_.target].add(swhid)
                elif object_.target_type == ObjectType.REVISION:
                    rev_references[object_.target].add(swhid)
                elif object_.target_type == ObjectType.RELEASE:
                    rel_references[object_.target].add(swhid)
                else:
                    assert False, object_
            elif isinstance(object_, Snapshot):
                for branch in object_.branches.values():
                    if branch is None:
                        pass
                    elif branch.target_type == TargetType.ALIAS:
                        pass
                    elif branch.target_type == TargetType.CONTENT:
                        cnt_references[branch.target].add(swhid)
                    elif branch.target_type == TargetType.DIRECTORY:
                        dir_references[branch.target].add(swhid)
                    elif branch.target_type == TargetType.REVISION:
                        rev_references[branch.target].add(swhid)
                    elif branch.target_type == TargetType.RELEASE:
                        rel_references[branch.target].add(swhid)
                    elif branch.target_type == TargetType.SNAPSHOT:
                        snp_references[branch.target].add(swhid)
                    else:
                        assert False, (str(object_.swhid()), branch)
            else:
                assert False, object_.swhid()

        missing_cnts = self.storage.content_missing_per_sha1_git(list(cnt_references))
        missing_dirs = self.storage.directory_missing(list(dir_references))
        missing_revs = self.storage.revision_missing(list(rev_references))
        missing_rels = self.storage.release_missing(list(rel_references))

anlambert: Should these queries be executed in batches, in case the number of objects to check is really large? We do not have any timeouts on those, though.

vlorentz: It's indirectly batched by the length of `objects`, so I'll keep it this way until it becomes an issue.

        missing_snps = self.storage.snapshot_missing(list(snp_references))

        for missing_id in missing_cnts:
            missing_swhid = swhids.CoreSWHID(
                object_type=swhids.ObjectType.CONTENT, object_id=missing_id
            )
            self.db.missing_object_add(
                missing_swhid, cnt_references[missing_id], self.datastore_info()
            )
        for missing_id in missing_dirs:
            missing_swhid = swhids.CoreSWHID(
                object_type=swhids.ObjectType.DIRECTORY, object_id=missing_id
            )
            self.db.missing_object_add(
                missing_swhid, dir_references[missing_id], self.datastore_info()
            )
        for missing_id in missing_revs:
            missing_swhid = swhids.CoreSWHID(
                object_type=swhids.ObjectType.REVISION, object_id=missing_id
            )
            self.db.missing_object_add(
                missing_swhid, rev_references[missing_id], self.datastore_info()
            )
        for missing_id in missing_rels:
            missing_swhid = swhids.CoreSWHID(
                object_type=swhids.ObjectType.RELEASE, object_id=missing_id
            )
            self.db.missing_object_add(
                missing_swhid, rel_references[missing_id], self.datastore_info()
            )
        for missing_id in missing_snps:
            missing_swhid = swhids.CoreSWHID(
                object_type=swhids.ObjectType.SNAPSHOT, object_id=missing_id
            )
            self.db.missing_object_add(
                missing_swhid, snp_references[missing_id], self.datastore_info()
            )
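
Regarding the inline exchange above about batching: a minimal sketch, under the assumption that explicit chunking ever becomes necessary, of how one of the *_missing lookups could be issued in bounded-size queries. The grouper() helper, the directory_missing_batched() wrapper, and the batch size are hypothetical and not part of this diff; storage.directory_missing() is the existing StorageInterface method used above.

# Hypothetical sketch, not part of storage_checker.py: split a large id list
# into chunks before querying the storage, so each query stays bounded.
import itertools
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def grouper(items: Iterable[T], n: int) -> Iterator[List[T]]:
    """Yield successive lists of at most n items (hypothetical helper)."""
    iterator = iter(items)
    while True:
        chunk = list(itertools.islice(iterator, n))
        if not chunk:
            return
        yield chunk


def directory_missing_batched(storage, ids: Iterable[bytes], batch_size: int = 1000) -> List[bytes]:
    """Like storage.directory_missing(ids), but issued as bounded-size queries."""
    missing: List[bytes] = []
    for chunk in grouper(ids, batch_size):
        missing.extend(storage.directory_missing(chunk))
    return missing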