diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
index f218d26..f621289 100644
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -1,144 +1,144 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 from typing import Optional

 import click

 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group


 @swh_cli_group.group(name="scrubber", context_settings=CONTEXT_SETTINGS)
 @click.option(
     "--config-file",
     "-C",
     default=None,
     type=click.Path(exists=True, dir_okay=False,),
     help="Configuration file.",
 )
 @click.pass_context
 def scrubber_cli_group(ctx, config_file: Optional[str]) -> None:
     """main command group of the datastore scrubber

     Expected config format::

         scrubber_db:
             cls: local
             db: "service=..."    # libpq DSN

         # for storage checkers only:
         storage:
             cls: postgresql    # cannot be remote, as it needs direct access to the pg DB
             db: "service=..."    # libpq DSN
             objstorage:
                 cls: memory

         # for journal checkers only:
         journal_client:
             # see https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html
             # for the full list of options
             sasl.mechanism: SCRAM-SHA-512
             security.protocol: SASL_SSL
             sasl.username: ...
             sasl.password: ...
             group_id: ...
             privileged: True
             message.max.bytes: 524288000
             brokers:
             - "broker1.journal.softwareheritage.org:9093"
             - "broker2.journal.softwareheritage.org:9093"
             - "broker3.journal.softwareheritage.org:9093"
             - "broker4.journal.softwareheritage.org:9093"
             - "broker5.journal.softwareheritage.org:9093"
             object_types: [directory, revision, snapshot, release]
             auto_offset_reset: earliest
     """
     from swh.core import config

     from . import get_scrubber_db

     if not config_file:
         config_file = os.environ.get("SWH_CONFIG_FILENAME")

     if config_file:
         if not os.path.exists(config_file):
             raise ValueError("%s does not exist" % config_file)
         conf = config.read(config_file)
     else:
         conf = {}

     if "scrubber_db" not in conf:
         ctx.fail("You must have a scrubber_db configured in your config file.")

     ctx.ensure_object(dict)
     ctx.obj["config"] = conf
     ctx.obj["db"] = get_scrubber_db(**conf["scrubber_db"])


 @scrubber_cli_group.group(name="check")
 @click.pass_context
 def scrubber_check_cli_group(ctx):
     """group of commands which read from data stores and report errors."""
     pass


 @scrubber_check_cli_group.command(name="storage")
 @click.option(
     "--object-type",
     type=click.Choice(
         # use a hardcoded list to prevent having to load the
         # replay module at cli loading time
         [
             "snapshot",
             "revision",
             "release",
             "directory",
             # TODO:
             # "raw_extrinsic_metadata",
             # "extid",
         ]
     ),
 )
 @click.option("--start-object", default="0" * 40)
 @click.option("--end-object", default="f" * 40)
 @click.pass_context
 def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str):
     """Reads a postgresql storage, and reports corrupt objects to the scrubber DB."""
     conf = ctx.obj["config"]
     if "storage" not in conf:
         ctx.fail("You must have a storage configured in your config file.")

     from swh.storage import get_storage

-    from .check_storage import StorageChecker
+    from .storage_checker import StorageChecker

     checker = StorageChecker(
         db=ctx.obj["db"],
         storage=get_storage(**conf["storage"]),
         object_type=object_type,
         start_object=start_object,
         end_object=end_object,
     )

-    checker.check_storage()
+    checker.run()


 @scrubber_check_cli_group.command(name="journal")
 @click.pass_context
 def scrubber_check_journal(ctx) -> None:
     """Reads a complete kafka journal, and reports corrupt objects to the scrubber DB."""
     conf = ctx.obj["config"]
     if "journal_client" not in conf:
         ctx.fail("You must have a journal_client configured in your config file.")

-    from .check_journal import JournalChecker
+    from .journal_checker import JournalChecker

     checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],)

-    checker.check_journal()
+    checker.run()
diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
index 1b2af99..b915a6e 100644
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -1,89 +1,96 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import dataclasses
 import datetime
 import functools
 from typing import Iterator, Union

 from swh.core.db import BaseDb
 from swh.model.model import Content, Directory, Release, Revision, Snapshot
 from swh.model.swhids import CoreSWHID


 @dataclasses.dataclass(frozen=True)
 class Datastore:
+    """Represents a datastore being scrubbed; e.g. swh-storage or swh-journal."""
+
     package: str
+    """'storage', 'journal', or 'objstorage'."""
     cls: str
+    """'postgresql'/'cassandra' for storage, 'kafka' for journal,
+    'pathslicer'/'winery'/... for objstorage."""
     instance: str
+    """Human readable string."""


 @dataclasses.dataclass(frozen=True)
 class CorruptObject:
     id: CoreSWHID
     datastore: Datastore
     first_occurrence: datetime.datetime
     object_: bytes


 class ScrubberDb(BaseDb):
     current_version = 1

     @functools.lru_cache(1000)
     def datastore_get_or_add(self, datastore: Datastore) -> int:
         """Creates a datastore if it does not exist, and returns its id."""
         cur = self.cursor()
         cur.execute(
             """
             WITH inserted AS (
                 INSERT INTO datastore (package, class, instance)
                 VALUES (%(package)s, %(cls)s, %(instance)s)
                 ON CONFLICT DO NOTHING
                 RETURNING id
             )
             SELECT id FROM inserted
             UNION
             -- ON CONFLICT DO NOTHING suppresses the RETURNING row when the
             -- datastore already exists, so fall back to an explicit lookup
             SELECT id FROM datastore
             WHERE package=%(package)s AND class=%(cls)s AND instance=%(instance)s
             """,
             {
                 "package": datastore.package,
                 "cls": datastore.cls,
                 "instance": datastore.instance,
             },
         )
         (id_,) = cur.fetchone()
         return id_

     def corrupt_object_add(
         self,
         datastore: Datastore,
         object_: Union[Content, Directory, Revision, Release, Snapshot],
         serialized_object: bytes,
     ) -> None:
         datastore_id = self.datastore_get_or_add(datastore)
         cur = self.cursor()
         cur.execute(
             """
             INSERT INTO corrupt_object (id, datastore, object)
             VALUES (%s, %s, %s)
             ON CONFLICT DO NOTHING
             """,
             (str(object_.swhid()), datastore_id, serialized_object),
         )

     def corrupt_object_iter(self) -> Iterator[CorruptObject]:
+        """Yields all records in the 'corrupt_object' table."""
         cur = self.cursor()
         cur.execute(
             """
             SELECT co.id, co.first_occurrence, co.object,
                    ds.package, ds.class, ds.instance
             FROM corrupt_object AS co
             INNER JOIN datastore AS ds ON (ds.id=co.datastore)
             """
         )

         for row in cur:
             (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
             yield CorruptObject(
                 id=CoreSWHID.from_string(id),
                 first_occurrence=first_occurrence,
                 object_=object_,
                 datastore=Datastore(
                     package=ds_package, cls=ds_class, instance=ds_instance
                 ),
             )
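
Since ``corrupt_object_iter`` yields fully-typed ``CorruptObject`` records, small
reporting helpers are easy to build on top of it. A hypothetical example (the helper
name is illustrative, not part of the API)::

    from collections import Counter

    from swh.scrubber.db import ScrubberDb

    def count_by_datastore(db: ScrubberDb):
        """Tally corrupt objects per (package, class, instance) datastore."""
        counts = Counter()
        for corrupt in db.corrupt_object_iter():
            ds = corrupt.datastore
            counts[(ds.package, ds.cls, ds.instance)] += 1
        return counts
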
diff --git a/swh/scrubber/check_journal.py b/swh/scrubber/journal_checker.py
similarity index 84%
rename from swh/scrubber/check_journal.py
rename to swh/scrubber/journal_checker.py
index 30c1379..369a3ea 100644
--- a/swh/scrubber/check_journal.py
+++ b/swh/scrubber/journal_checker.py
@@ -1,61 +1,69 @@
 # Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Reads all objects in a kafka journal and recomputes their checksums."""

 import logging
 from typing import Any, Dict, List

 from swh.journal.client import get_journal_client
 from swh.journal.serializers import kafka_to_value
 from swh.model import model

 from .db import Datastore, ScrubberDb

 logger = logging.getLogger(__name__)


 class JournalChecker:
+    """Reads a complete kafka journal, recomputes checksums, and
+    reports errors in a separate database."""
+
     _datastore = None

     def __init__(self, db: ScrubberDb, journal_client: Dict[str, Any]):
         self.db = db
         self.journal_client_config = journal_client
         self.journal_client = get_journal_client(
             **journal_client,
             # Remove the default deserializer, so process_kafka_messages() gets
             # each message verbatim and can archive it with as few modifications
             # as possible.
             value_deserializer=lambda obj_type, msg: msg,
         )

     def datastore_info(self) -> Datastore:
+        """Returns a :class:`Datastore` instance representing the journal instance
+        being checked."""
         if self._datastore is None:
             config = self.journal_client_config
             if config["cls"] == "kafka":
                 self._datastore = Datastore(
                     package="journal",
                     cls="kafka",
                     instance=(
                         f"brokers={config['brokers']!r} prefix={config['prefix']!r}"
                     ),
                 )
             else:
                 raise NotImplementedError(
                     f"JournalChecker(journal_client={self.journal_client_config!r})"
                     f".datastore_info()"
                 )
         return self._datastore

-    def check_journal(self):
+    def run(self):
+        """Runs a journal client with the given configuration.
+        This method does not return, unless otherwise configured
+        (with ``stop_on_eof``).
+        """
         self.journal_client.process(self.process_kafka_messages)

     def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]):
         for (object_type, messages) in all_messages.items():
             cls = getattr(model, object_type.capitalize())
             for message in messages:
                 object_ = cls.from_dict(kafka_to_value(message))
                 real_id = object_.compute_hash()
                 if object_.id != real_id:
                     self.db.corrupt_object_add(self.datastore_info(), object_, message)
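
The invariant ``process_kafka_messages`` checks can be demonstrated standalone:
swh.model objects compute their intrinsic identifier at construction time, so an
object whose ``id`` was tampered with no longer matches its recomputed hash. A
sketch, assuming only that swh.model and attr are installed::

    import attr

    from swh.model.model import Snapshot

    snapshot = Snapshot(branches={})  # id is computed from the object's content
    assert snapshot.id == snapshot.compute_hash()

    tampered = attr.evolve(snapshot, id=b"\x00" * 20)  # simulate corruption
    assert tampered.id != tampered.compute_hash()      # what the checkers detect
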
+ """ if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: return self._check_postgresql(db) else: raise NotImplementedError( f"StorageChecker(storage={self.storage!r}).check_storage()" ) def _check_postgresql(self, db): for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( self.start_object, self.end_object ): logger.info( "Processing %s range %s to %s", self.object_type, backfill._format_range_bound(range_start), backfill._format_range_bound(range_end), ) objects = backfill.fetch( db, self.object_type, start=range_start, end=range_end ) objects = list(objects) self.process_objects(objects) def process_objects(self, objects: Iterable[ScrubbableObject]): for object_ in objects: real_id = object_.compute_hash() if object_.id != real_id: self.db.corrupt_object_add( self.datastore_info(), object_, value_to_kafka(object_.to_dict()) ) diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py index 29f10ec..3602813 100644 --- a/swh/scrubber/tests/test_cli.py +++ b/swh/scrubber/tests/test_cli.py @@ -1,113 +1,115 @@ # Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile -from unittest.mock import MagicMock +from unittest.mock import MagicMock, call from click.testing import CliRunner import yaml -from swh.scrubber.check_storage import storage_db from swh.scrubber.cli import scrubber_cli_group +from swh.scrubber.storage_checker import storage_db def invoke( scrubber_db, args, storage=None, kafka_server=None, kafka_prefix=None, kafka_consumer_group=None, ): runner = CliRunner() config = { "scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn}, } if storage: with storage_db(storage) as db: config["storage"] = { "cls": "postgresql", "db": db.conn.dsn, "objstorage": {"cls": "memory"}, } assert ( (kafka_server is None) == (kafka_prefix is None) == (kafka_consumer_group is None) ) if kafka_server: config["journal_client"] = dict( cls="kafka", brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, ) with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: yaml.dump(config, config_fd) config_fd.seek(0) args = ["-C" + config_fd.name] + list(args) result = runner.invoke(scrubber_cli_group, args, catch_exceptions=False) return result def test_check_storage(mocker, scrubber_db, swh_storage): storage_checker = MagicMock() StorageChecker = mocker.patch( - "swh.scrubber.check_storage.StorageChecker", return_value=storage_checker + "swh.scrubber.storage_checker.StorageChecker", return_value=storage_checker ) get_scrubber_db = mocker.patch( "swh.scrubber.get_scrubber_db", return_value=scrubber_db ) result = invoke( scrubber_db, ["check", "storage", "--object-type=snapshot"], storage=swh_storage ) assert result.exit_code == 0, result.output assert result.output == "" get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn) StorageChecker.assert_called_once_with( db=scrubber_db, storage=StorageChecker.mock_calls[0][2]["storage"], object_type="snapshot", start_object="0" * 40, end_object="f" * 40, ) + assert storage_checker.method_calls == [call.run()] def test_check_journal( mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group ): journal_checker = MagicMock() JournalChecker = mocker.patch( - "swh.scrubber.check_journal.JournalChecker", 
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
index 29f10ec..3602813 100644
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -1,113 +1,115 @@
 # Copyright (C) 2020-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import tempfile
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, call

 from click.testing import CliRunner
 import yaml

-from swh.scrubber.check_storage import storage_db
 from swh.scrubber.cli import scrubber_cli_group
+from swh.scrubber.storage_checker import storage_db


 def invoke(
     scrubber_db,
     args,
     storage=None,
     kafka_server=None,
     kafka_prefix=None,
     kafka_consumer_group=None,
 ):
     runner = CliRunner()

     config = {
         "scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn},
     }
     if storage:
         with storage_db(storage) as db:
             config["storage"] = {
                 "cls": "postgresql",
                 "db": db.conn.dsn,
                 "objstorage": {"cls": "memory"},
             }

     assert (
         (kafka_server is None)
         == (kafka_prefix is None)
         == (kafka_consumer_group is None)
     )
     if kafka_server:
         config["journal_client"] = dict(
             cls="kafka",
             brokers=kafka_server,
             group_id=kafka_consumer_group,
             prefix=kafka_prefix,
             stop_on_eof=True,
         )

     with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
         yaml.dump(config, config_fd)
         config_fd.seek(0)
         args = ["-C" + config_fd.name] + list(args)
         result = runner.invoke(scrubber_cli_group, args, catch_exceptions=False)
     return result


 def test_check_storage(mocker, scrubber_db, swh_storage):
     storage_checker = MagicMock()
     StorageChecker = mocker.patch(
-        "swh.scrubber.check_storage.StorageChecker", return_value=storage_checker
+        "swh.scrubber.storage_checker.StorageChecker", return_value=storage_checker
     )
     get_scrubber_db = mocker.patch(
         "swh.scrubber.get_scrubber_db", return_value=scrubber_db
     )
     result = invoke(
         scrubber_db, ["check", "storage", "--object-type=snapshot"], storage=swh_storage
     )
     assert result.exit_code == 0, result.output
     assert result.output == ""

     get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
     StorageChecker.assert_called_once_with(
         db=scrubber_db,
         storage=StorageChecker.mock_calls[0][2]["storage"],
         object_type="snapshot",
         start_object="0" * 40,
         end_object="f" * 40,
     )
+    assert storage_checker.method_calls == [call.run()]


 def test_check_journal(
     mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
 ):
     journal_checker = MagicMock()
     JournalChecker = mocker.patch(
-        "swh.scrubber.check_journal.JournalChecker", return_value=journal_checker
+        "swh.scrubber.journal_checker.JournalChecker", return_value=journal_checker
     )
     get_scrubber_db = mocker.patch(
         "swh.scrubber.get_scrubber_db", return_value=scrubber_db
     )
     result = invoke(
         scrubber_db,
         ["check", "journal"],
         kafka_server=kafka_server,
         kafka_prefix=kafka_prefix,
         kafka_consumer_group=kafka_consumer_group,
     )
     assert result.exit_code == 0, result.output
     assert result.output == ""

     get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
     JournalChecker.assert_called_once_with(
         db=scrubber_db,
         journal_client={
             "brokers": kafka_server,
             "cls": "kafka",
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
             "stop_on_eof": True,
         },
     )
+    assert journal_checker.method_calls == [call.run()]
diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py
index 5f455f2..09d752b 100644
--- a/swh/scrubber/tests/test_journal_kafka.py
+++ b/swh/scrubber/tests/test_journal_kafka.py
@@ -1,120 +1,120 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime

 import attr
 import pytest

 from swh.journal.serializers import kafka_to_value
 from swh.journal.writer import get_journal_writer
 from swh.model import swhids
 from swh.model.tests import swh_model_data
-from swh.scrubber.check_journal import JournalChecker
+from swh.scrubber.journal_checker import JournalChecker


 def journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group):
     return dict(
         cls="kafka",
         brokers=kafka_server,
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=True,
     )


 def journal_writer(kafka_server, kafka_prefix):
     return get_journal_writer(
         cls="kafka",
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         anonymize=False,
     )


 def test_no_corruption(scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group):
     writer = journal_writer(kafka_server, kafka_prefix)
     writer.write_additions("directory", swh_model_data.DIRECTORIES)
     writer.write_additions("revision", swh_model_data.REVISIONS)
     writer.write_additions("release", swh_model_data.RELEASES)
     writer.write_additions("snapshot", swh_model_data.SNAPSHOTS)

     JournalChecker(
         db=scrubber_db,
         journal_client=journal_client_config(
             kafka_server, kafka_prefix, kafka_consumer_group
         ),
-    ).check_journal()
+    ).run()

     assert list(scrubber_db.corrupt_object_iter()) == []


 @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS)))
 def test_corrupt_snapshot(
     scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, corrupt_idx
 ):
     snapshots = list(swh_model_data.SNAPSHOTS)
     snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20)

     writer = journal_writer(kafka_server, kafka_prefix)
     writer.write_additions("snapshot", snapshots)

     before_date = datetime.datetime.now(tz=datetime.timezone.utc)
     JournalChecker(
         db=scrubber_db,
         journal_client=journal_client_config(
             kafka_server, kafka_prefix, kafka_consumer_group
         ),
-    ).check_journal()
+    ).run()
     after_date = datetime.datetime.now(tz=datetime.timezone.utc)

     corrupt_objects = list(scrubber_db.corrupt_object_iter())
     assert len(corrupt_objects) == 1
     assert corrupt_objects[0].id == swhids.CoreSWHID.from_string(
         "swh:1:snp:0000000000000000000000000000000000000000"
     )
     assert corrupt_objects[0].datastore.package == "journal"
     assert corrupt_objects[0].datastore.cls == "kafka"
     assert (
         corrupt_objects[0].datastore.instance
         == f"brokers='{kafka_server}' prefix='{kafka_prefix}'"
     )
     assert (
         before_date - datetime.timedelta(seconds=5)
         <= corrupt_objects[0].first_occurrence
         <= after_date + datetime.timedelta(seconds=5)
     )
     assert (
         kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict()
     )


 def test_corrupt_snapshots(
     scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
 ):
     snapshots = list(swh_model_data.SNAPSHOTS)
     for i in (0, 1):
         snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20)

     writer = journal_writer(kafka_server, kafka_prefix)
     writer.write_additions("snapshot", snapshots)

     JournalChecker(
         db=scrubber_db,
         journal_client=journal_client_config(
             kafka_server, kafka_prefix, kafka_consumer_group
         ),
-    ).check_journal()
+    ).run()

     corrupt_objects = list(scrubber_db.corrupt_object_iter())
     assert len(corrupt_objects) == 2
     assert {co.id for co in corrupt_objects} == {
         swhids.CoreSWHID.from_string(swhid)
         for swhid in [
             "swh:1:snp:0000000000000000000000000000000000000000",
             "swh:1:snp:0101010101010101010101010101010101010101",
         ]
     }
"swh:1:snp:0000000000000000000000000000000000000000" ) assert corrupt_objects[0].datastore.package == "storage" assert corrupt_objects[0].datastore.cls == "postgresql" assert corrupt_objects[0].datastore.instance.startswith( "user=postgres password=xxx dbname=storage host=" ) assert ( before_date - datetime.timedelta(seconds=5) <= corrupt_objects[0].first_occurrence <= after_date + datetime.timedelta(seconds=5) ) assert ( kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict() ) @patch_byte_ranges def test_corrupt_snapshots(scrubber_db, swh_storage): snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) swh_storage.snapshot_add(snapshots) StorageChecker( db=scrubber_db, storage=swh_storage, object_type="snapshot", start_object="00" * 20, end_object="ff" * 20, - ).check_storage() + ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 2 assert {co.id for co in corrupt_objects} == { swhids.CoreSWHID.from_string(swhid) for swhid in [ "swh:1:snp:0000000000000000000000000000000000000000", "swh:1:snp:0101010101010101010101010101010101010101", ] }