diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -7,7 +7,7 @@
 import dataclasses
 import datetime
 import functools
-from typing import Iterator, List, Optional
+from typing import Iterable, Iterator, List, Optional
 
 import psycopg2
 
@@ -36,6 +36,21 @@
     object_: bytes
 
 
+@dataclasses.dataclass(frozen=True)
+class MissingObject:
+    id: CoreSWHID
+    datastore: Datastore
+    first_occurrence: datetime.datetime
+
+
+@dataclasses.dataclass(frozen=True)
+class MissingObjectReference:
+    missing_id: CoreSWHID
+    reference_id: CoreSWHID
+    datastore: Datastore
+    first_occurrence: datetime.datetime
+
+
 @dataclasses.dataclass(frozen=True)
 class FixedObject:
     id: CoreSWHID
@@ -47,6 +62,10 @@
 class ScrubberDb(BaseDb):
     current_version = 2
 
+    ####################################
+    # Shared tables
+    ####################################
+
     @functools.lru_cache(1000)
     def datastore_get_or_add(self, datastore: Datastore) -> int:
         """Creates a datastore if it does not exist, and returns its id."""
@@ -79,6 +98,10 @@
             (id_,) = res
             return id_
 
+    ####################################
+    # Inventory of objects with issues
+    ####################################
+
     def corrupt_object_add(
         self,
         id: CoreSWHID,
@@ -256,6 +279,113 @@
         )
         return self._corrupt_object_list_from_cursor(cur)
 
+    def missing_object_add(
+        self,
+        id: CoreSWHID,
+        reference_ids: Iterable[CoreSWHID],
+        datastore: Datastore,
+    ) -> None:
+        """
+        Adds a "hole" to the inventory, i.e. an object missing from a datastore
+        that is referenced by another object of the same datastore.
+
+        If the missing object is already known to be missing by the scrubber database,
+        this only records the reference (which can be useful to locate an origin
+        to recover the object from).
+        If that reference is already known too, this is a noop.
+
+        Args:
+            id: SWHID of the missing object (the hole)
+            reference_ids: SWHIDs of the objects referencing the missing object
+            datastore: representation of the swh-storage/swh-journal/... instance
+              containing this hole
+        """
+        if not reference_ids:
+            raise ValueError("reference_ids is empty")
+        datastore_id = self.datastore_get_or_add(datastore)
+        with self.transaction() as cur:
+            cur.execute(
+                """
+                INSERT INTO missing_object (id, datastore)
+                VALUES (%s, %s)
+                ON CONFLICT DO NOTHING
+                """,
+                (str(id), datastore_id),
+            )
+            psycopg2.extras.execute_batch(
+                cur,
+                """
+                INSERT INTO missing_object_reference (missing_id, reference_id, datastore)
+                VALUES (%s, %s, %s)
+                ON CONFLICT DO NOTHING
+                """,
+                [
+                    (str(id), str(reference_id), datastore_id)
+                    for reference_id in reference_ids
+                ],
+            )
+
+    def missing_object_iter(self) -> Iterator[MissingObject]:
+        """Yields all records in the 'missing_object' table."""
+        with self.transaction() as cur:
+            cur.execute(
+                """
+                SELECT
+                    mo.id, mo.first_occurrence,
+                    ds.package, ds.class, ds.instance
+                FROM missing_object AS mo
+                INNER JOIN datastore AS ds ON (ds.id=mo.datastore)
+                """
+            )
+
+            for row in cur:
+                (id, first_occurrence, ds_package, ds_class, ds_instance) = row
+                yield MissingObject(
+                    id=CoreSWHID.from_string(id),
+                    first_occurrence=first_occurrence,
+                    datastore=Datastore(
+                        package=ds_package, cls=ds_class, instance=ds_instance
+                    ),
+                )
+
+    def missing_object_reference_iter(
+        self, missing_id: CoreSWHID
+    ) -> Iterator[MissingObjectReference]:
+        """Yields all records in the 'missing_object_reference' table."""
+        with self.transaction() as cur:
+            cur.execute(
+                """
+                SELECT
+                    mor.reference_id, mor.first_occurrence,
+                    ds.package, ds.class, ds.instance
+                FROM missing_object_reference AS mor
+                INNER JOIN datastore AS ds ON (ds.id=mor.datastore)
+                WHERE mor.missing_id=%s
+                """,
+                (str(missing_id),),
+            )
+
+            for row in cur:
+                (
+                    reference_id,
+                    first_occurrence,
+                    ds_package,
+                    ds_class,
+                    ds_instance,
+                ) = row
+                yield MissingObjectReference(
+                    missing_id=missing_id,
+                    reference_id=CoreSWHID.from_string(reference_id),
+                    first_occurrence=first_occurrence,
+                    datastore=Datastore(
+                        package=ds_package, cls=ds_class, instance=ds_instance
+                    ),
+                )
+
+    ####################################
+    # Issue resolution
+    ####################################
+
     def object_origin_add(
         self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str]
     ) -> None:
diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql
--- a/swh/scrubber/sql/30-schema.sql
+++ b/swh/scrubber/sql/30-schema.sql
@@ -1,3 +1,7 @@
+-------------------------------------
+-- Shared definitions
+-------------------------------------
+
 create domain swhid as text check (value ~ '^swh:[0-9]+:.*');
 
 create table datastore
@@ -14,6 +18,11 @@
 comment on column datastore.class is 'For datastores with multiple backends, name of the backend (postgresql/cassandra for storage, kafka for journal, pathslicer/azure/winery/... for objstorage)';
 comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg. its URL or DSN.';
 
+
+-------------------------------------
+-- Inventory of objects with issues
+-------------------------------------
+
 create table corrupt_object
 (
   id swhid not null,
@@ -27,6 +36,37 @@
 comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)';
 comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time';
 
+
+create table missing_object
+(
+  id swhid not null,
+  datastore int not null,
+  first_occurrence timestamptz not null default now()
+);
+
+comment on table missing_object is 'Each row identifies an object that is missing but referenced by another object (aka a "hole")';
+comment on column missing_object.datastore is 'Datastore where the hole is.';
+comment on column missing_object.first_occurrence is 'Moment the object was found to be missing for the first time';
+
+create table missing_object_reference
+(
+  missing_id swhid not null,
+  reference_id swhid not null,
+  datastore int not null,
+  first_occurrence timestamptz not null default now()
+);
+
+comment on table missing_object_reference is 'Each row identifies an object that points to an object that does not exist (aka a "hole")';
+comment on column missing_object_reference.missing_id is 'SWHID of the missing object.';
+comment on column missing_object_reference.reference_id is 'SWHID of the object referencing the missing object.';
+comment on column missing_object_reference.datastore is 'Datastore where the referencing object is.';
+comment on column missing_object_reference.first_occurrence is 'Moment the object was found to reference a missing object';
+
+
+-------------------------------------
+-- Issue resolution
+-------------------------------------
+
 create table object_origin
 (
   object_id swhid not null,
diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql
--- a/swh/scrubber/sql/60-indexes.sql
+++ b/swh/scrubber/sql/60-indexes.sql
@@ -1,3 +1,7 @@
+-------------------------------------
+-- Shared tables
+-------------------------------------
+
 -- datastore
 
 create unique index concurrently datastore_pkey on datastore(id);
@@ -6,6 +10,10 @@
 
 create unique index concurrently datastore_package_class_instance on datastore(package, class, instance);
 
+-------------------------------------
+-- Inventory of objects with issues
+-------------------------------------
+
 -- corrupt_object
 
 alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid;
@@ -14,6 +22,28 @@
 create unique index concurrently corrupt_object_pkey on corrupt_object(id, datastore);
 alter table corrupt_object add primary key using index corrupt_object_pkey;
 
+
+-- missing_object
+
+alter table missing_object add constraint missing_object_datastore_fkey foreign key (datastore) references datastore(id) not valid;
+alter table missing_object validate constraint missing_object_datastore_fkey;
+
+create unique index concurrently missing_object_pkey on missing_object(id, datastore);
+alter table missing_object add primary key using index missing_object_pkey;
+
+
+-- missing_object_reference
+
+alter table missing_object_reference add constraint missing_object_reference_datastore_fkey foreign key (datastore) references datastore(id) not valid;
+alter table missing_object_reference validate constraint missing_object_reference_datastore_fkey;
+
+create unique index concurrently missing_object_reference_missing_id_reference_id_datastore on missing_object_reference(missing_id, reference_id, datastore);
+create unique index concurrently missing_object_reference_reference_id_missing_id_datastore on missing_object_reference(reference_id, missing_id, datastore);
+
+-------------------------------------
+-- Issue resolution
+-------------------------------------
+
 -- object_origin
 
 create unique index concurrently object_origin_pkey on object_origin (object_id, origin_url);
diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py
--- a/swh/scrubber/storage_checker.py
+++ b/swh/scrubber/storage_checker.py
@@ -5,13 +5,23 @@
 
 """Reads all objects in a swh-storage instance and recomputes their checksums."""
 
+import collections
 import contextlib
 import dataclasses
 import logging
 from typing import Iterable, Union
 
 from swh.journal.serializers import value_to_kafka
-from swh.model.model import Directory, Release, Revision, Snapshot
+from swh.model import swhids
+from swh.model.model import (
+    Content,
+    Directory,
+    ObjectType,
+    Release,
+    Revision,
+    Snapshot,
+    TargetType,
+)
 from swh.storage import backfill
 from swh.storage.interface import StorageInterface
 from swh.storage.postgresql.storage import Storage as PostgresqlStorage
@@ -20,7 +30,7 @@
 
 logger = logging.getLogger(__name__)
 
-ScrubbableObject = Union[Revision, Release, Snapshot, Directory]
+ScrubbableObject = Union[Revision, Release, Snapshot, Directory, Content]
 
 
 @contextlib.contextmanager
@@ -93,10 +103,15 @@
             )
             objects = list(objects)
 
-            self.process_objects(objects)
+            self.check_object_hashes(objects)
+            self.check_object_references(objects)
 
-    def process_objects(self, objects: Iterable[ScrubbableObject]):
+    def check_object_hashes(self, objects: Iterable[ScrubbableObject]):
+        """Recomputes hashes, and reports mismatches."""
         for object_ in objects:
+            if isinstance(object_, Content):
+                # TODO
+                continue
             real_id = object_.compute_hash()
             if object_.id != real_id:
                 self.db.corrupt_object_add(
@@ -104,3 +119,113 @@
                     self.datastore_info(),
                     value_to_kafka(object_.to_dict()),
                 )
+
+    def check_object_references(self, objects: Iterable[ScrubbableObject]):
+        """Checks that all objects referenced by these objects exist."""
+        cnt_references = collections.defaultdict(set)
+        dir_references = collections.defaultdict(set)
+        rev_references = collections.defaultdict(set)
+        rel_references = collections.defaultdict(set)
+        snp_references = collections.defaultdict(set)
+
+        for object_ in objects:
+            swhid = object_.swhid()
+
+            if isinstance(object_, Content):
+                pass
+            elif isinstance(object_, Directory):
+                for entry in object_.entries:
+                    if entry.type == "file":
+                        cnt_references[entry.target].add(swhid)
+                    elif entry.type == "dir":
+                        dir_references[entry.target].add(swhid)
+                    elif entry.type == "rev":
+                        # dir->rev holes are not considered a problem because they
+                        # happen whenever git submodules point to repositories that
+                        # were not loaded yet; ignore them
+                        pass
+                    else:
+                        assert False, entry
+            elif isinstance(object_, Revision):
+                dir_references[object_.directory].add(swhid)
+                for parent in object_.parents:
+                    rev_references[parent].add(swhid)
+            elif isinstance(object_, Release):
+                if object_.target is None:
+                    pass
+                elif object_.target_type == ObjectType.CONTENT:
+                    cnt_references[object_.target].add(swhid)
+                elif object_.target_type == ObjectType.DIRECTORY:
+                    dir_references[object_.target].add(swhid)
+                elif object_.target_type == ObjectType.REVISION:
+                    rev_references[object_.target].add(swhid)
+                elif object_.target_type == ObjectType.RELEASE:
+                    rel_references[object_.target].add(swhid)
+                else:
+                    assert False, object_
+            elif isinstance(object_, Snapshot):
+                for branch in object_.branches.values():
+                    if branch is None:
+                        pass
+                    elif branch.target_type == TargetType.ALIAS:
+                        pass
+                    elif branch.target_type == TargetType.CONTENT:
+                        cnt_references[branch.target].add(swhid)
+                    elif branch.target_type == TargetType.DIRECTORY:
+                        dir_references[branch.target].add(swhid)
+                    elif branch.target_type == TargetType.REVISION:
+                        rev_references[branch.target].add(swhid)
+                    elif branch.target_type == TargetType.RELEASE:
+                        rel_references[branch.target].add(swhid)
+                    elif branch.target_type == TargetType.SNAPSHOT:
+                        snp_references[branch.target].add(swhid)
+                    else:
+                        assert False, (str(object_.swhid()), branch)
+            else:
+                assert False, object_.swhid()
+
+        missing_cnts = self.storage.content_missing_per_sha1_git(list(cnt_references))
+        missing_dirs = self.storage.directory_missing(list(dir_references))
+        missing_revs = self.storage.revision_missing(list(rev_references))
+        missing_rels = self.storage.release_missing(list(rel_references))
+        missing_snps = self.storage.snapshot_missing(list(snp_references))
+
+        for missing_id in missing_cnts:
+            missing_swhid = swhids.CoreSWHID(
+                object_type=swhids.ObjectType.CONTENT, object_id=missing_id
+            )
+            self.db.missing_object_add(
+                missing_swhid, cnt_references[missing_id], self.datastore_info()
+            )
+
+        for missing_id in missing_dirs:
+            missing_swhid = swhids.CoreSWHID(
+                object_type=swhids.ObjectType.DIRECTORY, object_id=missing_id
+            )
+            self.db.missing_object_add(
+                missing_swhid, dir_references[missing_id], self.datastore_info()
+            )
+
+        for missing_id in missing_revs:
+            missing_swhid = swhids.CoreSWHID(
+                object_type=swhids.ObjectType.REVISION, object_id=missing_id
+            )
+            self.db.missing_object_add(
+                missing_swhid, rev_references[missing_id], self.datastore_info()
+            )
+
+        for missing_id in missing_rels:
+            missing_swhid = swhids.CoreSWHID(
+                object_type=swhids.ObjectType.RELEASE, object_id=missing_id
+            )
+            self.db.missing_object_add(
+                missing_swhid, rel_references[missing_id], self.datastore_info()
+            )
+
+        for missing_id in missing_snps:
+            missing_swhid = swhids.CoreSWHID(
+                object_type=swhids.ObjectType.SNAPSHOT, object_id=missing_id
+            )
+            self.db.missing_object_add(
+                missing_swhid, snp_references[missing_id], self.datastore_info()
+            )
diff --git a/swh/scrubber/tests/test_storage_postgresql.py b/swh/scrubber/tests/test_storage_postgresql.py
--- a/swh/scrubber/tests/test_storage_postgresql.py
+++ b/swh/scrubber/tests/test_storage_postgresql.py
@@ -10,12 +10,55 @@
 import pytest
 
 from swh.journal.serializers import kafka_to_value
-from swh.model import swhids
+from swh.model import model, swhids
 from swh.model.tests import swh_model_data
 from swh.scrubber.storage_checker import StorageChecker
 from swh.storage.backfill import byte_ranges
 
-# decorator to make swh.storage.backfill use less ranges, so tests run faster
+CONTENT1 = model.Content.from_data(b"foo")
+DIRECTORY1 = model.Directory(
+    entries=(
+        model.DirectoryEntry(
+            target=CONTENT1.sha1_git, type="file", name=b"file1", perms=0o1
+        ),
+    )
+)
+DIRECTORY2 = model.Directory(
+    entries=(
+        model.DirectoryEntry(
+            target=CONTENT1.sha1_git, type="file", name=b"file2", perms=0o1
+        ),
+        model.DirectoryEntry(target=DIRECTORY1.id, type="dir", name=b"dir1", perms=0o1),
+        model.DirectoryEntry(target=b"\x00" * 20, type="rev", name=b"rev1", perms=0o1),
+    )
+)
+REVISION1 = model.Revision(
+    message=b"blah",
+    directory=DIRECTORY2.id,
+    author=None,
+    committer=None,
+    date=None,
+    committer_date=None,
+    type=model.RevisionType.GIT,
+    synthetic=True,
+)
+RELEASE1 = model.Release(
+    message=b"blih",
+    name=b"bluh",
+    target_type=model.ObjectType.REVISION,
+    target=REVISION1.id,
+    synthetic=True,
+)
+SNAPSHOT1 = model.Snapshot(
+    branches={
+        b"rel1": model.SnapshotBranch(
+            target_type=model.TargetType.RELEASE, target=RELEASE1.id
+        ),
+    }
+)
+
+
+# decorator to make swh.storage.backfill use fewer ranges, so tests run faster
 patch_byte_ranges = unittest.mock.patch(
     "swh.storage.backfill.byte_ranges",
     lambda numbits, start, end: byte_ranges(numbits // 8, start, end),
@@ -142,3 +185,107 @@
             "swh:1:snp:ffffffffffffffffffffffffffffffffffffffff",
         ]
     }
+
+
+@patch_byte_ranges
+def test_no_hole(scrubber_db, swh_storage):
+    swh_storage.content_add([CONTENT1])
+    swh_storage.directory_add([DIRECTORY1, DIRECTORY2])
+    swh_storage.revision_add([REVISION1])
+    swh_storage.release_add([RELEASE1])
+    swh_storage.snapshot_add([SNAPSHOT1])
+
+    for object_type in ("snapshot", "release", "revision", "directory"):
+        StorageChecker(
+            db=scrubber_db,
+            storage=swh_storage,
+            object_type=object_type,
+            start_object="00" * 20,
+            end_object="ff" * 20,
+        ).run()
+
+    assert list(scrubber_db.missing_object_iter()) == []
+
+
+@pytest.mark.parametrize(
+    "missing_object",
+    ["content1", "directory1", "directory2", "revision1", "release1"],
+)
+@patch_byte_ranges
+def test_one_hole(scrubber_db, swh_storage, missing_object):
+    if missing_object == "content1":
+        missing_swhid = CONTENT1.swhid()
+        reference_swhids = [DIRECTORY1.swhid(), DIRECTORY2.swhid()]
+    else:
+        swh_storage.content_add([CONTENT1])
+
+    if missing_object == "directory1":
+        missing_swhid = DIRECTORY1.swhid()
+        reference_swhids = [DIRECTORY2.swhid()]
+    else:
+        swh_storage.directory_add([DIRECTORY1])
+
+    if missing_object == "directory2":
+        missing_swhid = DIRECTORY2.swhid()
+        reference_swhids = [REVISION1.swhid()]
+    else:
+        swh_storage.directory_add([DIRECTORY2])
+
+    if missing_object == "revision1":
+        missing_swhid = REVISION1.swhid()
+        reference_swhids = [RELEASE1.swhid()]
+    else:
+        swh_storage.revision_add([REVISION1])
+
+    if missing_object == "release1":
+        missing_swhid = RELEASE1.swhid()
+        reference_swhids = [SNAPSHOT1.swhid()]
+    else:
+        swh_storage.release_add([RELEASE1])
+
+    swh_storage.snapshot_add([SNAPSHOT1])
+
+    for object_type in ("snapshot", "release", "revision", "directory"):
+        StorageChecker(
+            db=scrubber_db,
+            storage=swh_storage,
+            object_type=object_type,
+            start_object="00" * 20,
+            end_object="ff" * 20,
+        ).run()
+
+    assert [mo.id for mo in scrubber_db.missing_object_iter()] == [missing_swhid]
+    assert {
+        (mor.missing_id, mor.reference_id)
+        for mor in scrubber_db.missing_object_reference_iter(missing_swhid)
+    } == {(missing_swhid, reference_swhid) for reference_swhid in reference_swhids}
+
+
+@patch_byte_ranges
+def test_two_holes(scrubber_db, swh_storage):
+    # missing content and revision
+    swh_storage.directory_add([DIRECTORY1, DIRECTORY2])
+    swh_storage.release_add([RELEASE1])
+    swh_storage.snapshot_add([SNAPSHOT1])
+
+    for object_type in ("snapshot", "release", "revision", "directory"):
+        StorageChecker(
+            db=scrubber_db,
+            storage=swh_storage,
+            object_type=object_type,
+            start_object="00" * 20,
+            end_object="ff" * 20,
+        ).run()
+
+    assert {mo.id for mo in scrubber_db.missing_object_iter()} == {
+        CONTENT1.swhid(),
+        REVISION1.swhid(),
+    }
+    assert {
+        mor.reference_id
+        for mor in scrubber_db.missing_object_reference_iter(CONTENT1.swhid())
+    } == {DIRECTORY1.swhid(), DIRECTORY2.swhid()}
+    assert {
+        mor.reference_id
+        for mor in scrubber_db.missing_object_reference_iter(REVISION1.swhid())
+    } == {RELEASE1.swhid()}
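
Note (not part of the patch): a minimal usage sketch of the "hole" inventory API that this diff adds to ScrubberDb. Only missing_object_add, missing_object_iter and missing_object_reference_iter come from the patch; the DSN, the way the ScrubberDb instance is constructed (ScrubberDb derives from swh.core.db's BaseDb, which wraps a psycopg2 connection), the Datastore description and the example SWHIDs are illustrative assumptions.

import psycopg2

from swh.model.swhids import CoreSWHID
from swh.scrubber.db import Datastore, ScrubberDb

# Hypothetical connection to a scrubber database (DSN is an assumption).
db = ScrubberDb(psycopg2.connect("dbname=scrubber"))

# Hypothetical description of the datastore in which the hole was found.
datastore = Datastore(package="storage", cls="postgresql", instance="service=swh")

missing = CoreSWHID.from_string("swh:1:cnt:" + "00" * 20)
referencing = CoreSWHID.from_string("swh:1:dir:" + "11" * 20)

# Record the hole; re-adding the same reference later is a no-op,
# thanks to the ON CONFLICT DO NOTHING clauses in missing_object_add.
db.missing_object_add(missing, [referencing], datastore)

# Inspect the inventory: every hole, and the objects referencing it.
for mo in db.missing_object_iter():
    print(mo.id, "first seen at", mo.first_occurrence)
    for mor in db.missing_object_reference_iter(mo.id):
        print("  referenced by", mor.reference_id)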