Page MenuHomeSoftware Heritage

D7360.diff
No OneTemporary

D7360.diff

diff --git a/README.rst b/README.rst
new file mode 120000
--- /dev/null
+++ b/README.rst
@@ -0,0 +1 @@
+docs/README.rst
\ No newline at end of file
diff --git a/conftest.py b/conftest.py
new file mode 100644
--- /dev/null
+++ b/conftest.py
@@ -0,0 +1 @@
+pytest_plugins = ["swh.storage.pytest_plugin", "swh.core.db.pytest_plugin"]
diff --git a/docs/README.rst b/docs/README.rst
--- a/docs/README.rst
+++ b/docs/README.rst
@@ -3,3 +3,37 @@
Tools to periodically checks data integrity in swh-storage and swh-objstorage,
reports errors, and (try to) fix them.
+
+This is a work in progress; some of the components described below do not
+exist yet (cassandra storage checker, objstorage checker, recovery, and reinjection)
+
+The Scrubber package is made of the following parts:
+
+
+Checking
+--------
+
+Highly parallel processes continuously read objects from a data store,
+compute checksums, and write any failure in a database, along with the data of
+the corrupt object.
+
+There is one "checker" for each datastore package: storage (postgresql and cassandra),
+journal (kafka), and objstorage.
+
+
+Recovery
+--------
+
+Then, from time to time, jobs go through the list of known corrupt objects,
+and try to recover the original objects, through various means:
+
+* Brute-forcing variations until they match their checksum
+* Recovering from another data store
+* As a last resort, recovering from known origins, if any
+
+
+Reinjection
+-----------
+
+Finally, when an original object is recovered, it is reinjected in the original
+data store, replacing the corrupt one.
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,5 @@
# Add here internal Software Heritage dependencies, one per line.
swh.core[http] >= 0.3 # [http] is required by swh.core.pytest_plugin
+swh.model >= 5.0.0
+swh.storage >= 1.1.0
+swh.journal >= 0.9.0
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1 +1,4 @@
pytest < 7.0.0 # v7.0.0 removed _pytest.tmpdir.TempdirFactory, which is used by some of the pytest plugins we use
+pytest-mock
+pyyaml
+types-pyyaml
diff --git a/swh/scrubber/__init__.py b/swh/scrubber/__init__.py
--- a/swh/scrubber/__init__.py
+++ b/swh/scrubber/__init__.py
@@ -0,0 +1,23 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from .db import ScrubberDb
+
+
+def get_scrubber_db(cls: str, **kwargs) -> ScrubberDb:
+ if cls != "local":
+ raise ValueError(f"Unknown scrubber db class '{cls}', use 'local' instead.")
+
+ from .db import ScrubberDb
+
+ return ScrubberDb.connect(kwargs.pop("db"), **kwargs)
+
+
+get_datastore = get_scrubber_db
diff --git a/swh/scrubber/bar.py b/swh/scrubber/bar.py
deleted file mode 100644
--- a/swh/scrubber/bar.py
+++ /dev/null
@@ -1,4 +0,0 @@
-# Copyright (C) 2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
diff --git a/swh/scrubber/check_storage.py b/swh/scrubber/check_storage.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/check_storage.py
@@ -0,0 +1,91 @@
+# Copyright (C) 2021-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Reads all objects in a swh-storage instance and recomputes their checksums."""
+
+import contextlib
+import dataclasses
+import logging
+from typing import Iterable, Union
+
+from swh.journal.serializers import value_to_kafka
+from swh.model.model import Directory, Release, Revision, Snapshot
+from swh.storage import backfill
+from swh.storage.interface import StorageInterface
+from swh.storage.postgresql.storage import Storage as PostgresqlStorage
+
+from .db import Datastore, ScrubberDb
+
+logger = logging.getLogger(__name__)
+
+ScrubbableObject = Union[Revision, Release, Snapshot, Directory]
+
+
+@contextlib.contextmanager
+def storage_db(storage):
+ db = storage.get_db()
+ try:
+ yield db
+ finally:
+ storage.put_db(db)
+
+
+@dataclasses.dataclass
+class StorageChecker:
+ db: ScrubberDb
+ storage: StorageInterface
+ object_type: str
+ start_object: str
+ end_object: str
+
+ _datastore = None
+
+ def datastore_info(self) -> Datastore:
+ if self._datastore is None:
+ if isinstance(self.storage, PostgresqlStorage):
+ with storage_db(self.storage) as db:
+ self._datastore = Datastore(
+ package="storage", cls="postgresql", instance=db.conn.dsn,
+ )
+ else:
+ raise NotImplementedError(
+ f"StorageChecker(storage={self.storage!r}).datastore()"
+ )
+ return self._datastore
+
+ def check_storage(self):
+ if isinstance(self.storage, PostgresqlStorage):
+ with storage_db(self.storage) as db:
+ return self._check_postgresql(db)
+ else:
+ raise NotImplementedError(
+ f"StorageChecker(storage={self.storage!r}).check_storage()"
+ )
+
+ def _check_postgresql(self, db):
+ for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type](
+ self.start_object, self.end_object
+ ):
+ logger.info(
+ "Processing %s range %s to %s",
+ self.object_type,
+ backfill._format_range_bound(range_start),
+ backfill._format_range_bound(range_end),
+ )
+
+ objects = backfill.fetch(
+ db, self.object_type, start=range_start, end=range_end
+ )
+ objects = list(objects)
+
+ self.process_objects(objects)
+
+ def process_objects(self, objects: Iterable[ScrubbableObject]):
+ for object_ in objects:
+ real_id = object_.compute_hash()
+ if object_.id != real_id:
+ self.db.corrupt_object_add(
+ self.datastore_info(), object_, value_to_kafka(object_.to_dict())
+ )
diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -1,3 +1,10 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+
import click
from swh.core.cli import CONTEXT_SETTINGS
@@ -5,7 +12,82 @@
@swh_cli_group.group(name="scrubber", context_settings=CONTEXT_SETTINGS)
+@click.option(
+ "--config-file",
+ "-C",
+ default=None,
+ type=click.Path(exists=True, dir_okay=False,),
+ help="Configuration file.",
+)
+@click.pass_context
+def scrubber_cli_group(ctx, config_file):
+ """main command group of the datastore scrubber
+ """
+ from swh.core import config
+
+ from . import get_scrubber_db
+
+ if not config_file:
+ config_file = os.environ.get("SWH_CONFIG_FILENAME")
+
+ if config_file:
+ if not os.path.exists(config_file):
+ raise ValueError("%s does not exist" % config_file)
+ conf = config.read(config_file)
+ else:
+ conf = {}
+
+ if "scrubber_db" not in conf:
+ ctx.fail("You must have a scrubber_db configured in your config file.")
+
+ ctx.ensure_object(dict)
+ ctx.obj["config"] = conf
+ ctx.obj["db"] = get_scrubber_db(**conf["scrubber_db"])
+
+
+@scrubber_cli_group.group(name="check")
@click.pass_context
-def scrubber_cli_group(ctx):
- """main command of the datastore scrubber
+def scrubber_check_cli_group(ctx):
+ """group of commands which read from data stores and report errors.
"""
+ pass
+
+
+@scrubber_check_cli_group.command(name="storage")
+@click.option(
+ "--object-type",
+ type=click.Choice(
+ # use a hardcoded list to prevent having to load the
+ # replay module at cli loading time
+ [
+ "snapshot",
+ "revision",
+ "release",
+ "directory",
+ # TODO:
+ # "raw_extrinsic_metadata",
+ # "extid",
+ ]
+ ),
+)
+@click.option("--start-object", default="0" * 40)
+@click.option("--end-object", default="f" * 40)
+@click.pass_context
+def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str):
+ conf = ctx.obj["config"]
+ if "storage" not in conf:
+ ctx.fail("You must have a storage configured in your config file.")
+
+ from swh.storage import get_storage
+
+ from .check_storage import StorageChecker
+
+ checker = StorageChecker(
+ db=ctx.obj["db"],
+ storage=get_storage(**conf["storage"]),
+ object_type=object_type,
+ start_object=start_object,
+ end_object=end_object,
+ )
+
+ checker.check_storage()
diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/db.py
@@ -0,0 +1,89 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import dataclasses
+import datetime
+import functools
+from typing import Iterator, Union
+
+from swh.core.db import BaseDb
+from swh.model.model import Content, Directory, Release, Revision, Snapshot
+from swh.model.swhids import CoreSWHID
+
+
+@dataclasses.dataclass(frozen=True)
+class Datastore:
+ package: str
+ cls: str
+ instance: str
+
+
+@dataclasses.dataclass(frozen=True)
+class CorruptObject:
+ id: CoreSWHID
+ datastore: Datastore
+ first_occurrence: datetime.datetime
+ object_: bytes
+
+
+class ScrubberDb(BaseDb):
+ current_version = 1
+
+ @functools.lru_cache(1000)
+ def datastore_get_or_add(self, datastore: Datastore) -> int:
+ """Creates a datastore if it does not exist, and returns its id."""
+ cur = self.cursor()
+ cur.execute(
+ """
+ INSERT INTO datastore (package, class, instance)
+ VALUES (%s, %s, %s)
+ ON CONFLICT DO NOTHING
+ RETURNING id
+ """,
+ (datastore.package, datastore.cls, datastore.instance),
+ )
+ (id_,) = cur.fetchone()
+ return id_
+
+ def corrupt_object_add(
+ self,
+ datastore: Datastore,
+ object_: Union[Content, Directory, Revision, Release, Snapshot],
+ serialized_object: bytes,
+ ) -> None:
+ datastore_id = self.datastore_get_or_add(datastore)
+ cur = self.cursor()
+ cur.execute(
+ """
+ INSERT INTO corrupt_object (id, datastore, object)
+ VALUES (%s, %s, %s)
+ ON CONFLICT DO NOTHING
+ """,
+ (str(object_.swhid()), datastore_id, serialized_object),
+ )
+
+ def corrupt_object_iter(self) -> Iterator[CorruptObject]:
+ cur = self.cursor()
+ cur.execute(
+ """
+ SELECT
+ co.id, co.first_occurrence, co.object,
+ ds.package, ds.class, ds.instance
+ FROM corrupt_object AS co
+ INNER JOIN datastore AS ds ON (ds.id=co.datastore)
+ """
+ )
+
+ for row in cur:
+ (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
+ yield CorruptObject(
+ id=CoreSWHID.from_string(id),
+ first_occurrence=first_occurrence,
+ object_=object_,
+ datastore=Datastore(
+ package=ds_package, cls=ds_class, instance=ds_instance
+ ),
+ )
diff --git a/swh/scrubber/sql/20-enums.sql b/swh/scrubber/sql/20-enums.sql
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/sql/20-enums.sql
@@ -0,0 +1 @@
+create type datastore_type as enum ('storage', 'journal', 'objstorage');
diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/sql/30-schema.sql
@@ -0,0 +1,28 @@
+create domain swhid as text check (value ~ '^swh:[0-9]+:.*');
+
+create table datastore
+(
+ id bigserial not null,
+ package datastore_type not null,
+ class text,
+ instance text
+);
+
+comment on table datastore is 'Each row identifies a data store being scrubbed';
+comment on column datastore.id is 'Internal identifier of the datastore';
+comment on column datastore.package is 'Name of the component using this datastore (storage/journal/objstorage)';
+comment on column datastore.class is 'For datastores with multiple backends, name of the backend (postgresql/cassandra for storage, kafka for journal, pathslicer/azure/winery/... for objstorage)';
+comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg. its URL or DSN.';
+
+create table corrupt_object
+(
+ id swhid not null,
+ datastore int not null,
+ first_occurrence timestamptz not null default now(),
+ object bytea not null
+);
+
+comment on table corrupt_object is 'Each row identifies an object that was found to be corrupt';
+comment on column corrupt_object.datastore is 'Datastore the corrupt object was found in.';
+comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time';
+comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)';
diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/sql/60-indexes.sql
@@ -0,0 +1,13 @@
+-- datastore
+
+create unique index concurrently datastore_pkey on datastore(id);
+alter table datastore add primary key using index datastore_pkey;
+
+create unique index concurrently datastore_package_class_instance on datastore(package, class, instance);
+
+-- corrupt_object
+
+alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid;
+alter table corrupt_object validate constraint corrupt_object_datastore_fkey;
+
+create unique index corrupt_object_pkey on corrupt_object(id, datastore);
diff --git a/swh/scrubber/tests/conftest.py b/swh/scrubber/tests/conftest.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/tests/conftest.py
@@ -0,0 +1,28 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from functools import partial
+
+import pytest
+from pytest_postgresql import factories
+
+from swh.core.db.pytest_plugin import initialize_database_for_module, postgresql_fact
+from swh.scrubber.db import ScrubberDb
+
+scrubber_postgresql_proc = factories.postgresql_proc(
+ dbname="scrubber",
+ load=[partial(initialize_database_for_module, modname="scrubber", version=1)],
+)
+
+postgresql_scrubber = postgresql_fact("scrubber_postgresql_proc")
+
+
+@pytest.fixture
+def scrubber_db(postgresql_scrubber):
+ db = ScrubberDb(postgresql_scrubber)
+ with db.conn.cursor() as cur:
+ cur.execute("TRUNCATE TABLE corrupt_object")
+ cur.execute("TRUNCATE TABLE datastore CASCADE")
+ yield db
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/tests/test_cli.py
@@ -0,0 +1,64 @@
+# Copyright (C) 2020-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import copy
+import tempfile
+from unittest.mock import MagicMock
+
+from click.testing import CliRunner
+import yaml
+
+from swh.scrubber.check_storage import storage_db
+from swh.scrubber.cli import scrubber_cli_group
+
+CLI_CONFIG = {
+ "storage": {
+ "cls": "postgresql",
+ "db": "<replaced at runtime>",
+ "objstorage": {"cls": "memory"},
+ },
+ "scrubber_db": {"cls": "local", "db": "<replaced at runtime>"},
+}
+
+
+def invoke(swh_storage, scrubber_db, args):
+ runner = CliRunner()
+
+ config = copy.deepcopy(CLI_CONFIG)
+ with storage_db(swh_storage) as db:
+ config["storage"]["db"] = db.conn.dsn
+
+ config["scrubber_db"]["db"] = scrubber_db.conn.dsn
+
+ with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
+ yaml.dump(config, config_fd)
+ config_fd.seek(0)
+ args = ["-C" + config_fd.name] + list(args)
+ result = runner.invoke(scrubber_cli_group, args, catch_exceptions=False)
+ return result
+
+
+def test_check_storage(swh_storage, mocker, scrubber_db):
+ storage_checker = MagicMock()
+ StorageChecker = mocker.patch(
+ "swh.scrubber.check_storage.StorageChecker", return_value=storage_checker
+ )
+ get_scrubber_db = mocker.patch(
+ "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+ )
+ result = invoke(
+ swh_storage, scrubber_db, ["check", "storage", "--object-type=snapshot"]
+ )
+ assert result.exit_code == 0, result.output
+ assert result.output == ""
+
+ get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+ StorageChecker.assert_called_once_with(
+ db=scrubber_db,
+ storage=StorageChecker.mock_calls[0][2]["storage"],
+ object_type="snapshot",
+ start_object="0" * 40,
+ end_object="f" * 40,
+ )
diff --git a/swh/scrubber/tests/test_nothing.py b/swh/scrubber/tests/test_nothing.py
deleted file mode 100644
--- a/swh/scrubber/tests/test_nothing.py
+++ /dev/null
@@ -1,3 +0,0 @@
-def test_nothing():
- # Placeholder; remove this when we add actual tests
- pass
diff --git a/swh/scrubber/tests/test_storage_postgresql.py b/swh/scrubber/tests/test_storage_postgresql.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/tests/test_storage_postgresql.py
@@ -0,0 +1,105 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+import unittest.mock
+
+import attr
+import pytest
+
+from swh.journal.serializers import kafka_to_value
+from swh.model import swhids
+from swh.model.tests import swh_model_data
+from swh.scrubber.check_storage import StorageChecker
+from swh.storage.backfill import byte_ranges
+
+# decorator to make swh.storage.backfill use less ranges, so tests run faster
+patch_byte_ranges = unittest.mock.patch(
+ "swh.storage.backfill.byte_ranges",
+ lambda numbits, start, end: byte_ranges(numbits // 8, start, end),
+)
+
+
+@patch_byte_ranges
+def test_no_corruption(scrubber_db, swh_storage):
+ swh_storage.directory_add(swh_model_data.DIRECTORIES)
+ swh_storage.revision_add(swh_model_data.REVISIONS)
+ swh_storage.release_add(swh_model_data.RELEASES)
+ swh_storage.snapshot_add(swh_model_data.SNAPSHOTS)
+
+ for object_type in ("snapshot", "release", "revision", "directory"):
+ StorageChecker(
+ db=scrubber_db,
+ storage=swh_storage,
+ object_type=object_type,
+ start_object="00" * 20,
+ end_object="ff" * 20,
+ ).check_storage()
+
+ assert list(scrubber_db.corrupt_object_iter()) == []
+
+
+@pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS)))
+@patch_byte_ranges
+def test_corrupt_snapshot(scrubber_db, swh_storage, corrupt_idx):
+ snapshots = list(swh_model_data.SNAPSHOTS)
+ snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20)
+ swh_storage.snapshot_add(snapshots)
+
+ before_date = datetime.datetime.now(tz=datetime.timezone.utc)
+ for object_type in ("snapshot", "release", "revision", "directory"):
+ StorageChecker(
+ db=scrubber_db,
+ storage=swh_storage,
+ object_type=object_type,
+ start_object="00" * 20,
+ end_object="ff" * 20,
+ ).check_storage()
+ after_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+ corrupt_objects = list(scrubber_db.corrupt_object_iter())
+ assert len(corrupt_objects) == 1
+ assert corrupt_objects[0].id == swhids.CoreSWHID.from_string(
+ "swh:1:snp:0000000000000000000000000000000000000000"
+ )
+ assert corrupt_objects[0].datastore.package == "storage"
+ assert corrupt_objects[0].datastore.cls == "postgresql"
+ assert corrupt_objects[0].datastore.instance.startswith(
+ "user=postgres password=xxx dbname=storage host="
+ )
+ assert (
+ before_date - datetime.timedelta(seconds=5)
+ <= corrupt_objects[0].first_occurrence
+ <= after_date + datetime.timedelta(seconds=5)
+ )
+ assert (
+ kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict()
+ )
+
+
+@patch_byte_ranges
+def test_corrupt_snapshots(scrubber_db, swh_storage):
+ snapshots = list(swh_model_data.SNAPSHOTS)
+ for i in (0, 1):
+ snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20)
+ swh_storage.snapshot_add(snapshots)
+
+ StorageChecker(
+ db=scrubber_db,
+ storage=swh_storage,
+ object_type="snapshot",
+ start_object="00" * 20,
+ end_object="ff" * 20,
+ ).check_storage()
+
+ corrupt_objects = list(scrubber_db.corrupt_object_iter())
+ assert len(corrupt_objects) == 2
+ assert {co.id for co in corrupt_objects} == {
+ swhids.CoreSWHID.from_string(swhid)
+ for swhid in [
+ "swh:1:snp:0000000000000000000000000000000000000000",
+ "swh:1:snp:0101010101010101010101010101010101010101",
+ ]
+ }

File Metadata

Mime Type
text/plain
Expires
Nov 5 2024, 5:56 AM (10 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219365

Event Timeline