diff --git a/README.rst b/README.rst new file mode 120000 --- /dev/null +++ b/README.rst @@ -0,0 +1 @@ +docs/README.rst \ No newline at end of file diff --git a/conftest.py b/conftest.py new file mode 100644 --- /dev/null +++ b/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["swh.storage.pytest_plugin", "swh.core.db.pytest_plugin"] diff --git a/docs/README.rst b/docs/README.rst --- a/docs/README.rst +++ b/docs/README.rst @@ -3,3 +3,37 @@ Tools to periodically checks data integrity in swh-storage and swh-objstorage, reports errors, and (try to) fix them. + +This is a work in progress; some of the components described below do not +exist yet (cassandra storage checker, objstorage checker, recovery, and reinjection) + +The Scrubber package is made of the following parts: + + +Checking +-------- + +Highly parallel processes continuously read objects from a data store, +compute checksums, and write any failure in a database, along with the data of +the corrupt object. + +There is one "checker" for each datastore package: storage (postgresql and cassandra), +journal (kafka), and objstorage. + + +Recovery +-------- + +Then, from time to time, jobs go through the list of known corrupt objects, +and try to recover the original objects, through various means: + +* Brute-forcing variations until they match their checksum +* Recovering from another data store +* As a last resort, recovering from known origins, if any + + +Reinjection +----------- + +Finally, when an original object is recovered, it is reinjected in the original +data store, replacing the corrupt one. diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,5 @@ # Add here internal Software Heritage dependencies, one per line. swh.core[http] >= 0.3 # [http] is required by swh.core.pytest_plugin +swh.model >= 5.0.0 +swh.storage > 1.0.0 +swh.journal >= 0.9.0 diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1 +1,3 @@ pytest < 7.0.0 # v7.0.0 removed _pytest.tmpdir.TempdirFactory, which is used by some of the pytest plugins we use +pytest-mock +yaml diff --git a/swh/scrubber/__init__.py b/swh/scrubber/__init__.py --- a/swh/scrubber/__init__.py +++ b/swh/scrubber/__init__.py @@ -0,0 +1,23 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .db import ScrubberDb + + +def get_scrubber_db(cls: str, **kwargs) -> ScrubberDb: + if cls != "local": + raise ValueError(f"Unknown scrubber db class '{cls}', use 'local' instead.") + + from .db import ScrubberDb + + return ScrubberDb.connect(kwargs.pop("db"), **kwargs) + + +get_datastore = get_scrubber_db diff --git a/swh/scrubber/bar.py b/swh/scrubber/bar.py deleted file mode 100644 --- a/swh/scrubber/bar.py +++ /dev/null @@ -1,4 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information diff --git a/swh/scrubber/check_storage.py b/swh/scrubber/check_storage.py new file mode 100644 --- /dev/null +++ b/swh/scrubber/check_storage.py @@ -0,0 +1,91 @@ +# Copyright (C) 2021-2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Reads all objects in a swh-storage instance and recomputes their checksums.""" + +import contextlib +import dataclasses +import logging +from typing import Iterable, Union + +from swh.journal.serializers import value_to_kafka +from swh.model.model import Directory, Release, Revision, Snapshot +from swh.storage import backfill +from swh.storage.interface import StorageInterface +from swh.storage.postgresql.storage import Storage as PostgresqlStorage + +from .db import Datastore, ScrubberDb + +logger = logging.getLogger(__name__) + +ScrubbableObject = Union[Revision, Release, Snapshot, Directory] + + +@contextlib.contextmanager +def storage_db(storage): + db = storage.get_db() + try: + yield db + finally: + storage.put_db(db) + + +@dataclasses.dataclass +class StorageChecker: + db: ScrubberDb + storage: StorageInterface + object_type: str + start_object: str + end_object: str + + _datastore = None + + def datastore_info(self) -> Datastore: + if self._datastore is None: + if isinstance(self.storage, PostgresqlStorage): + with storage_db(self.storage) as db: + self._datastore = Datastore( + package="storage", cls="postgresql", instance=db.conn.dsn, + ) + else: + raise NotImplementedError( + f"StorageChecker(storage={self.storage!r}).datastore()" + ) + return self._datastore + + def check_storage(self): + if isinstance(self.storage, PostgresqlStorage): + with storage_db(self.storage) as db: + return self._check_postgresql(db) + else: + raise NotImplementedError( + f"StorageChecker(storage={self.storage!r}).check_storage()" + ) + + def _check_postgresql(self, db): + for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( + self.start_object, self.end_object + ): + logger.info( + "Processing %s range %s to %s", + self.object_type, + backfill._format_range_bound(range_start), + backfill._format_range_bound(range_end), + ) + + objects = backfill.fetch( + db, self.object_type, start=range_start, end=range_end + ) + objects = list(objects) + + self.process_objects(objects) + + def process_objects(self, objects: Iterable[ScrubbableObject]): + for object_ in objects: + real_id = object_.compute_hash() + if object_.id != real_id: + self.db.corrupt_object_add( + self.datastore_info(), object_, value_to_kafka(object_.to_dict()) + ) diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py --- a/swh/scrubber/cli.py +++ b/swh/scrubber/cli.py @@ -1,3 +1,10 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os + import click from swh.core.cli import CONTEXT_SETTINGS @@ -5,7 +12,82 @@ @swh_cli_group.group(name="scrubber", context_settings=CONTEXT_SETTINGS) +@click.option( + "--config-file", + "-C", + default=None, + type=click.Path(exists=True, dir_okay=False,), + help="Configuration file.", +) +@click.pass_context +def scrubber_cli_group(ctx, config_file): + """main command group of the datastore scrubber + """ + from swh.core import config + + from . import get_scrubber_db + + if not config_file: + config_file = os.environ.get("SWH_CONFIG_FILENAME") + + if config_file: + if not os.path.exists(config_file): + raise ValueError("%s does not exist" % config_file) + conf = config.read(config_file) + else: + conf = {} + + if "scrubber_db" not in conf: + ctx.fail("You must have a scrubber_db configured in your config file.") + + ctx.ensure_object(dict) + ctx.obj["config"] = conf + ctx.obj["db"] = get_scrubber_db(**conf["scrubber_db"]) + + +@scrubber_cli_group.group(name="check") @click.pass_context -def scrubber_cli_group(ctx): - """main command of the datastore scrubber +def scrubber_check_cli_group(ctx): + """group of commands which read from data stores and report errors. """ + pass + + +@scrubber_check_cli_group.command(name="storage") +@click.option( + "--object-type", + type=click.Choice( + # use a hardcoded list to prevent having to load the + # replay module at cli loading time + [ + "snapshot", + "revision", + "release", + "directory", + # TODO: + # "raw_extrinsic_metadata", + # "extid", + ] + ), +) +@click.option("--start-object", default="0" * 40) +@click.option("--end-object", default="f" * 40) +@click.pass_context +def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str): + conf = ctx.obj["config"] + if "storage" not in conf: + ctx.fail("You must have a storage configured in your config file.") + + from swh.storage import get_storage + + from .check_storage import StorageChecker + + checker = StorageChecker( + db=ctx.obj["db"], + storage=get_storage(**conf["storage"]), + object_type=object_type, + start_object=start_object, + end_object=end_object, + ) + + checker.check_storage() diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py new file mode 100644 --- /dev/null +++ b/swh/scrubber/db.py @@ -0,0 +1,89 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +import dataclasses +import datetime +import functools +from typing import Iterator, Union + +from swh.core.db import BaseDb +from swh.model.model import Content, Directory, Release, Revision, Snapshot +from swh.model.swhids import CoreSWHID + + +@dataclasses.dataclass(frozen=True) +class Datastore: + package: str + cls: str + instance: str + + +@dataclasses.dataclass(frozen=True) +class CorruptObject: + id: CoreSWHID + datastore: Datastore + first_occurrence: datetime.datetime + object_: bytes + + +class ScrubberDb(BaseDb): + current_version = 1 + + @functools.lru_cache(1000) + def datastore_get_or_add(self, datastore: Datastore) -> int: + """Creates a datastore if it does not exist, and returns its id.""" + cur = self.cursor() + cur.execute( + """ + INSERT INTO datastore (package, class, instance) + VALUES (%s, %s, %s) + ON CONFLICT DO NOTHING + RETURNING id + """, + (datastore.package, datastore.cls, datastore.instance), + ) + (id_,) = cur.fetchone() + return id_ + + def corrupt_object_add( + self, + datastore: Datastore, + object_: Union[Content, Directory, Revision, Release, Snapshot], + serialized_object: bytes, + ) -> None: + datastore_id = self.datastore_get_or_add(datastore) + cur = self.cursor() + cur.execute( + """ + INSERT INTO corrupt_object (id, datastore, object) + VALUES (%s, %s, %s) + ON CONFLICT DO NOTHING + """, + (str(object_.swhid()), datastore_id, serialized_object), + ) + + def corrupt_object_iter(self) -> Iterator[CorruptObject]: + cur = self.cursor() + cur.execute( + """ + SELECT + co.id, co.first_occurrence, co.object, + ds.package, ds.class, ds.instance + FROM corrupt_object AS co + INNER JOIN datastore AS ds ON (ds.id=co.datastore) + """ + ) + + for row in cur: + (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row + yield CorruptObject( + id=CoreSWHID.from_string(id), + first_occurrence=first_occurrence, + object_=object_, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), + ) diff --git a/swh/scrubber/sql/20-enums.sql b/swh/scrubber/sql/20-enums.sql new file mode 100644 --- /dev/null +++ b/swh/scrubber/sql/20-enums.sql @@ -0,0 +1 @@ +create type datastore_type as enum ('storage', 'journal', 'objstorage'); diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql new file mode 100644 --- /dev/null +++ b/swh/scrubber/sql/30-schema.sql @@ -0,0 +1,28 @@ +create domain swhid as text check (value ~ '^swh:[0-9]+:.*'); + +create table datastore +( + id bigserial not null, + package datastore_type not null, + class text, + instance text +); + +comment on table datastore is 'Each row identifies a data store being scrubbed'; +comment on column datastore.id is 'Internal identifier of the datastore'; +comment on column datastore.package is 'Name of the component using this datastore (storage/journal/objstorage)'; +comment on column datastore.class is 'For datastores with multiple backends, name of the backend (postgresql/cassandra for storage, kafka for journal, pathslicer/azure/winery/... for objstorage)'; +comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg. its URL or DSN.'; + +create table corrupt_object +( + id swhid not null, + datastore int not null, + first_occurrence timestamptz not null default now(), + object bytea not null +); + +comment on table corrupt_object is 'Each row identifies an object that was found to be corrupt'; +comment on column corrupt_object.datastore is 'Datastore the corrupt object was found in.'; +comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time'; +comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)'; diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql new file mode 100644 --- /dev/null +++ b/swh/scrubber/sql/60-indexes.sql @@ -0,0 +1,13 @@ +-- datastore + +create unique index concurrently datastore_pkey on datastore(id); +alter table datastore add primary key using index datastore_pkey; + +create unique index concurrently datastore_package_class_instance on datastore(package, class, instance); + +-- corrupt_object + +alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid; +alter table corrupt_object validate constraint corrupt_object_datastore_fkey; + +create unique index corrupt_object_pkey on corrupt_object(datastore, id); diff --git a/swh/scrubber/tests/conftest.py b/swh/scrubber/tests/conftest.py new file mode 100644 --- /dev/null +++ b/swh/scrubber/tests/conftest.py @@ -0,0 +1,28 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from functools import partial + +import pytest +from pytest_postgresql import factories + +from swh.core.db.pytest_plugin import initialize_database_for_module, postgresql_fact +from swh.scrubber.db import ScrubberDb + +scrubber_postgresql_proc = factories.postgresql_proc( + dbname="scrubber", + load=[partial(initialize_database_for_module, modname="scrubber", version=1)], +) + +postgresql_scrubber = postgresql_fact("scrubber_postgresql_proc") + + +@pytest.fixture +def scrubber_db(postgresql_scrubber): + db = ScrubberDb(postgresql_scrubber) + with db.conn.cursor() as cur: + cur.execute("TRUNCATE TABLE corrupt_object") + cur.execute("TRUNCATE TABLE datastore CASCADE") + yield db diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py new file mode 100644 --- /dev/null +++ b/swh/scrubber/tests/test_cli.py @@ -0,0 +1,64 @@ +# Copyright (C) 2020-2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import copy +import tempfile +from unittest.mock import MagicMock + +from click.testing import CliRunner +import yaml + +from swh.scrubber.check_storage import storage_db +from swh.scrubber.cli import scrubber_cli_group + +CLI_CONFIG = { + "storage": { + "cls": "postgresql", + "db": "", + "objstorage": {"cls": "memory"}, + }, + "scrubber_db": {"cls": "local", "db": ""}, +} + + +def invoke(swh_storage, scrubber_db, args): + runner = CliRunner() + + config = copy.deepcopy(CLI_CONFIG) + with storage_db(swh_storage) as db: + config["storage"]["db"] = db.conn.dsn + + config["scrubber_db"]["db"] = scrubber_db.conn.dsn + + with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: + yaml.dump(config, config_fd) + config_fd.seek(0) + args = ["-C" + config_fd.name] + list(args) + result = runner.invoke(scrubber_cli_group, args, catch_exceptions=False) + return result + + +def test_check_storage(swh_storage, mocker, scrubber_db): + storage_checker = MagicMock() + StorageChecker = mocker.patch( + "swh.scrubber.check_storage.StorageChecker", return_value=storage_checker + ) + get_scrubber_db = mocker.patch( + "swh.scrubber.get_scrubber_db", return_value=scrubber_db + ) + result = invoke( + swh_storage, scrubber_db, ["check", "storage", "--object-type=snapshot"] + ) + assert result.exit_code == 0, result.output + assert result.output == "" + + get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn) + StorageChecker.assert_called_once_with( + db=scrubber_db, + storage=StorageChecker.mock_calls[0][2]["storage"], + object_type="snapshot", + start_object="0" * 40, + end_object="f" * 40, + ) diff --git a/swh/scrubber/tests/test_nothing.py b/swh/scrubber/tests/test_nothing.py deleted file mode 100644 --- a/swh/scrubber/tests/test_nothing.py +++ /dev/null @@ -1,3 +0,0 @@ -def test_nothing(): - # Placeholder; remove this when we add actual tests - pass diff --git a/swh/scrubber/tests/test_storage_postgresql.py b/swh/scrubber/tests/test_storage_postgresql.py new file mode 100644 --- /dev/null +++ b/swh/scrubber/tests/test_storage_postgresql.py @@ -0,0 +1,79 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +import unittest.mock + +import attr +import pytest + +from swh.journal.serializers import kafka_to_value +from swh.model import swhids +from swh.model.tests import swh_model_data +from swh.scrubber.check_storage import StorageChecker +from swh.storage.backfill import byte_ranges + +# decorator to make swh.storage.backfill use less ranges, so tests run faster +patch_byte_ranges = unittest.mock.patch( + "swh.storage.backfill.byte_ranges", + lambda numbits, start, end: byte_ranges(numbits // 8, start, end), +) + + +@patch_byte_ranges +def test_no_corruption(scrubber_db, swh_storage): + swh_storage.directory_add(swh_model_data.DIRECTORIES) + swh_storage.revision_add(swh_model_data.REVISIONS) + swh_storage.release_add(swh_model_data.RELEASES) + swh_storage.snapshot_add(swh_model_data.SNAPSHOTS) + + for object_type in ("snapshot", "release", "revision", "directory"): + StorageChecker( + db=scrubber_db, + storage=swh_storage, + object_type=object_type, + start_object="00" * 20, + end_object="ff" * 20, + ).check_storage() + + assert list(scrubber_db.corrupt_object_iter()) == [] + + +@pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS))) +@patch_byte_ranges +def test_corrupt_snapshot(scrubber_db, swh_storage, corrupt_idx): + snapshots = list(swh_model_data.SNAPSHOTS) + snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20) + swh_storage.snapshot_add(snapshots) + + before_date = datetime.datetime.now(tz=datetime.timezone.utc) + for object_type in ("snapshot", "release", "revision", "directory"): + StorageChecker( + db=scrubber_db, + storage=swh_storage, + object_type=object_type, + start_object="00" * 20, + end_object="ff" * 20, + ).check_storage() + after_date = datetime.datetime.now(tz=datetime.timezone.utc) + + corrupt_objects = list(scrubber_db.corrupt_object_iter()) + assert len(corrupt_objects) == 1 + assert corrupt_objects[0].id == swhids.CoreSWHID.from_string( + "swh:1:snp:0000000000000000000000000000000000000000" + ) + assert corrupt_objects[0].datastore.package == "storage" + assert corrupt_objects[0].datastore.cls == "postgresql" + assert corrupt_objects[0].datastore.instance.startswith( + "user=postgres password=xxx dbname=storage host=" + ) + assert ( + before_date - datetime.timedelta(seconds=5) + <= corrupt_objects[0].first_occurrence + <= after_date + datetime.timedelta(seconds=5) + ) + assert ( + kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict() + )