diff --git a/PKG-INFO b/PKG-INFO index b0d372e..6a80e88 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,61 +1,61 @@ Metadata-Version: 2.1 Name: swh.scrubber -Version: 0.0.6 +Version: 0.1.0 Summary: Software Heritage Datastore Scrubber Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scrubber Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scrubber/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing License-File: LICENSE License-File: AUTHORS Software Heritage - Datastore Scrubber ====================================== Tools to periodically checks data integrity in swh-storage and swh-objstorage, reports errors, and (try to) fix them. This is a work in progress; some of the components described below do not exist yet (cassandra storage checker, objstorage checker, recovery, and reinjection) The Scrubber package is made of the following parts: Checking -------- Highly parallel processes continuously read objects from a data store, compute checksums, and write any failure in a database, along with the data of the corrupt object. There is one "checker" for each datastore package: storage (postgresql and cassandra), journal (kafka), and objstorage. Recovery -------- Then, from time to time, jobs go through the list of known corrupt objects, and try to recover the original objects, through various means: * Brute-forcing variations until they match their checksum * Recovering from another data store * As a last resort, recovering from known origins, if any Reinjection ----------- Finally, when an original object is recovered, it is reinjected in the original data store, replacing the corrupt one. diff --git a/swh.scrubber.egg-info/PKG-INFO b/swh.scrubber.egg-info/PKG-INFO index b0d372e..6a80e88 100644 --- a/swh.scrubber.egg-info/PKG-INFO +++ b/swh.scrubber.egg-info/PKG-INFO @@ -1,61 +1,61 @@ Metadata-Version: 2.1 Name: swh.scrubber -Version: 0.0.6 +Version: 0.1.0 Summary: Software Heritage Datastore Scrubber Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scrubber Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scrubber/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing License-File: LICENSE License-File: AUTHORS Software Heritage - Datastore Scrubber ====================================== Tools to periodically checks data integrity in swh-storage and swh-objstorage, reports errors, and (try to) fix them. 
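The "Checking" process described in the README above boils down to: enumerate objects, recompute their intrinsic hash, and record any mismatch together with the corrupt payload. Below is a minimal illustrative sketch of that loop using the ScrubberDb API introduced in this patch (corrupt_object_add, Datastore); it is not part of the change itself, and the iter_objects callable is hypothetical — the real checkers enumerate objects via swh.storage.backfill or the journal.

# Sketch only, not part of this patch: the checking loop described in the README.
# `iter_objects` is a hypothetical callable yielding swh.model.model objects
# (directories, revisions, releases, snapshots).
from swh.journal.serializers import value_to_kafka
from swh.scrubber.db import Datastore, ScrubberDb


def check_objects(db: ScrubberDb, datastore: Datastore, iter_objects) -> None:
    for obj in iter_objects():
        if obj.compute_hash() != obj.id:  # checksum mismatch: object is corrupt
            db.corrupt_object_add(
                obj.swhid(),                    # SWHID claimed by the datastore
                datastore,                      # datastore the object was read from
                value_to_kafka(obj.to_dict()),  # keep the corrupt payload for later recovery
            )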
This is a work in progress; some of the components described below do not exist yet (cassandra storage checker, objstorage checker, recovery, and reinjection) The Scrubber package is made of the following parts: Checking -------- Highly parallel processes continuously read objects from a data store, compute checksums, and write any failure in a database, along with the data of the corrupt object. There is one "checker" for each datastore package: storage (postgresql and cassandra), journal (kafka), and objstorage. Recovery -------- Then, from time to time, jobs go through the list of known corrupt objects, and try to recover the original objects, through various means: * Brute-forcing variations until they match their checksum * Recovering from another data store * As a last resort, recovering from known origins, if any Reinjection ----------- Finally, when an original object is recovered, it is reinjected in the original data store, replacing the corrupt one. diff --git a/swh.scrubber.egg-info/SOURCES.txt b/swh.scrubber.egg-info/SOURCES.txt index 69c4c3b..959e1d6 100644 --- a/swh.scrubber.egg-info/SOURCES.txt +++ b/swh.scrubber.egg-info/SOURCES.txt @@ -1,55 +1,56 @@ .git-blame-ignore-revs .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile README.rst conftest.py mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/README.rst docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.scrubber.egg-info/PKG-INFO swh.scrubber.egg-info/SOURCES.txt swh.scrubber.egg-info/dependency_links.txt swh.scrubber.egg-info/entry_points.txt swh.scrubber.egg-info/requires.txt swh.scrubber.egg-info/top_level.txt swh/scrubber/__init__.py swh/scrubber/cli.py swh/scrubber/db.py swh/scrubber/fixer.py swh/scrubber/journal_checker.py swh/scrubber/origin_locator.py swh/scrubber/py.typed swh/scrubber/storage_checker.py swh/scrubber/utils.py swh/scrubber/sql/20-enums.sql swh/scrubber/sql/30-schema.sql swh/scrubber/sql/60-indexes.sql swh/scrubber/sql/upgrades/2.sql +swh/scrubber/sql/upgrades/3.sql swh/scrubber/tests/__init__.py swh/scrubber/tests/conftest.py swh/scrubber/tests/test_cli.py swh/scrubber/tests/test_fixer.py swh/scrubber/tests/test_init.py swh/scrubber/tests/test_journal_kafka.py swh/scrubber/tests/test_origin_locator.py swh/scrubber/tests/test_storage_postgresql.py \ No newline at end of file diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py index 092ed60..6b2d767 100644 --- a/swh/scrubber/db.py +++ b/swh/scrubber/db.py @@ -1,320 +1,452 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import dataclasses import datetime import functools -from typing import Iterator, List, Optional +from typing import Iterable, Iterator, List, Optional import psycopg2 from swh.core.db import BaseDb from swh.model.swhids import CoreSWHID @dataclasses.dataclass(frozen=True) class Datastore: """Represents a datastore being scrubbed; eg. swh-storage or swh-journal.""" package: str """'storage', 'journal', or 'objstorage'.""" cls: str """'postgresql'/'cassandra' for storage, 'kafka' for journal, 'pathslicer'/'winery'/... 
for objstorage.""" instance: str """Human readable string.""" @dataclasses.dataclass(frozen=True) class CorruptObject: id: CoreSWHID datastore: Datastore first_occurrence: datetime.datetime object_: bytes +@dataclasses.dataclass(frozen=True) +class MissingObject: + id: CoreSWHID + datastore: Datastore + first_occurrence: datetime.datetime + + +@dataclasses.dataclass(frozen=True) +class MissingObjectReference: + missing_id: CoreSWHID + reference_id: CoreSWHID + datastore: Datastore + first_occurrence: datetime.datetime + + @dataclasses.dataclass(frozen=True) class FixedObject: id: CoreSWHID object_: bytes method: str recovery_date: Optional[datetime.datetime] = None class ScrubberDb(BaseDb): - current_version = 2 + current_version = 3 + + #################################### + # Shared tables + #################################### @functools.lru_cache(1000) def datastore_get_or_add(self, datastore: Datastore) -> int: """Creates a datastore if it does not exist, and returns its id.""" with self.transaction() as cur: cur.execute( """ WITH inserted AS ( INSERT INTO datastore (package, class, instance) VALUES (%(package)s, %(cls)s, %(instance)s) ON CONFLICT DO NOTHING RETURNING id ) SELECT id FROM inserted UNION ( -- If the datastore already exists, we need to fetch its id SELECT id FROM datastore WHERE package=%(package)s AND class=%(cls)s AND instance=%(instance)s ) LIMIT 1 """, (dataclasses.asdict(datastore)), ) - (id_,) = cur.fetchone() + res = cur.fetchone() + assert res is not None + (id_,) = res return id_ + #################################### + # Inventory of objects with issues + #################################### + def corrupt_object_add( self, id: CoreSWHID, datastore: Datastore, serialized_object: bytes, ) -> None: datastore_id = self.datastore_get_or_add(datastore) with self.transaction() as cur: cur.execute( """ INSERT INTO corrupt_object (id, datastore, object) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING """, (str(id), datastore_id, serialized_object), ) def corrupt_object_iter(self) -> Iterator[CorruptObject]: """Yields all records in the 'corrupt_object' table.""" with self.transaction() as cur: cur.execute( """ SELECT co.id, co.first_occurrence, co.object, ds.package, ds.class, ds.instance FROM corrupt_object AS co INNER JOIN datastore AS ds ON (ds.id=co.datastore) """ ) for row in cur: (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row yield CorruptObject( id=CoreSWHID.from_string(id), first_occurrence=first_occurrence, object_=object_, datastore=Datastore( package=ds_package, cls=ds_class, instance=ds_instance ), ) def _corrupt_object_list_from_cursor( self, cur: psycopg2.extensions.cursor ) -> List[CorruptObject]: results = [] for row in cur: (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row results.append( CorruptObject( id=CoreSWHID.from_string(id), first_occurrence=first_occurrence, object_=object_, datastore=Datastore( package=ds_package, cls=ds_class, instance=ds_instance ), ) ) return results def corrupt_object_get( self, start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100, ) -> List[CorruptObject]: """Yields a page of records in the 'corrupt_object' table, ordered by id. Arguments: start_id: Only return objects after this id end_id: Only return objects before this id in_origin: An origin URL. 
If provided, only returns objects that may be found in the given origin """ with self.transaction() as cur: cur.execute( """ SELECT co.id, co.first_occurrence, co.object, ds.package, ds.class, ds.instance FROM corrupt_object AS co INNER JOIN datastore AS ds ON (ds.id=co.datastore) WHERE co.id >= %s AND co.id <= %s ORDER BY co.id LIMIT %s """, (str(start_id), str(end_id), limit), ) return self._corrupt_object_list_from_cursor(cur) def corrupt_object_grab_by_id( self, cur: psycopg2.extensions.cursor, start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100, ) -> List[CorruptObject]: """Returns a page of records in the 'corrupt_object' table for a fixer, ordered by id These records are not already fixed (ie. do not have a corresponding entry in the 'fixed_object' table), and they are selected with an exclusive update lock. Arguments: start_id: Only return objects after this id end_id: Only return objects before this id """ cur.execute( """ SELECT co.id, co.first_occurrence, co.object, ds.package, ds.class, ds.instance FROM corrupt_object AS co INNER JOIN datastore AS ds ON (ds.id=co.datastore) WHERE co.id >= %(start_id)s AND co.id <= %(end_id)s AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id) ORDER BY co.id LIMIT %(limit)s FOR UPDATE SKIP LOCKED """, dict( start_id=str(start_id), end_id=str(end_id), limit=limit, ), ) return self._corrupt_object_list_from_cursor(cur) def corrupt_object_grab_by_origin( self, cur: psycopg2.extensions.cursor, origin_url: str, start_id: Optional[CoreSWHID] = None, end_id: Optional[CoreSWHID] = None, limit: int = 100, ) -> List[CorruptObject]: """Returns a page of records in the 'corrupt_object' table for a fixer, ordered by id These records are not already fixed (ie. do not have a corresponding entry in the 'fixed_object' table), and they are selected with an exclusive update lock. Arguments: origin_url: only returns objects that may be found in the given origin """ cur.execute( """ SELECT co.id, co.first_occurrence, co.object, ds.package, ds.class, ds.instance FROM corrupt_object AS co INNER JOIN datastore AS ds ON (ds.id=co.datastore) INNER JOIN object_origin AS oo ON (oo.object_id=co.id) WHERE (co.id >= %(start_id)s OR %(start_id)s IS NULL) AND (co.id <= %(end_id)s OR %(end_id)s IS NULL) AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id) AND oo.origin_url=%(origin_url)s ORDER BY co.id LIMIT %(limit)s FOR UPDATE SKIP LOCKED """, dict( start_id=None if start_id is None else str(start_id), end_id=None if end_id is None else str(end_id), origin_url=origin_url, limit=limit, ), ) return self._corrupt_object_list_from_cursor(cur) + def missing_object_add( + self, + id: CoreSWHID, + reference_ids: Iterable[CoreSWHID], + datastore: Datastore, + ) -> None: + """ + Adds a "hole" to the inventory, ie. an object missing from a datastore + that is referenced by an other object of the same datastore. + + If the missing object is already known to be missing by the scrubber database, + this only records the reference (which can be useful to locate an origin + to recover the object from). + If that reference is already known too, this is a noop. + + Args: + id: SWHID of the missing object (the hole) + reference_id: SWHID of the object referencing the missing object + datastore: representation of the swh-storage/swh-journal/... 
instance + containing this hole + """ + if not reference_ids: + raise ValueError("reference_ids is empty") + datastore_id = self.datastore_get_or_add(datastore) + with self.transaction() as cur: + cur.execute( + """ + INSERT INTO missing_object (id, datastore) + VALUES (%s, %s) + ON CONFLICT DO NOTHING + """, + (str(id), datastore_id), + ) + psycopg2.extras.execute_batch( + cur, + """ + INSERT INTO missing_object_reference (missing_id, reference_id, datastore) + VALUES (%s, %s, %s) + ON CONFLICT DO NOTHING + """, + [ + (str(id), str(reference_id), datastore_id) + for reference_id in reference_ids + ], + ) + + def missing_object_iter(self) -> Iterator[MissingObject]: + """Yields all records in the 'missing_object' table.""" + with self.transaction() as cur: + cur.execute( + """ + SELECT + mo.id, mo.first_occurrence, + ds.package, ds.class, ds.instance + FROM missing_object AS mo + INNER JOIN datastore AS ds ON (ds.id=mo.datastore) + """ + ) + + for row in cur: + (id, first_occurrence, ds_package, ds_class, ds_instance) = row + yield MissingObject( + id=CoreSWHID.from_string(id), + first_occurrence=first_occurrence, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), + ) + + def missing_object_reference_iter( + self, missing_id: CoreSWHID + ) -> Iterator[MissingObjectReference]: + """Yields all records in the 'missing_object_reference' table.""" + with self.transaction() as cur: + cur.execute( + """ + SELECT + mor.reference_id, mor.first_occurrence, + ds.package, ds.class, ds.instance + FROM missing_object_reference AS mor + INNER JOIN datastore AS ds ON (ds.id=mor.datastore) + WHERE mor.missing_id=%s + """, + (str(missing_id),), + ) + + for row in cur: + ( + reference_id, + first_occurrence, + ds_package, + ds_class, + ds_instance, + ) = row + yield MissingObjectReference( + missing_id=missing_id, + reference_id=CoreSWHID.from_string(reference_id), + first_occurrence=first_occurrence, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), + ) + + #################################### + # Issue resolution + #################################### + def object_origin_add( self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str] ) -> None: psycopg2.extras.execute_values( cur, """ INSERT INTO object_origin (object_id, origin_url) VALUES %s ON CONFLICT DO NOTHING """, [(str(swhid), origin_url) for origin_url in origins], ) def object_origin_get(self, after: str = "", limit: int = 1000) -> List[str]: """Returns origins with non-fixed corrupt objects, ordered by URL. 
Arguments: after: if given, only returns origins with an URL after this value """ with self.transaction() as cur: cur.execute( """ SELECT DISTINCT origin_url FROM object_origin WHERE origin_url > %(after)s AND object_id IN ( (SELECT id FROM corrupt_object) EXCEPT (SELECT id FROM fixed_object) ) ORDER BY origin_url LIMIT %(limit)s """, dict(after=after, limit=limit), ) return [origin_url for (origin_url,) in cur] def fixed_object_add( self, cur: psycopg2.extensions.cursor, fixed_objects: List[FixedObject] ) -> None: psycopg2.extras.execute_values( cur, """ INSERT INTO fixed_object (id, object, method) VALUES %s ON CONFLICT DO NOTHING """, [ (str(fixed_object.id), fixed_object.object_, fixed_object.method) for fixed_object in fixed_objects ], ) def fixed_object_iter(self) -> Iterator[FixedObject]: with self.transaction() as cur: cur.execute("SELECT id, object, method, recovery_date FROM fixed_object") for (id, object_, method, recovery_date) in cur: yield FixedObject( id=CoreSWHID.from_string(id), object_=object_, method=method, recovery_date=recovery_date, ) diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql index 1d4f423..a67eb67 100644 --- a/swh/scrubber/sql/30-schema.sql +++ b/swh/scrubber/sql/30-schema.sql @@ -1,50 +1,90 @@ +------------------------------------- +-- Shared definitions +------------------------------------- + create domain swhid as text check (value ~ '^swh:[0-9]+:.*'); create table datastore ( id bigserial not null, package datastore_type not null, class text, instance text ); comment on table datastore is 'Each row identifies a data store being scrubbed'; comment on column datastore.id is 'Internal identifier of the datastore'; comment on column datastore.package is 'Name of the component using this datastore (storage/journal/objstorage)'; comment on column datastore.class is 'For datastores with multiple backends, name of the backend (postgresql/cassandra for storage, kafka for journal, pathslicer/azure/winery/... for objstorage)'; comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg. 
its URL or DSN.'; + +------------------------------------- +-- Inventory of objects with issues +------------------------------------- + create table corrupt_object ( id swhid not null, datastore int not null, object bytea not null, first_occurrence timestamptz not null default now() ); comment on table corrupt_object is 'Each row identifies an object that was found to be corrupt'; comment on column corrupt_object.datastore is 'Datastore the corrupt object was found in.'; comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)'; comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time'; + +create table missing_object +( + id swhid not null, + datastore int not null, + first_occurrence timestamptz not null default now() +); + +comment on table missing_object is 'Each row identifies an object that are missing but referenced by another object (aka "holes")'; +comment on column missing_object.datastore is 'Datastore where the hole is.'; +comment on column missing_object.first_occurrence is 'Moment the object was found to be corrupt for the first time'; + +create table missing_object_reference +( + missing_id swhid not null, + reference_id swhid not null, + datastore int not null, + first_occurrence timestamptz not null default now() +); + +comment on table missing_object_reference is 'Each row identifies an object that points to an object that does not exist (aka a "hole")'; +comment on column missing_object_reference.missing_id is 'SWHID of the missing object.'; +comment on column missing_object_reference.reference_id is 'SWHID of the object referencing the missing object.'; +comment on column missing_object_reference.datastore is 'Datastore where the referencing object is.'; +comment on column missing_object_reference.first_occurrence is 'Moment the object was found to reference a missing object'; + + +------------------------------------- +-- Issue resolution +------------------------------------- + create table object_origin ( object_id swhid not null, origin_url text not null, last_attempt timestamptz -- NULL if not tried yet ); comment on table object_origin is 'Maps objects to origins they might be found in.'; create table fixed_object ( id swhid not null, object bytea not null, method text, recovery_date timestamptz not null default now() ); comment on table fixed_object is 'Each row identifies an object that was found to be corrupt, along with the original version of the object'; comment on column fixed_object.object is 'The recovered object itself, as a msgpack-encoded dict'; comment on column fixed_object.recovery_date is 'Moment the object was recovered.'; comment on column fixed_object.method is 'How the object was recovered. 
For example: "from_origin", "negative_utc", "capitalized_revision_parent".'; diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql index cdea5fe..88b5e61 100644 --- a/swh/scrubber/sql/60-indexes.sql +++ b/swh/scrubber/sql/60-indexes.sql @@ -1,29 +1,59 @@ +------------------------------------- +-- Shared tables +------------------------------------- + -- datastore create unique index concurrently datastore_pkey on datastore(id); alter table datastore add primary key using index datastore_pkey; create unique index concurrently datastore_package_class_instance on datastore(package, class, instance); +------------------------------------- +-- Inventory of objects with issues +------------------------------------- + -- corrupt_object alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid; alter table corrupt_object validate constraint corrupt_object_datastore_fkey; create unique index concurrently corrupt_object_pkey on corrupt_object(id, datastore); alter table corrupt_object add primary key using index corrupt_object_pkey; + +-- missing_object + +alter table missing_object add constraint missing_object_datastore_fkey foreign key (datastore) references datastore(id) not valid; +alter table missing_object validate constraint missing_object_datastore_fkey; + +create unique index concurrently missing_object_pkey on missing_object(id, datastore); +alter table missing_object add primary key using index missing_object_pkey; + + +-- missing_object_reference + +alter table missing_object_reference add constraint missing_object_reference_datastore_fkey foreign key (datastore) references datastore(id) not valid; +alter table missing_object_reference validate constraint missing_object_reference_datastore_fkey; + +create unique index concurrently missing_object_reference_missing_id_reference_id_datastore on missing_object_reference(missing_id, reference_id, datastore); +create unique index concurrently missing_object_reference_reference_id_missing_id_datastore on missing_object_reference(reference_id, missing_id, datastore); + +------------------------------------- +-- Issue resolution +------------------------------------- + -- object_origin create unique index concurrently object_origin_pkey on object_origin (object_id, origin_url); create index concurrently object_origin_by_origin on object_origin (origin_url, object_id); -- FIXME: not valid, because corrupt_object(id) is not unique -- alter table object_origin add constraint object_origin_object_fkey foreign key (object_id) references corrupt_object(id) not valid; -- alter table object_origin validate constraint object_origin_object_fkey; -- fixed_object create unique index concurrently fixed_object_pkey on fixed_object(id); alter table fixed_object add primary key using index fixed_object_pkey; diff --git a/swh/scrubber/sql/upgrades/3.sql b/swh/scrubber/sql/upgrades/3.sql new file mode 100644 index 0000000..a376f53 --- /dev/null +++ b/swh/scrubber/sql/upgrades/3.sql @@ -0,0 +1,43 @@ +-- SWH Scrubber DB schema upgrade +-- from_version: 2 +-- to_version: 3 +-- description: Add missing_object + +create table missing_object +( + id swhid not null, + datastore int not null, + first_occurrence timestamptz not null default now() +); + +comment on table missing_object is 'Each row identifies an object that are missing but referenced by another object (aka "holes")'; +comment on column missing_object.datastore is 'Datastore where the hole is.'; +comment on 
column missing_object.first_occurrence is 'Moment the object was found to be corrupt for the first time'; + +create table missing_object_reference +( + missing_id swhid not null, + reference_id swhid not null, + datastore int not null, + first_occurrence timestamptz not null default now() +); + +comment on table missing_object_reference is 'Each row identifies an object that points to an object that does not exist (aka a "hole")'; +comment on column missing_object_reference.missing_id is 'SWHID of the missing object.'; +comment on column missing_object_reference.reference_id is 'SWHID of the object referencing the missing object.'; +comment on column missing_object_reference.datastore is 'Datastore where the referencing object is.'; +comment on column missing_object_reference.first_occurrence is 'Moment the object was found to reference a missing object'; + + + +alter table missing_object add constraint missing_object_datastore_fkey foreign key (datastore) references datastore(id) not valid; +alter table missing_object validate constraint missing_object_datastore_fkey; + +create unique index concurrently missing_object_pkey on missing_object(id, datastore); +alter table missing_object add primary key using index missing_object_pkey; + +alter table missing_object_reference add constraint missing_object_reference_datastore_fkey foreign key (datastore) references datastore(id) not valid; +alter table missing_object_reference validate constraint missing_object_reference_datastore_fkey; + +create unique index concurrently missing_object_reference_missing_id_reference_id_datastore on missing_object_reference(missing_id, reference_id, datastore); +create unique index concurrently missing_object_reference_reference_id_missing_id_datastore on missing_object_reference(reference_id, missing_id, datastore); diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py index f3a42fe..2305fa7 100644 --- a/swh/scrubber/storage_checker.py +++ b/swh/scrubber/storage_checker.py @@ -1,106 +1,280 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Reads all objects in a swh-storage instance and recomputes their checksums.""" +import collections import contextlib import dataclasses import logging from typing import Iterable, Union +from swh.core.statsd import Statsd from swh.journal.serializers import value_to_kafka -from swh.model.model import Directory, Release, Revision, Snapshot +from swh.model import swhids +from swh.model.model import ( + Content, + Directory, + ObjectType, + Release, + Revision, + Snapshot, + TargetType, +) from swh.storage import backfill from swh.storage.interface import StorageInterface from swh.storage.postgresql.storage import Storage as PostgresqlStorage from .db import Datastore, ScrubberDb logger = logging.getLogger(__name__) -ScrubbableObject = Union[Revision, Release, Snapshot, Directory] +ScrubbableObject = Union[Revision, Release, Snapshot, Directory, Content] @contextlib.contextmanager def storage_db(storage): db = storage.get_db() try: yield db finally: storage.put_db(db) @dataclasses.dataclass class StorageChecker: """Reads a chunk of a swh-storage database, recomputes checksums, and reports errors in a separate database.""" db: ScrubberDb storage: StorageInterface object_type: str """``directory``/``revision``/``release``/``snapshot``""" start_object: 
str """minimum value of the hexdigest of the object's sha1.""" end_object: str """maximum value of the hexdigest of the object's sha1.""" _datastore = None + _statsd = None def datastore_info(self) -> Datastore: """Returns a :class:`Datastore` instance representing the swh-storage instance being checked.""" if self._datastore is None: if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: self._datastore = Datastore( package="storage", cls="postgresql", instance=db.conn.dsn, ) else: raise NotImplementedError( f"StorageChecker(storage={self.storage!r}).datastore()" ) return self._datastore + def statsd(self) -> Statsd: + if self._statsd is None: + self._statsd = Statsd( + namespace="swh_scrubber", + constant_tags={"object_type": self.object_type}, + ) + return self._statsd + def run(self): """Runs on all objects of ``object_type`` and with id between ``start_object`` and ``end_object``. """ if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: return self._check_postgresql(db) else: raise NotImplementedError( f"StorageChecker(storage={self.storage!r}).check_storage()" ) def _check_postgresql(self, db): for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( self.start_object, self.end_object ): - logger.info( + logger.debug( "Processing %s range %s to %s", self.object_type, backfill._format_range_bound(range_start), backfill._format_range_bound(range_end), ) objects = backfill.fetch( db, self.object_type, start=range_start, end=range_end ) objects = list(objects) - self.process_objects(objects) + with self.statsd().timed( + "batch_duration_seconds", tags={"operation": "check_hashes"} + ): + self.check_object_hashes(objects) + with self.statsd().timed( + "batch_duration_seconds", tags={"operation": "check_references"} + ): + self.check_object_references(objects) - def process_objects(self, objects: Iterable[ScrubbableObject]): + def check_object_hashes(self, objects: Iterable[ScrubbableObject]): + """Recomputes hashes, and reports mismatches.""" + count = 0 for object_ in objects: + if isinstance(object_, Content): + # TODO + continue real_id = object_.compute_hash() + count += 1 if object_.id != real_id: + self.statsd().increment("hash_mismatch_total") self.db.corrupt_object_add( object_.swhid(), self.datastore_info(), value_to_kafka(object_.to_dict()), ) + if count: + self.statsd().increment("objects_hashed_total", count) + + def check_object_references(self, objects: Iterable[ScrubbableObject]): + """Check all objects references by these objects exist.""" + cnt_references = collections.defaultdict(set) + dir_references = collections.defaultdict(set) + rev_references = collections.defaultdict(set) + rel_references = collections.defaultdict(set) + snp_references = collections.defaultdict(set) + + for object_ in objects: + swhid = object_.swhid() + + if isinstance(object_, Content): + pass + elif isinstance(object_, Directory): + for entry in object_.entries: + if entry.type == "file": + cnt_references[entry.target].add(swhid) + elif entry.type == "dir": + dir_references[entry.target].add(swhid) + elif entry.type == "rev": + # dir->rev holes are not considered a problem because they + # happen whenever git submodules point to repositories that + # were not loaded yet; ignore them + pass + else: + assert False, entry + elif isinstance(object_, Revision): + dir_references[object_.directory].add(swhid) + for parent in object_.parents: + rev_references[parent].add(swhid) + elif isinstance(object_, Release): + if 
object_.target is None: + pass + elif object_.target_type == ObjectType.CONTENT: + cnt_references[object_.target].add(swhid) + elif object_.target_type == ObjectType.DIRECTORY: + dir_references[object_.target].add(swhid) + elif object_.target_type == ObjectType.REVISION: + rev_references[object_.target].add(swhid) + elif object_.target_type == ObjectType.RELEASE: + rel_references[object_.target].add(swhid) + else: + assert False, object_ + elif isinstance(object_, Snapshot): + for branch in object_.branches.values(): + if branch is None: + pass + elif branch.target_type == TargetType.ALIAS: + pass + elif branch.target_type == TargetType.CONTENT: + cnt_references[branch.target].add(swhid) + elif branch.target_type == TargetType.DIRECTORY: + dir_references[branch.target].add(swhid) + elif branch.target_type == TargetType.REVISION: + rev_references[branch.target].add(swhid) + elif branch.target_type == TargetType.RELEASE: + rel_references[branch.target].add(swhid) + elif branch.target_type == TargetType.SNAPSHOT: + snp_references[branch.target].add(swhid) + else: + assert False, (str(object_.swhid()), branch) + else: + assert False, object_.swhid() + + missing_cnts = set( + self.storage.content_missing_per_sha1_git(list(cnt_references)) + ) + missing_dirs = set(self.storage.directory_missing(list(dir_references))) + missing_revs = set(self.storage.revision_missing(list(rev_references))) + missing_rels = set(self.storage.release_missing(list(rel_references))) + missing_snps = set(self.storage.snapshot_missing(list(snp_references))) + + self.statsd().increment( + "missing_object_total", + len(missing_cnts), + tags={"target_object_type": "content"}, + ) + self.statsd().increment( + "missing_object_total", + len(missing_dirs), + tags={"target_object_type": "directory"}, + ) + self.statsd().increment( + "missing_object_total", + len(missing_revs), + tags={"target_object_type": "revision"}, + ) + self.statsd().increment( + "missing_object_total", + len(missing_rels), + tags={"target_object_type": "release"}, + ) + self.statsd().increment( + "missing_object_total", + len(missing_snps), + tags={"target_object_type": "snapshot"}, + ) + + for missing_id in missing_cnts: + missing_swhid = swhids.CoreSWHID( + object_type=swhids.ObjectType.CONTENT, object_id=missing_id + ) + self.db.missing_object_add( + missing_swhid, cnt_references[missing_id], self.datastore_info() + ) + + for missing_id in missing_dirs: + missing_swhid = swhids.CoreSWHID( + object_type=swhids.ObjectType.DIRECTORY, object_id=missing_id + ) + self.db.missing_object_add( + missing_swhid, dir_references[missing_id], self.datastore_info() + ) + + for missing_id in missing_revs: + missing_swhid = swhids.CoreSWHID( + object_type=swhids.ObjectType.REVISION, object_id=missing_id + ) + self.db.missing_object_add( + missing_swhid, rev_references[missing_id], self.datastore_info() + ) + + for missing_id in missing_rels: + missing_swhid = swhids.CoreSWHID( + object_type=swhids.ObjectType.RELEASE, object_id=missing_id + ) + self.db.missing_object_add( + missing_swhid, rel_references[missing_id], self.datastore_info() + ) + + for missing_id in missing_snps: + missing_swhid = swhids.CoreSWHID( + object_type=swhids.ObjectType.SNAPSHOT, object_id=missing_id + ) + self.db.missing_object_add( + missing_swhid, snp_references[missing_id], self.datastore_info() + ) diff --git a/swh/scrubber/tests/conftest.py b/swh/scrubber/tests/conftest.py index 19fefb6..f70ced5 100644 --- a/swh/scrubber/tests/conftest.py +++ b/swh/scrubber/tests/conftest.py @@ -1,28 +1,27 
@@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import partial import pytest from pytest_postgresql import factories -from swh.core.db.pytest_plugin import initialize_database_for_module, postgresql_fact +from swh.core.db.pytest_plugin import initialize_database_for_module from swh.scrubber.db import ScrubberDb scrubber_postgresql_proc = factories.postgresql_proc( - dbname="scrubber", load=[partial(initialize_database_for_module, modname="scrubber", version=1)], ) -postgresql_scrubber = postgresql_fact("scrubber_postgresql_proc") +postgresql_scrubber = factories.postgresql("scrubber_postgresql_proc") @pytest.fixture def scrubber_db(postgresql_scrubber): db = ScrubberDb(postgresql_scrubber) with db.conn.cursor() as cur: cur.execute("TRUNCATE TABLE corrupt_object") cur.execute("TRUNCATE TABLE datastore CASCADE") yield db diff --git a/swh/scrubber/tests/test_storage_postgresql.py b/swh/scrubber/tests/test_storage_postgresql.py index cc424fc..efd38d7 100644 --- a/swh/scrubber/tests/test_storage_postgresql.py +++ b/swh/scrubber/tests/test_storage_postgresql.py @@ -1,145 +1,291 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest.mock import attr import pytest from swh.journal.serializers import kafka_to_value -from swh.model import swhids +from swh.model import model, swhids from swh.model.tests import swh_model_data from swh.scrubber.storage_checker import StorageChecker from swh.storage.backfill import byte_ranges -# decorator to make swh.storage.backfill use less ranges, so tests run faster +CONTENT1 = model.Content.from_data(b"foo") +DIRECTORY1 = model.Directory( + entries=( + model.DirectoryEntry( + target=CONTENT1.sha1_git, type="file", name=b"file1", perms=0o1 + ), + ) +) +DIRECTORY2 = model.Directory( + entries=( + model.DirectoryEntry( + target=CONTENT1.sha1_git, type="file", name=b"file2", perms=0o1 + ), + model.DirectoryEntry(target=DIRECTORY1.id, type="dir", name=b"dir1", perms=0o1), + model.DirectoryEntry(target=b"\x00" * 20, type="rev", name=b"rev1", perms=0o1), + ) +) +REVISION1 = model.Revision( + message=b"blah", + directory=DIRECTORY2.id, + author=None, + committer=None, + date=None, + committer_date=None, + type=model.RevisionType.GIT, + synthetic=True, +) +RELEASE1 = model.Release( + message=b"blih", + name=b"bluh", + target_type=model.ObjectType.REVISION, + target=REVISION1.id, + synthetic=True, +) +SNAPSHOT1 = model.Snapshot( + branches={ + b"rel1": model.SnapshotBranch( + target_type=model.TargetType.RELEASE, target=RELEASE1.id + ), + } +) + + +# decorator to make swh.storage.backfill use fewer ranges, so tests run faster patch_byte_ranges = unittest.mock.patch( "swh.storage.backfill.byte_ranges", lambda numbits, start, end: byte_ranges(numbits // 8, start, end), ) @patch_byte_ranges def test_no_corruption(scrubber_db, swh_storage): swh_storage.directory_add(swh_model_data.DIRECTORIES) swh_storage.revision_add(swh_model_data.REVISIONS) swh_storage.release_add(swh_model_data.RELEASES) swh_storage.snapshot_add(swh_model_data.SNAPSHOTS) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, 
storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() assert list(scrubber_db.corrupt_object_iter()) == [] @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS))) @patch_byte_ranges def test_corrupt_snapshot(scrubber_db, swh_storage, corrupt_idx): + storage_dsn = swh_storage.get_db().conn.dsn snapshots = list(swh_model_data.SNAPSHOTS) snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20) swh_storage.snapshot_add(snapshots) before_date = datetime.datetime.now(tz=datetime.timezone.utc) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() after_date = datetime.datetime.now(tz=datetime.timezone.utc) corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 1 assert corrupt_objects[0].id == swhids.CoreSWHID.from_string( "swh:1:snp:0000000000000000000000000000000000000000" ) assert corrupt_objects[0].datastore.package == "storage" assert corrupt_objects[0].datastore.cls == "postgresql" - assert corrupt_objects[0].datastore.instance.startswith( - "user=postgres password=xxx dbname=storage host=" - ) + assert corrupt_objects[0].datastore.instance.startswith(storage_dsn) assert ( before_date - datetime.timedelta(seconds=5) <= corrupt_objects[0].first_occurrence <= after_date + datetime.timedelta(seconds=5) ) assert ( kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict() ) @patch_byte_ranges def test_corrupt_snapshots_same_batch(scrubber_db, swh_storage): snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) swh_storage.snapshot_add(snapshots) StorageChecker( db=scrubber_db, storage=swh_storage, object_type="snapshot", start_object="00" * 20, end_object="ff" * 20, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 2 assert {co.id for co in corrupt_objects} == { swhids.CoreSWHID.from_string(swhid) for swhid in [ "swh:1:snp:0000000000000000000000000000000000000000", "swh:1:snp:0101010101010101010101010101010101010101", ] } @patch_byte_ranges def test_corrupt_snapshots_different_batches(scrubber_db, swh_storage): snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i * 255]) * 20) swh_storage.snapshot_add(snapshots) StorageChecker( db=scrubber_db, storage=swh_storage, object_type="snapshot", start_object="00" * 20, end_object="87" * 20, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 1 # Simulates resuming from a different process, with an empty lru_cache scrubber_db.datastore_get_or_add.cache_clear() StorageChecker( db=scrubber_db, storage=swh_storage, object_type="snapshot", start_object="88" * 20, end_object="ff" * 20, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 2 assert {co.id for co in corrupt_objects} == { swhids.CoreSWHID.from_string(swhid) for swhid in [ "swh:1:snp:0000000000000000000000000000000000000000", "swh:1:snp:ffffffffffffffffffffffffffffffffffffffff", ] } + + +@patch_byte_ranges +def test_no_hole(scrubber_db, swh_storage): + swh_storage.content_add([CONTENT1]) + swh_storage.directory_add([DIRECTORY1, DIRECTORY2]) + swh_storage.revision_add([REVISION1]) + swh_storage.release_add([RELEASE1]) + 
swh_storage.snapshot_add([SNAPSHOT1]) + + for object_type in ("snapshot", "release", "revision", "directory"): + StorageChecker( + db=scrubber_db, + storage=swh_storage, + object_type=object_type, + start_object="00" * 20, + end_object="ff" * 20, + ).run() + + assert list(scrubber_db.missing_object_iter()) == [] + + +@pytest.mark.parametrize( + "missing_object", + ["content1", "directory1", "directory2", "revision1", "release1"], +) +@patch_byte_ranges +def test_one_hole(scrubber_db, swh_storage, missing_object): + if missing_object == "content1": + missing_swhid = CONTENT1.swhid() + reference_swhids = [DIRECTORY1.swhid(), DIRECTORY2.swhid()] + else: + swh_storage.content_add([CONTENT1]) + + if missing_object == "directory1": + missing_swhid = DIRECTORY1.swhid() + reference_swhids = [DIRECTORY2.swhid()] + else: + swh_storage.directory_add([DIRECTORY1]) + + if missing_object == "directory2": + missing_swhid = DIRECTORY2.swhid() + reference_swhids = [REVISION1.swhid()] + else: + swh_storage.directory_add([DIRECTORY2]) + + if missing_object == "revision1": + missing_swhid = REVISION1.swhid() + reference_swhids = [RELEASE1.swhid()] + else: + swh_storage.revision_add([REVISION1]) + + if missing_object == "release1": + missing_swhid = RELEASE1.swhid() + reference_swhids = [SNAPSHOT1.swhid()] + else: + swh_storage.release_add([RELEASE1]) + + swh_storage.snapshot_add([SNAPSHOT1]) + + for object_type in ("snapshot", "release", "revision", "directory"): + StorageChecker( + db=scrubber_db, + storage=swh_storage, + object_type=object_type, + start_object="00" * 20, + end_object="ff" * 20, + ).run() + + assert [mo.id for mo in scrubber_db.missing_object_iter()] == [missing_swhid] + assert { + (mor.missing_id, mor.reference_id) + for mor in scrubber_db.missing_object_reference_iter(missing_swhid) + } == {(missing_swhid, reference_swhid) for reference_swhid in reference_swhids} + + +@patch_byte_ranges +def test_two_holes(scrubber_db, swh_storage): + # missing content and revision + swh_storage.directory_add([DIRECTORY1, DIRECTORY2]) + swh_storage.release_add([RELEASE1]) + swh_storage.snapshot_add([SNAPSHOT1]) + + for object_type in ("snapshot", "release", "revision", "directory"): + StorageChecker( + db=scrubber_db, + storage=swh_storage, + object_type=object_type, + start_object="00" * 20, + end_object="ff" * 20, + ).run() + + assert {mo.id for mo in scrubber_db.missing_object_iter()} == { + CONTENT1.swhid(), + REVISION1.swhid(), + } + assert { + mor.reference_id + for mor in scrubber_db.missing_object_reference_iter(CONTENT1.swhid()) + } == {DIRECTORY1.swhid(), DIRECTORY2.swhid()} + assert { + mor.reference_id + for mor in scrubber_db.missing_object_reference_iter(REVISION1.swhid()) + } == {RELEASE1.swhid()}
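To complement the tests above, here is a hedged usage sketch (not part of the patch) showing how the holes recorded by check_object_references() can be read back through the new missing_object_iter() and missing_object_reference_iter() methods added to ScrubberDb in this change:

# Sketch only: list every known hole and the objects referencing it.
from swh.scrubber.db import ScrubberDb


def report_holes(db: ScrubberDb) -> None:
    for missing in db.missing_object_iter():
        refs = [
            str(ref.reference_id)
            for ref in db.missing_object_reference_iter(missing.id)
        ]
        print(
            f"{missing.id} missing from {missing.datastore.instance!r}, "
            f"referenced by: {', '.join(refs)}"
        )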