diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2cb9644..2665767 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,48 +1,49 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v2.4.0 hooks: - id: trailing-whitespace - id: check-json - id: check-yaml - repo: https://gitlab.com/pycqa/flake8 rev: 3.8.3 hooks: - id: flake8 - repo: https://github.com/codespell-project/codespell rev: v1.16.0 hooks: - id: codespell + args: [-L mor] - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] # unfortunately, we are far from being able to enable this... # - repo: https://github.com/PyCQA/pydocstyle.git # rev: 4.0.0 # hooks: # - id: pydocstyle # name: pydocstyle # description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. # entry: pydocstyle --convention=google # language: python # types: [python] - repo: https://github.com/PyCQA/isort rev: 5.5.2 hooks: - id: isort - repo: https://github.com/python/black rev: 22.3.0 hooks: - id: black diff --git a/PKG-INFO b/PKG-INFO index 6a80e88..33a2bcf 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,61 +1,65 @@ Metadata-Version: 2.1 Name: swh.scrubber -Version: 0.1.0 +Version: 0.1.1 Summary: Software Heritage Datastore Scrubber Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scrubber Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scrubber/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing License-File: LICENSE License-File: AUTHORS Software Heritage - Datastore Scrubber ====================================== Tools to periodically checks data integrity in swh-storage and swh-objstorage, reports errors, and (try to) fix them. This is a work in progress; some of the components described below do not exist yet (cassandra storage checker, objstorage checker, recovery, and reinjection) The Scrubber package is made of the following parts: Checking -------- Highly parallel processes continuously read objects from a data store, compute checksums, and write any failure in a database, along with the data of the corrupt object. There is one "checker" for each datastore package: storage (postgresql and cassandra), journal (kafka), and objstorage. +The journal is "crawled" using its native streaming; others are crawled by range, +reusing swh-storage's backfiller utilities, and checkpointed from time to time +to the scrubber's database (in the ``checked_range`` table). + Recovery -------- Then, from time to time, jobs go through the list of known corrupt objects, and try to recover the original objects, through various means: * Brute-forcing variations until they match their checksum * Recovering from another data store * As a last resort, recovering from known origins, if any Reinjection ----------- Finally, when an original object is recovered, it is reinjected in the original data store, replacing the corrupt one. diff --git a/README.rst b/README.rst index bc3ab62..01a5b56 100644 --- a/README.rst +++ b/README.rst @@ -1,39 +1,43 @@ Software Heritage - Datastore Scrubber ====================================== Tools to periodically checks data integrity in swh-storage and swh-objstorage, reports errors, and (try to) fix them. This is a work in progress; some of the components described below do not exist yet (cassandra storage checker, objstorage checker, recovery, and reinjection) The Scrubber package is made of the following parts: Checking -------- Highly parallel processes continuously read objects from a data store, compute checksums, and write any failure in a database, along with the data of the corrupt object. There is one "checker" for each datastore package: storage (postgresql and cassandra), journal (kafka), and objstorage. +The journal is "crawled" using its native streaming; others are crawled by range, +reusing swh-storage's backfiller utilities, and checkpointed from time to time +to the scrubber's database (in the ``checked_range`` table). + Recovery -------- Then, from time to time, jobs go through the list of known corrupt objects, and try to recover the original objects, through various means: * Brute-forcing variations until they match their checksum * Recovering from another data store * As a last resort, recovering from known origins, if any Reinjection ----------- Finally, when an original object is recovered, it is reinjected in the original data store, replacing the corrupt one. diff --git a/docs/README.rst b/docs/README.rst index bc3ab62..01a5b56 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -1,39 +1,43 @@ Software Heritage - Datastore Scrubber ====================================== Tools to periodically checks data integrity in swh-storage and swh-objstorage, reports errors, and (try to) fix them. This is a work in progress; some of the components described below do not exist yet (cassandra storage checker, objstorage checker, recovery, and reinjection) The Scrubber package is made of the following parts: Checking -------- Highly parallel processes continuously read objects from a data store, compute checksums, and write any failure in a database, along with the data of the corrupt object. There is one "checker" for each datastore package: storage (postgresql and cassandra), journal (kafka), and objstorage. +The journal is "crawled" using its native streaming; others are crawled by range, +reusing swh-storage's backfiller utilities, and checkpointed from time to time +to the scrubber's database (in the ``checked_range`` table). + Recovery -------- Then, from time to time, jobs go through the list of known corrupt objects, and try to recover the original objects, through various means: * Brute-forcing variations until they match their checksum * Recovering from another data store * As a last resort, recovering from known origins, if any Reinjection ----------- Finally, when an original object is recovered, it is reinjected in the original data store, replacing the corrupt one. diff --git a/swh.scrubber.egg-info/PKG-INFO b/swh.scrubber.egg-info/PKG-INFO index 6a80e88..33a2bcf 100644 --- a/swh.scrubber.egg-info/PKG-INFO +++ b/swh.scrubber.egg-info/PKG-INFO @@ -1,61 +1,65 @@ Metadata-Version: 2.1 Name: swh.scrubber -Version: 0.1.0 +Version: 0.1.1 Summary: Software Heritage Datastore Scrubber Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scrubber Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scrubber/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing License-File: LICENSE License-File: AUTHORS Software Heritage - Datastore Scrubber ====================================== Tools to periodically checks data integrity in swh-storage and swh-objstorage, reports errors, and (try to) fix them. This is a work in progress; some of the components described below do not exist yet (cassandra storage checker, objstorage checker, recovery, and reinjection) The Scrubber package is made of the following parts: Checking -------- Highly parallel processes continuously read objects from a data store, compute checksums, and write any failure in a database, along with the data of the corrupt object. There is one "checker" for each datastore package: storage (postgresql and cassandra), journal (kafka), and objstorage. +The journal is "crawled" using its native streaming; others are crawled by range, +reusing swh-storage's backfiller utilities, and checkpointed from time to time +to the scrubber's database (in the ``checked_range`` table). + Recovery -------- Then, from time to time, jobs go through the list of known corrupt objects, and try to recover the original objects, through various means: * Brute-forcing variations until they match their checksum * Recovering from another data store * As a last resort, recovering from known origins, if any Reinjection ----------- Finally, when an original object is recovered, it is reinjected in the original data store, replacing the corrupt one. diff --git a/swh.scrubber.egg-info/SOURCES.txt b/swh.scrubber.egg-info/SOURCES.txt index 959e1d6..d88ef3d 100644 --- a/swh.scrubber.egg-info/SOURCES.txt +++ b/swh.scrubber.egg-info/SOURCES.txt @@ -1,56 +1,58 @@ .git-blame-ignore-revs .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile README.rst conftest.py mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/README.rst docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.scrubber.egg-info/PKG-INFO swh.scrubber.egg-info/SOURCES.txt swh.scrubber.egg-info/dependency_links.txt swh.scrubber.egg-info/entry_points.txt swh.scrubber.egg-info/requires.txt swh.scrubber.egg-info/top_level.txt swh/scrubber/__init__.py swh/scrubber/cli.py swh/scrubber/db.py swh/scrubber/fixer.py swh/scrubber/journal_checker.py swh/scrubber/origin_locator.py swh/scrubber/py.typed swh/scrubber/storage_checker.py swh/scrubber/utils.py swh/scrubber/sql/20-enums.sql swh/scrubber/sql/30-schema.sql swh/scrubber/sql/60-indexes.sql swh/scrubber/sql/upgrades/2.sql swh/scrubber/sql/upgrades/3.sql +swh/scrubber/sql/upgrades/4.sql swh/scrubber/tests/__init__.py swh/scrubber/tests/conftest.py swh/scrubber/tests/test_cli.py +swh/scrubber/tests/test_db.py swh/scrubber/tests/test_fixer.py swh/scrubber/tests/test_init.py swh/scrubber/tests/test_journal_kafka.py swh/scrubber/tests/test_origin_locator.py swh/scrubber/tests/test_storage_postgresql.py \ No newline at end of file diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py index 6b2d767..8178d03 100644 --- a/swh/scrubber/db.py +++ b/swh/scrubber/db.py @@ -1,452 +1,527 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import dataclasses import datetime import functools -from typing import Iterable, Iterator, List, Optional +from typing import Iterable, Iterator, List, Optional, Tuple import psycopg2 from swh.core.db import BaseDb from swh.model.swhids import CoreSWHID @dataclasses.dataclass(frozen=True) class Datastore: """Represents a datastore being scrubbed; eg. swh-storage or swh-journal.""" package: str """'storage', 'journal', or 'objstorage'.""" cls: str """'postgresql'/'cassandra' for storage, 'kafka' for journal, 'pathslicer'/'winery'/... for objstorage.""" instance: str """Human readable string.""" @dataclasses.dataclass(frozen=True) class CorruptObject: id: CoreSWHID datastore: Datastore first_occurrence: datetime.datetime object_: bytes @dataclasses.dataclass(frozen=True) class MissingObject: id: CoreSWHID datastore: Datastore first_occurrence: datetime.datetime @dataclasses.dataclass(frozen=True) class MissingObjectReference: missing_id: CoreSWHID reference_id: CoreSWHID datastore: Datastore first_occurrence: datetime.datetime @dataclasses.dataclass(frozen=True) class FixedObject: id: CoreSWHID object_: bytes method: str recovery_date: Optional[datetime.datetime] = None class ScrubberDb(BaseDb): - current_version = 3 + current_version = 4 #################################### # Shared tables #################################### @functools.lru_cache(1000) def datastore_get_or_add(self, datastore: Datastore) -> int: """Creates a datastore if it does not exist, and returns its id.""" with self.transaction() as cur: cur.execute( """ WITH inserted AS ( INSERT INTO datastore (package, class, instance) VALUES (%(package)s, %(cls)s, %(instance)s) ON CONFLICT DO NOTHING RETURNING id ) SELECT id FROM inserted UNION ( -- If the datastore already exists, we need to fetch its id SELECT id FROM datastore WHERE package=%(package)s AND class=%(cls)s AND instance=%(instance)s ) LIMIT 1 """, (dataclasses.asdict(datastore)), ) res = cur.fetchone() assert res is not None (id_,) = res return id_ + #################################### + # Checkpointing/progress tracking + #################################### + + def checked_range_upsert( + self, + datastore: Datastore, + range_start: CoreSWHID, + range_end: CoreSWHID, + date: datetime.datetime, + ) -> None: + """ + Records in the database the given range was last checked at the given date. + """ + datastore_id = self.datastore_get_or_add(datastore) + with self.transaction() as cur: + cur.execute( + """ + INSERT INTO checked_range(datastore, range_start, range_end, last_date) + VALUES (%s, %s, %s, %s) + ON CONFLICT (datastore, range_start, range_end) DO UPDATE + SET last_date = GREATEST(checked_range.last_date, EXCLUDED.last_date) + """, + (datastore_id, str(range_start), str(range_end), date), + ) + + def checked_range_get_last_date( + self, datastore: Datastore, range_start: CoreSWHID, range_end: CoreSWHID + ) -> Optional[datetime.datetime]: + """ + Returns the last date the given range was checked in the given datastore, + or :const:`None` if it was never checked. + + Currently, this matches range boundaries exactly, with no regard for + ranges that contain or are contained by it. + """ + datastore_id = self.datastore_get_or_add(datastore) + with self.transaction() as cur: + cur.execute( + """ + SELECT last_date + FROM checked_range + WHERE datastore=%s AND range_start=%s AND range_end=%s + """, + (datastore_id, str(range_start), str(range_end)), + ) + + res = cur.fetchone() + if res is None: + return None + else: + (date,) = res + return date + + def checked_range_iter( + self, datastore: Datastore + ) -> Iterator[Tuple[CoreSWHID, CoreSWHID, datetime.datetime]]: + datastore_id = self.datastore_get_or_add(datastore) + with self.transaction() as cur: + cur.execute( + """ + SELECT range_start, range_end, last_date + FROM checked_range + WHERE datastore=%s + """, + (datastore_id,), + ) + + for (range_start, range_end, last_date) in cur: + yield ( + CoreSWHID.from_string(range_start), + CoreSWHID.from_string(range_end), + last_date, + ) + #################################### # Inventory of objects with issues #################################### def corrupt_object_add( self, id: CoreSWHID, datastore: Datastore, serialized_object: bytes, ) -> None: datastore_id = self.datastore_get_or_add(datastore) with self.transaction() as cur: cur.execute( """ INSERT INTO corrupt_object (id, datastore, object) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING """, (str(id), datastore_id, serialized_object), ) def corrupt_object_iter(self) -> Iterator[CorruptObject]: """Yields all records in the 'corrupt_object' table.""" with self.transaction() as cur: cur.execute( """ SELECT co.id, co.first_occurrence, co.object, ds.package, ds.class, ds.instance FROM corrupt_object AS co INNER JOIN datastore AS ds ON (ds.id=co.datastore) """ ) for row in cur: (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row yield CorruptObject( id=CoreSWHID.from_string(id), first_occurrence=first_occurrence, object_=object_, datastore=Datastore( package=ds_package, cls=ds_class, instance=ds_instance ), ) def _corrupt_object_list_from_cursor( self, cur: psycopg2.extensions.cursor ) -> List[CorruptObject]: results = [] for row in cur: (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row results.append( CorruptObject( id=CoreSWHID.from_string(id), first_occurrence=first_occurrence, object_=object_, datastore=Datastore( package=ds_package, cls=ds_class, instance=ds_instance ), ) ) return results def corrupt_object_get( self, start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100, ) -> List[CorruptObject]: """Yields a page of records in the 'corrupt_object' table, ordered by id. Arguments: start_id: Only return objects after this id end_id: Only return objects before this id in_origin: An origin URL. If provided, only returns objects that may be found in the given origin """ with self.transaction() as cur: cur.execute( """ SELECT co.id, co.first_occurrence, co.object, ds.package, ds.class, ds.instance FROM corrupt_object AS co INNER JOIN datastore AS ds ON (ds.id=co.datastore) WHERE co.id >= %s AND co.id <= %s ORDER BY co.id LIMIT %s """, (str(start_id), str(end_id), limit), ) return self._corrupt_object_list_from_cursor(cur) def corrupt_object_grab_by_id( self, cur: psycopg2.extensions.cursor, start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100, ) -> List[CorruptObject]: """Returns a page of records in the 'corrupt_object' table for a fixer, ordered by id These records are not already fixed (ie. do not have a corresponding entry in the 'fixed_object' table), and they are selected with an exclusive update lock. Arguments: start_id: Only return objects after this id end_id: Only return objects before this id """ cur.execute( """ SELECT co.id, co.first_occurrence, co.object, ds.package, ds.class, ds.instance FROM corrupt_object AS co INNER JOIN datastore AS ds ON (ds.id=co.datastore) WHERE co.id >= %(start_id)s AND co.id <= %(end_id)s AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id) ORDER BY co.id LIMIT %(limit)s FOR UPDATE SKIP LOCKED """, dict( start_id=str(start_id), end_id=str(end_id), limit=limit, ), ) return self._corrupt_object_list_from_cursor(cur) def corrupt_object_grab_by_origin( self, cur: psycopg2.extensions.cursor, origin_url: str, start_id: Optional[CoreSWHID] = None, end_id: Optional[CoreSWHID] = None, limit: int = 100, ) -> List[CorruptObject]: """Returns a page of records in the 'corrupt_object' table for a fixer, ordered by id These records are not already fixed (ie. do not have a corresponding entry in the 'fixed_object' table), and they are selected with an exclusive update lock. Arguments: origin_url: only returns objects that may be found in the given origin """ cur.execute( """ SELECT co.id, co.first_occurrence, co.object, ds.package, ds.class, ds.instance FROM corrupt_object AS co INNER JOIN datastore AS ds ON (ds.id=co.datastore) INNER JOIN object_origin AS oo ON (oo.object_id=co.id) WHERE (co.id >= %(start_id)s OR %(start_id)s IS NULL) AND (co.id <= %(end_id)s OR %(end_id)s IS NULL) AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id) AND oo.origin_url=%(origin_url)s ORDER BY co.id LIMIT %(limit)s FOR UPDATE SKIP LOCKED """, dict( start_id=None if start_id is None else str(start_id), end_id=None if end_id is None else str(end_id), origin_url=origin_url, limit=limit, ), ) return self._corrupt_object_list_from_cursor(cur) def missing_object_add( self, id: CoreSWHID, reference_ids: Iterable[CoreSWHID], datastore: Datastore, ) -> None: """ Adds a "hole" to the inventory, ie. an object missing from a datastore that is referenced by an other object of the same datastore. If the missing object is already known to be missing by the scrubber database, this only records the reference (which can be useful to locate an origin to recover the object from). If that reference is already known too, this is a noop. Args: id: SWHID of the missing object (the hole) reference_id: SWHID of the object referencing the missing object datastore: representation of the swh-storage/swh-journal/... instance containing this hole """ if not reference_ids: raise ValueError("reference_ids is empty") datastore_id = self.datastore_get_or_add(datastore) with self.transaction() as cur: cur.execute( """ INSERT INTO missing_object (id, datastore) VALUES (%s, %s) ON CONFLICT DO NOTHING """, (str(id), datastore_id), ) psycopg2.extras.execute_batch( cur, """ INSERT INTO missing_object_reference (missing_id, reference_id, datastore) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING """, [ (str(id), str(reference_id), datastore_id) for reference_id in reference_ids ], ) def missing_object_iter(self) -> Iterator[MissingObject]: """Yields all records in the 'missing_object' table.""" with self.transaction() as cur: cur.execute( """ SELECT mo.id, mo.first_occurrence, ds.package, ds.class, ds.instance FROM missing_object AS mo INNER JOIN datastore AS ds ON (ds.id=mo.datastore) """ ) for row in cur: (id, first_occurrence, ds_package, ds_class, ds_instance) = row yield MissingObject( id=CoreSWHID.from_string(id), first_occurrence=first_occurrence, datastore=Datastore( package=ds_package, cls=ds_class, instance=ds_instance ), ) def missing_object_reference_iter( self, missing_id: CoreSWHID ) -> Iterator[MissingObjectReference]: """Yields all records in the 'missing_object_reference' table.""" with self.transaction() as cur: cur.execute( """ SELECT mor.reference_id, mor.first_occurrence, ds.package, ds.class, ds.instance FROM missing_object_reference AS mor INNER JOIN datastore AS ds ON (ds.id=mor.datastore) WHERE mor.missing_id=%s """, (str(missing_id),), ) for row in cur: ( reference_id, first_occurrence, ds_package, ds_class, ds_instance, ) = row yield MissingObjectReference( missing_id=missing_id, reference_id=CoreSWHID.from_string(reference_id), first_occurrence=first_occurrence, datastore=Datastore( package=ds_package, cls=ds_class, instance=ds_instance ), ) #################################### # Issue resolution #################################### def object_origin_add( self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str] ) -> None: psycopg2.extras.execute_values( cur, """ INSERT INTO object_origin (object_id, origin_url) VALUES %s ON CONFLICT DO NOTHING """, [(str(swhid), origin_url) for origin_url in origins], ) def object_origin_get(self, after: str = "", limit: int = 1000) -> List[str]: """Returns origins with non-fixed corrupt objects, ordered by URL. Arguments: after: if given, only returns origins with an URL after this value """ with self.transaction() as cur: cur.execute( """ SELECT DISTINCT origin_url FROM object_origin WHERE origin_url > %(after)s AND object_id IN ( (SELECT id FROM corrupt_object) EXCEPT (SELECT id FROM fixed_object) ) ORDER BY origin_url LIMIT %(limit)s """, dict(after=after, limit=limit), ) return [origin_url for (origin_url,) in cur] def fixed_object_add( self, cur: psycopg2.extensions.cursor, fixed_objects: List[FixedObject] ) -> None: psycopg2.extras.execute_values( cur, """ INSERT INTO fixed_object (id, object, method) VALUES %s ON CONFLICT DO NOTHING """, [ (str(fixed_object.id), fixed_object.object_, fixed_object.method) for fixed_object in fixed_objects ], ) def fixed_object_iter(self) -> Iterator[FixedObject]: with self.transaction() as cur: cur.execute("SELECT id, object, method, recovery_date FROM fixed_object") for (id, object_, method, recovery_date) in cur: yield FixedObject( id=CoreSWHID.from_string(id), object_=object_, method=method, recovery_date=recovery_date, ) diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql index a67eb67..b28ea3c 100644 --- a/swh/scrubber/sql/30-schema.sql +++ b/swh/scrubber/sql/30-schema.sql @@ -1,90 +1,107 @@ ------------------------------------- -- Shared definitions ------------------------------------- create domain swhid as text check (value ~ '^swh:[0-9]+:.*'); create table datastore ( id bigserial not null, package datastore_type not null, class text, instance text ); comment on table datastore is 'Each row identifies a data store being scrubbed'; comment on column datastore.id is 'Internal identifier of the datastore'; comment on column datastore.package is 'Name of the component using this datastore (storage/journal/objstorage)'; comment on column datastore.class is 'For datastores with multiple backends, name of the backend (postgresql/cassandra for storage, kafka for journal, pathslicer/azure/winery/... for objstorage)'; comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg. its URL or DSN.'; +------------------------------------- +-- Checkpointing/progress tracking +------------------------------------- + +create table checked_range +( + datastore int not null, + range_start swhid not null, + range_end swhid not null, + last_date timestamptz not null +); + +comment on table checked_range is 'Each row represents a range of objects in a datastore that were fetched, checksumed, and checked at some point in the past.'; +comment on column checked_range.range_start is 'First SWHID of the range that was checked (inclusive, possibly non-existent).'; +comment on column checked_range.range_end is 'Last SWHID of the range that was checked (inclusive, possiby non-existent).'; +comment on column checked_range.last_date is 'Date the last scrub of that range *started*.'; + ------------------------------------- -- Inventory of objects with issues ------------------------------------- create table corrupt_object ( id swhid not null, datastore int not null, object bytea not null, first_occurrence timestamptz not null default now() ); comment on table corrupt_object is 'Each row identifies an object that was found to be corrupt'; comment on column corrupt_object.datastore is 'Datastore the corrupt object was found in.'; comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)'; comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time'; create table missing_object ( id swhid not null, datastore int not null, first_occurrence timestamptz not null default now() ); comment on table missing_object is 'Each row identifies an object that are missing but referenced by another object (aka "holes")'; comment on column missing_object.datastore is 'Datastore where the hole is.'; comment on column missing_object.first_occurrence is 'Moment the object was found to be corrupt for the first time'; create table missing_object_reference ( missing_id swhid not null, reference_id swhid not null, datastore int not null, first_occurrence timestamptz not null default now() ); comment on table missing_object_reference is 'Each row identifies an object that points to an object that does not exist (aka a "hole")'; comment on column missing_object_reference.missing_id is 'SWHID of the missing object.'; comment on column missing_object_reference.reference_id is 'SWHID of the object referencing the missing object.'; comment on column missing_object_reference.datastore is 'Datastore where the referencing object is.'; comment on column missing_object_reference.first_occurrence is 'Moment the object was found to reference a missing object'; ------------------------------------- -- Issue resolution ------------------------------------- create table object_origin ( object_id swhid not null, origin_url text not null, last_attempt timestamptz -- NULL if not tried yet ); comment on table object_origin is 'Maps objects to origins they might be found in.'; create table fixed_object ( id swhid not null, object bytea not null, method text, recovery_date timestamptz not null default now() ); comment on table fixed_object is 'Each row identifies an object that was found to be corrupt, along with the original version of the object'; comment on column fixed_object.object is 'The recovered object itself, as a msgpack-encoded dict'; comment on column fixed_object.recovery_date is 'Moment the object was recovered.'; comment on column fixed_object.method is 'How the object was recovered. For example: "from_origin", "negative_utc", "capitalized_revision_parent".'; diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql index 88b5e61..98694cc 100644 --- a/swh/scrubber/sql/60-indexes.sql +++ b/swh/scrubber/sql/60-indexes.sql @@ -1,59 +1,65 @@ ------------------------------------- -- Shared tables ------------------------------------- -- datastore create unique index concurrently datastore_pkey on datastore(id); alter table datastore add primary key using index datastore_pkey; create unique index concurrently datastore_package_class_instance on datastore(package, class, instance); +------------------------------------- +-- Checkpointing/progress tracking +------------------------------------- + +create unique index concurrently checked_range_pkey on checked_range(datastore, range_start, range_end); +alter table checked_range add primary key using index checked_range_pkey; ------------------------------------- -- Inventory of objects with issues ------------------------------------- -- corrupt_object alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid; alter table corrupt_object validate constraint corrupt_object_datastore_fkey; create unique index concurrently corrupt_object_pkey on corrupt_object(id, datastore); alter table corrupt_object add primary key using index corrupt_object_pkey; -- missing_object alter table missing_object add constraint missing_object_datastore_fkey foreign key (datastore) references datastore(id) not valid; alter table missing_object validate constraint missing_object_datastore_fkey; create unique index concurrently missing_object_pkey on missing_object(id, datastore); alter table missing_object add primary key using index missing_object_pkey; -- missing_object_reference alter table missing_object_reference add constraint missing_object_reference_datastore_fkey foreign key (datastore) references datastore(id) not valid; alter table missing_object_reference validate constraint missing_object_reference_datastore_fkey; create unique index concurrently missing_object_reference_missing_id_reference_id_datastore on missing_object_reference(missing_id, reference_id, datastore); create unique index concurrently missing_object_reference_reference_id_missing_id_datastore on missing_object_reference(reference_id, missing_id, datastore); ------------------------------------- -- Issue resolution ------------------------------------- -- object_origin create unique index concurrently object_origin_pkey on object_origin (object_id, origin_url); create index concurrently object_origin_by_origin on object_origin (origin_url, object_id); -- FIXME: not valid, because corrupt_object(id) is not unique -- alter table object_origin add constraint object_origin_object_fkey foreign key (object_id) references corrupt_object(id) not valid; -- alter table object_origin validate constraint object_origin_object_fkey; -- fixed_object create unique index concurrently fixed_object_pkey on fixed_object(id); alter table fixed_object add primary key using index fixed_object_pkey; diff --git a/swh/scrubber/sql/upgrades/4.sql b/swh/scrubber/sql/upgrades/4.sql new file mode 100644 index 0000000..9dc7e2f --- /dev/null +++ b/swh/scrubber/sql/upgrades/4.sql @@ -0,0 +1,21 @@ +-- SWH Scrubber DB schema upgrade +-- from_version: 3 +-- to_version: 4 +-- description: Add checked_range + + +create table checked_range +( + datastore int not null, + range_start swhid not null, + range_end swhid not null, + last_date timestamptz not null +); + +comment on table checked_range is 'Each row represents a range of objects in a datastore that were fetched, checksumed, and checked at some point in the past.'; +comment on column checked_range.range_start is 'First SWHID of the range that was checked (inclusive, possibly non-existent).'; +comment on column checked_range.range_end is 'Last SWHID of the range that was checked (inclusive, possiby non-existent).'; +comment on column checked_range.last_date is 'Date the last scrub of that range *started*.'; + +create unique index concurrently checked_range_pkey on checked_range(datastore, range_start, range_end); +alter table checked_range add primary key using index checked_range_pkey; diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py index 2305fa7..c29a903 100644 --- a/swh/scrubber/storage_checker.py +++ b/swh/scrubber/storage_checker.py @@ -1,280 +1,351 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Reads all objects in a swh-storage instance and recomputes their checksums.""" import collections import contextlib import dataclasses +import datetime import logging -from typing import Iterable, Union +from typing import Iterable, Optional, Tuple, Union from swh.core.statsd import Statsd from swh.journal.serializers import value_to_kafka from swh.model import swhids from swh.model.model import ( Content, Directory, ObjectType, Release, Revision, Snapshot, TargetType, ) from swh.storage import backfill from swh.storage.interface import StorageInterface from swh.storage.postgresql.storage import Storage as PostgresqlStorage from .db import Datastore, ScrubberDb logger = logging.getLogger(__name__) ScrubbableObject = Union[Revision, Release, Snapshot, Directory, Content] @contextlib.contextmanager def storage_db(storage): db = storage.get_db() try: yield db finally: storage.put_db(db) +def _get_inclusive_range_swhids( + inclusive_range_start: Optional[bytes], + exclusive_range_end: Optional[bytes], + object_type: swhids.ObjectType, +) -> Tuple[swhids.CoreSWHID, swhids.CoreSWHID]: + r""" + Given a ``[range_start, range_end)`` right-open interval of id prefixes + and an object type (as returned by :const:`swh.storage.backfill.RANGE_GENERATORS`), + returns a ``[range_start_swhid, range_end_swhid]`` closed interval of SWHIDs + suitable for the scrubber database. + + >>> _get_inclusive_range_swhids(b"\x42", None, swhids.ObjectType.SNAPSHOT) + (CoreSWHID.from_string('swh:1:snp:4200000000000000000000000000000000000000'), CoreSWHID.from_string('swh:1:snp:ffffffffffffffffffffffffffffffffffffffff')) + + >>> _get_inclusive_range_swhids(b"\x00", b"\x12\x34", swhids.ObjectType.REVISION) + (CoreSWHID.from_string('swh:1:rev:0000000000000000000000000000000000000000'), CoreSWHID.from_string('swh:1:rev:1233ffffffffffffffffffffffffffffffffffff')) + + """ # noqa + range_start_swhid = swhids.CoreSWHID( + object_type=object_type, + object_id=(inclusive_range_start or b"").ljust(20, b"\00"), + ) + if exclusive_range_end is None: + inclusive_range_end = b"\xff" * 20 + else: + # convert "1230000000..." to "122fffffff..." + inclusive_range_end = ( + int.from_bytes(exclusive_range_end.ljust(20, b"\x00"), "big") - 1 + ).to_bytes(20, "big") + range_end_swhid = swhids.CoreSWHID( + object_type=object_type, + object_id=inclusive_range_end, + ) + + return (range_start_swhid, range_end_swhid) + + @dataclasses.dataclass class StorageChecker: """Reads a chunk of a swh-storage database, recomputes checksums, and reports errors in a separate database.""" db: ScrubberDb storage: StorageInterface object_type: str """``directory``/``revision``/``release``/``snapshot``""" start_object: str """minimum value of the hexdigest of the object's sha1.""" end_object: str """maximum value of the hexdigest of the object's sha1.""" _datastore = None _statsd = None def datastore_info(self) -> Datastore: """Returns a :class:`Datastore` instance representing the swh-storage instance being checked.""" if self._datastore is None: if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: self._datastore = Datastore( package="storage", cls="postgresql", instance=db.conn.dsn, ) else: raise NotImplementedError( f"StorageChecker(storage={self.storage!r}).datastore()" ) return self._datastore def statsd(self) -> Statsd: if self._statsd is None: self._statsd = Statsd( namespace="swh_scrubber", constant_tags={"object_type": self.object_type}, ) return self._statsd def run(self): """Runs on all objects of ``object_type`` and with id between ``start_object`` and ``end_object``. """ if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: return self._check_postgresql(db) else: raise NotImplementedError( f"StorageChecker(storage={self.storage!r}).check_storage()" ) def _check_postgresql(self, db): + object_type = getattr(swhids.ObjectType, self.object_type.upper()) for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( self.start_object, self.end_object ): + (range_start_swhid, range_end_swhid) = _get_inclusive_range_swhids( + range_start, range_end, object_type + ) + + start_time = datetime.datetime.now(tz=datetime.timezone.utc) + + # Currently, this matches range boundaries exactly, with no regard for + # ranges that contain or are contained by it. + last_check_time = self.db.checked_range_get_last_date( + self.datastore_info(), + range_start_swhid, + range_end_swhid, + ) + + if last_check_time is not None: + # TODO: re-check if 'last_check_time' was a long ago. + logger.debug( + "Skipping processing of %s range %s to %s: already done at %s", + self.object_type, + backfill._format_range_bound(range_start), + backfill._format_range_bound(range_end), + last_check_time, + ) + continue + logger.debug( "Processing %s range %s to %s", self.object_type, backfill._format_range_bound(range_start), backfill._format_range_bound(range_end), ) objects = backfill.fetch( db, self.object_type, start=range_start, end=range_end ) objects = list(objects) with self.statsd().timed( "batch_duration_seconds", tags={"operation": "check_hashes"} ): self.check_object_hashes(objects) with self.statsd().timed( "batch_duration_seconds", tags={"operation": "check_references"} ): self.check_object_references(objects) + self.db.checked_range_upsert( + self.datastore_info(), + range_start_swhid, + range_end_swhid, + start_time, + ) + def check_object_hashes(self, objects: Iterable[ScrubbableObject]): """Recomputes hashes, and reports mismatches.""" count = 0 for object_ in objects: if isinstance(object_, Content): # TODO continue real_id = object_.compute_hash() count += 1 if object_.id != real_id: self.statsd().increment("hash_mismatch_total") self.db.corrupt_object_add( object_.swhid(), self.datastore_info(), value_to_kafka(object_.to_dict()), ) if count: self.statsd().increment("objects_hashed_total", count) def check_object_references(self, objects: Iterable[ScrubbableObject]): """Check all objects references by these objects exist.""" cnt_references = collections.defaultdict(set) dir_references = collections.defaultdict(set) rev_references = collections.defaultdict(set) rel_references = collections.defaultdict(set) snp_references = collections.defaultdict(set) for object_ in objects: swhid = object_.swhid() if isinstance(object_, Content): pass elif isinstance(object_, Directory): for entry in object_.entries: if entry.type == "file": cnt_references[entry.target].add(swhid) elif entry.type == "dir": dir_references[entry.target].add(swhid) elif entry.type == "rev": # dir->rev holes are not considered a problem because they # happen whenever git submodules point to repositories that # were not loaded yet; ignore them pass else: assert False, entry elif isinstance(object_, Revision): dir_references[object_.directory].add(swhid) for parent in object_.parents: rev_references[parent].add(swhid) elif isinstance(object_, Release): if object_.target is None: pass elif object_.target_type == ObjectType.CONTENT: cnt_references[object_.target].add(swhid) elif object_.target_type == ObjectType.DIRECTORY: dir_references[object_.target].add(swhid) elif object_.target_type == ObjectType.REVISION: rev_references[object_.target].add(swhid) elif object_.target_type == ObjectType.RELEASE: rel_references[object_.target].add(swhid) else: assert False, object_ elif isinstance(object_, Snapshot): for branch in object_.branches.values(): if branch is None: pass elif branch.target_type == TargetType.ALIAS: pass elif branch.target_type == TargetType.CONTENT: cnt_references[branch.target].add(swhid) elif branch.target_type == TargetType.DIRECTORY: dir_references[branch.target].add(swhid) elif branch.target_type == TargetType.REVISION: rev_references[branch.target].add(swhid) elif branch.target_type == TargetType.RELEASE: rel_references[branch.target].add(swhid) elif branch.target_type == TargetType.SNAPSHOT: snp_references[branch.target].add(swhid) else: assert False, (str(object_.swhid()), branch) else: assert False, object_.swhid() missing_cnts = set( self.storage.content_missing_per_sha1_git(list(cnt_references)) ) missing_dirs = set(self.storage.directory_missing(list(dir_references))) missing_revs = set(self.storage.revision_missing(list(rev_references))) missing_rels = set(self.storage.release_missing(list(rel_references))) missing_snps = set(self.storage.snapshot_missing(list(snp_references))) self.statsd().increment( "missing_object_total", len(missing_cnts), tags={"target_object_type": "content"}, ) self.statsd().increment( "missing_object_total", len(missing_dirs), tags={"target_object_type": "directory"}, ) self.statsd().increment( "missing_object_total", len(missing_revs), tags={"target_object_type": "revision"}, ) self.statsd().increment( "missing_object_total", len(missing_rels), tags={"target_object_type": "release"}, ) self.statsd().increment( "missing_object_total", len(missing_snps), tags={"target_object_type": "snapshot"}, ) for missing_id in missing_cnts: missing_swhid = swhids.CoreSWHID( object_type=swhids.ObjectType.CONTENT, object_id=missing_id ) self.db.missing_object_add( missing_swhid, cnt_references[missing_id], self.datastore_info() ) for missing_id in missing_dirs: missing_swhid = swhids.CoreSWHID( object_type=swhids.ObjectType.DIRECTORY, object_id=missing_id ) self.db.missing_object_add( missing_swhid, dir_references[missing_id], self.datastore_info() ) for missing_id in missing_revs: missing_swhid = swhids.CoreSWHID( object_type=swhids.ObjectType.REVISION, object_id=missing_id ) self.db.missing_object_add( missing_swhid, rev_references[missing_id], self.datastore_info() ) for missing_id in missing_rels: missing_swhid = swhids.CoreSWHID( object_type=swhids.ObjectType.RELEASE, object_id=missing_id ) self.db.missing_object_add( missing_swhid, rel_references[missing_id], self.datastore_info() ) for missing_id in missing_snps: missing_swhid = swhids.CoreSWHID( object_type=swhids.ObjectType.SNAPSHOT, object_id=missing_id ) self.db.missing_object_add( missing_swhid, snp_references[missing_id], self.datastore_info() ) diff --git a/swh/scrubber/tests/test_db.py b/swh/scrubber/tests/test_db.py new file mode 100644 index 0000000..2406575 --- /dev/null +++ b/swh/scrubber/tests/test_db.py @@ -0,0 +1,58 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime + +from swh.model import swhids +from swh.scrubber.db import Datastore, ScrubberDb + +DATASTORE = Datastore(package="storage", cls="postgresql", instance="service=swh-test") +SNP_SWHID1 = swhids.CoreSWHID.from_string( + "swh:1:snp:5000000000000000000000000000000000000000" +) +SNP_SWHID2 = swhids.CoreSWHID.from_string( + "swh:1:snp:e000000000000000000000000000000000000000" +) +DATE = datetime.datetime(2022, 10, 4, 12, 1, 23, tzinfo=datetime.timezone.utc) + + +def test_checked_range_insert(scrubber_db: ScrubberDb): + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE) + + assert list(scrubber_db.checked_range_iter(DATASTORE)) == [ + (SNP_SWHID1, SNP_SWHID2, DATE) + ] + + +def test_checked_range_update(scrubber_db: ScrubberDb): + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE) + + date2 = DATE + datetime.timedelta(days=1) + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, date2) + + assert list(scrubber_db.checked_range_iter(DATASTORE)) == [ + (SNP_SWHID1, SNP_SWHID2, date2) + ] + + date3 = DATE + datetime.timedelta(days=-1) + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, date3) + + assert list(scrubber_db.checked_range_iter(DATASTORE)) == [ + (SNP_SWHID1, SNP_SWHID2, date2) # newest date wins + ] + + +def test_checked_range_get(scrubber_db: ScrubberDb): + assert ( + scrubber_db.checked_range_get_last_date(DATASTORE, SNP_SWHID1, SNP_SWHID2) + is None + ) + + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE) + + assert ( + scrubber_db.checked_range_get_last_date(DATASTORE, SNP_SWHID1, SNP_SWHID2) + == DATE + ) diff --git a/swh/scrubber/tests/test_storage_postgresql.py b/swh/scrubber/tests/test_storage_postgresql.py index efd38d7..8e75e94 100644 --- a/swh/scrubber/tests/test_storage_postgresql.py +++ b/swh/scrubber/tests/test_storage_postgresql.py @@ -1,291 +1,447 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest.mock import attr import pytest from swh.journal.serializers import kafka_to_value from swh.model import model, swhids from swh.model.tests import swh_model_data -from swh.scrubber.storage_checker import StorageChecker +from swh.scrubber.db import Datastore +from swh.scrubber.storage_checker import StorageChecker, storage_db from swh.storage.backfill import byte_ranges CONTENT1 = model.Content.from_data(b"foo") DIRECTORY1 = model.Directory( entries=( model.DirectoryEntry( target=CONTENT1.sha1_git, type="file", name=b"file1", perms=0o1 ), ) ) DIRECTORY2 = model.Directory( entries=( model.DirectoryEntry( target=CONTENT1.sha1_git, type="file", name=b"file2", perms=0o1 ), model.DirectoryEntry(target=DIRECTORY1.id, type="dir", name=b"dir1", perms=0o1), model.DirectoryEntry(target=b"\x00" * 20, type="rev", name=b"rev1", perms=0o1), ) ) REVISION1 = model.Revision( message=b"blah", directory=DIRECTORY2.id, author=None, committer=None, date=None, committer_date=None, type=model.RevisionType.GIT, synthetic=True, ) RELEASE1 = model.Release( message=b"blih", name=b"bluh", target_type=model.ObjectType.REVISION, target=REVISION1.id, synthetic=True, ) SNAPSHOT1 = model.Snapshot( branches={ b"rel1": model.SnapshotBranch( target_type=model.TargetType.RELEASE, target=RELEASE1.id ), } ) +@pytest.fixture +def datastore(swh_storage): + with storage_db(swh_storage) as db: + return Datastore( + package="storage", + cls="postgresql", + instance=db.conn.dsn, + ) + + # decorator to make swh.storage.backfill use fewer ranges, so tests run faster patch_byte_ranges = unittest.mock.patch( "swh.storage.backfill.byte_ranges", lambda numbits, start, end: byte_ranges(numbits // 8, start, end), ) +def _short_ranges(type_): + return [ + ( + f"swh:1:{type_}:0000000000000000000000000000000000000000", + f"swh:1:{type_}:1fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:2000000000000000000000000000000000000000", + f"swh:1:{type_}:3fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:4000000000000000000000000000000000000000", + f"swh:1:{type_}:5fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:6000000000000000000000000000000000000000", + f"swh:1:{type_}:7fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:8000000000000000000000000000000000000000", + f"swh:1:{type_}:9fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:a000000000000000000000000000000000000000", + f"swh:1:{type_}:bfffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:c000000000000000000000000000000000000000", + f"swh:1:{type_}:dfffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:e000000000000000000000000000000000000000", + f"swh:1:{type_}:ffffffffffffffffffffffffffffffffffffffff", + ), + ] + + +def _long_ranges(type_): + return [ + ( + f"swh:1:{type_}:0000000000000000000000000000000000000000", + f"swh:1:{type_}:3fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:4000000000000000000000000000000000000000", + f"swh:1:{type_}:7fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:8000000000000000000000000000000000000000", + f"swh:1:{type_}:bfffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:c000000000000000000000000000000000000000", + f"swh:1:{type_}:ffffffffffffffffffffffffffffffffffffffff", + ), + ] + + +EXPECTED_RANGES = [ + *_short_ranges("dir"), + *_long_ranges("rel"), + *_short_ranges("rev"), + *_long_ranges("snp"), +] + + +def assert_checked_ranges( + scrubber_db, datastore, expected_ranges, before_date=None, after_date=None +): + if before_date is not None: + assert all( + before_date < date < after_date + for (_, _, date) in scrubber_db.checked_range_iter(datastore) + ) + + checked_ranges = [ + (str(start), str(end)) + for (start, end, date) in scrubber_db.checked_range_iter(datastore) + ] + checked_ranges.sort(key=str) + + assert checked_ranges == expected_ranges + + @patch_byte_ranges -def test_no_corruption(scrubber_db, swh_storage): +def test_no_corruption(scrubber_db, datastore, swh_storage): swh_storage.directory_add(swh_model_data.DIRECTORIES) swh_storage.revision_add(swh_model_data.REVISIONS) swh_storage.release_add(swh_model_data.RELEASES) swh_storage.snapshot_add(swh_model_data.SNAPSHOTS) + before_date = datetime.datetime.now(tz=datetime.timezone.utc) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() + after_date = datetime.datetime.now(tz=datetime.timezone.utc) assert list(scrubber_db.corrupt_object_iter()) == [] + assert_checked_ranges( + scrubber_db, datastore, EXPECTED_RANGES, before_date, after_date + ) + @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS))) @patch_byte_ranges -def test_corrupt_snapshot(scrubber_db, swh_storage, corrupt_idx): +def test_corrupt_snapshot(scrubber_db, datastore, swh_storage, corrupt_idx): storage_dsn = swh_storage.get_db().conn.dsn snapshots = list(swh_model_data.SNAPSHOTS) snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20) swh_storage.snapshot_add(snapshots) before_date = datetime.datetime.now(tz=datetime.timezone.utc) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() after_date = datetime.datetime.now(tz=datetime.timezone.utc) corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 1 assert corrupt_objects[0].id == swhids.CoreSWHID.from_string( "swh:1:snp:0000000000000000000000000000000000000000" ) assert corrupt_objects[0].datastore.package == "storage" assert corrupt_objects[0].datastore.cls == "postgresql" assert corrupt_objects[0].datastore.instance.startswith(storage_dsn) assert ( before_date - datetime.timedelta(seconds=5) <= corrupt_objects[0].first_occurrence <= after_date + datetime.timedelta(seconds=5) ) assert ( kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict() ) + assert_checked_ranges( + scrubber_db, datastore, EXPECTED_RANGES, before_date, after_date + ) + @patch_byte_ranges -def test_corrupt_snapshots_same_batch(scrubber_db, swh_storage): +def test_corrupt_snapshots_same_batch(scrubber_db, datastore, swh_storage): snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) swh_storage.snapshot_add(snapshots) StorageChecker( db=scrubber_db, storage=swh_storage, object_type="snapshot", start_object="00" * 20, end_object="ff" * 20, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 2 assert {co.id for co in corrupt_objects} == { swhids.CoreSWHID.from_string(swhid) for swhid in [ "swh:1:snp:0000000000000000000000000000000000000000", "swh:1:snp:0101010101010101010101010101010101010101", ] } + assert_checked_ranges(scrubber_db, datastore, _long_ranges("snp")) + @patch_byte_ranges -def test_corrupt_snapshots_different_batches(scrubber_db, swh_storage): +def test_corrupt_snapshots_different_batches(scrubber_db, datastore, swh_storage): snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i * 255]) * 20) swh_storage.snapshot_add(snapshots) StorageChecker( db=scrubber_db, storage=swh_storage, object_type="snapshot", start_object="00" * 20, end_object="87" * 20, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 1 # Simulates resuming from a different process, with an empty lru_cache scrubber_db.datastore_get_or_add.cache_clear() StorageChecker( db=scrubber_db, storage=swh_storage, object_type="snapshot", start_object="88" * 20, end_object="ff" * 20, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 2 assert {co.id for co in corrupt_objects} == { swhids.CoreSWHID.from_string(swhid) for swhid in [ "swh:1:snp:0000000000000000000000000000000000000000", "swh:1:snp:ffffffffffffffffffffffffffffffffffffffff", ] } + assert_checked_ranges(scrubber_db, datastore, _long_ranges("snp")) + @patch_byte_ranges -def test_no_hole(scrubber_db, swh_storage): +def test_no_recheck(scrubber_db, datastore, swh_storage): + """ + Tests that objects that were already checked are not checked again on + the next run. + """ + # Corrupt two snapshots + snapshots = list(swh_model_data.SNAPSHOTS) + for i in (0, 1): + snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) + swh_storage.snapshot_add(snapshots) + + # Mark ranges as already checked + now = datetime.datetime.now(tz=datetime.timezone.utc) + for (range_start, range_end) in EXPECTED_RANGES: + scrubber_db.checked_range_upsert(datastore, range_start, range_end, now) + + StorageChecker( + db=scrubber_db, + storage=swh_storage, + object_type="snapshot", + start_object="00" * 20, + end_object="ff" * 20, + ).run() + + corrupt_objects = list(scrubber_db.corrupt_object_iter()) + assert ( + corrupt_objects == [] + ), "Detected corrupt objects in ranges that should have been skipped." + + # Make sure the DB was not changed (in particular, that timestamps were not bumped) + ranges = [ + (str(range_start), str(range_end), date) + for (range_start, range_end, date) in scrubber_db.checked_range_iter(datastore) + ] + ranges.sort(key=str) + assert ranges == [ + (range_start, range_end, now) for (range_start, range_end) in EXPECTED_RANGES + ] + + +@patch_byte_ranges +def test_no_hole(scrubber_db, datastore, swh_storage): swh_storage.content_add([CONTENT1]) swh_storage.directory_add([DIRECTORY1, DIRECTORY2]) swh_storage.revision_add([REVISION1]) swh_storage.release_add([RELEASE1]) swh_storage.snapshot_add([SNAPSHOT1]) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() assert list(scrubber_db.missing_object_iter()) == [] + assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES) + @pytest.mark.parametrize( "missing_object", ["content1", "directory1", "directory2", "revision1", "release1"], ) @patch_byte_ranges -def test_one_hole(scrubber_db, swh_storage, missing_object): +def test_one_hole(scrubber_db, datastore, swh_storage, missing_object): if missing_object == "content1": missing_swhid = CONTENT1.swhid() reference_swhids = [DIRECTORY1.swhid(), DIRECTORY2.swhid()] else: swh_storage.content_add([CONTENT1]) if missing_object == "directory1": missing_swhid = DIRECTORY1.swhid() reference_swhids = [DIRECTORY2.swhid()] else: swh_storage.directory_add([DIRECTORY1]) if missing_object == "directory2": missing_swhid = DIRECTORY2.swhid() reference_swhids = [REVISION1.swhid()] else: swh_storage.directory_add([DIRECTORY2]) if missing_object == "revision1": missing_swhid = REVISION1.swhid() reference_swhids = [RELEASE1.swhid()] else: swh_storage.revision_add([REVISION1]) if missing_object == "release1": missing_swhid = RELEASE1.swhid() reference_swhids = [SNAPSHOT1.swhid()] else: swh_storage.release_add([RELEASE1]) swh_storage.snapshot_add([SNAPSHOT1]) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() assert [mo.id for mo in scrubber_db.missing_object_iter()] == [missing_swhid] assert { (mor.missing_id, mor.reference_id) for mor in scrubber_db.missing_object_reference_iter(missing_swhid) } == {(missing_swhid, reference_swhid) for reference_swhid in reference_swhids} + assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES) + @patch_byte_ranges -def test_two_holes(scrubber_db, swh_storage): +def test_two_holes(scrubber_db, datastore, swh_storage): # missing content and revision swh_storage.directory_add([DIRECTORY1, DIRECTORY2]) swh_storage.release_add([RELEASE1]) swh_storage.snapshot_add([SNAPSHOT1]) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() assert {mo.id for mo in scrubber_db.missing_object_iter()} == { CONTENT1.swhid(), REVISION1.swhid(), } assert { mor.reference_id for mor in scrubber_db.missing_object_reference_iter(CONTENT1.swhid()) } == {DIRECTORY1.swhid(), DIRECTORY2.swhid()} assert { mor.reference_id for mor in scrubber_db.missing_object_reference_iter(REVISION1.swhid()) } == {RELEASE1.swhid()} + + assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES)