diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py --- a/swh/scrubber/cli.py +++ b/swh/scrubber/cli.py @@ -115,7 +115,7 @@ from swh.storage import get_storage - from .check_storage import StorageChecker + from .storage_checker import StorageChecker checker = StorageChecker( db=ctx.obj["db"], @@ -125,7 +125,7 @@ end_object=end_object, ) - checker.check_storage() + checker.run() @scrubber_check_cli_group.command(name="journal") @@ -137,8 +137,8 @@ if "journal_client" not in conf: ctx.fail("You must have a journal_client configured in your config file.") - from .check_journal import JournalChecker + from .journal_checker import JournalChecker checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],) - checker.check_journal() + checker.run() diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py --- a/swh/scrubber/db.py +++ b/swh/scrubber/db.py @@ -16,9 +16,15 @@ @dataclasses.dataclass(frozen=True) class Datastore: + """Represents a datastore being scrubbed; e.g. swh-storage or swh-journal.""" + package: str + """'storage', 'journal', or 'objstorage'.""" cls: str + """'postgresql'/'cassandra' for storage, 'kafka' for journal, + 'pathslicer'/'winery'/...
for objstorage.""" instance: str + """Human-readable string.""" @dataclasses.dataclass(frozen=True) @@ -66,6 +72,7 @@ ) def corrupt_object_iter(self) -> Iterator[CorruptObject]: + """Yields all records in the 'corrupt_object' table.""" cur = self.cursor() cur.execute( """ diff --git a/swh/scrubber/check_journal.py b/swh/scrubber/journal_checker.py rename from swh/scrubber/check_journal.py rename to swh/scrubber/journal_checker.py --- a/swh/scrubber/check_journal.py +++ b/swh/scrubber/journal_checker.py @@ -18,6 +18,9 @@ class JournalChecker: + """Reads objects from swh-journal topics, recomputes checksums, and + reports errors in a separate database.""" + _datastore = None def __init__(self, db: ScrubberDb, journal_client: Dict[str, Any]): @@ -31,6 +34,8 @@ ) def datastore_info(self) -> Datastore: + """Returns a :class:`Datastore` instance representing the journal instance + being checked.""" if self._datastore is None: config = self.journal_client_config if config["cls"] == "kafka": @@ -48,7 +53,10 @@ ) return self._datastore - def check_journal(self): + def run(self): + """Runs a journal client with the given configuration. + This method does not return, unless otherwise configured (with ``stop_on_eof``).
+ """ self.journal_client.process(self.process_kafka_messages) def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]): diff --git a/swh/scrubber/check_storage.py b/swh/scrubber/storage_checker.py rename from swh/scrubber/check_storage.py rename to swh/scrubber/storage_checker.py --- a/swh/scrubber/check_storage.py +++ b/swh/scrubber/storage_checker.py @@ -34,15 +34,23 @@ @dataclasses.dataclass class StorageChecker: + """Reads a chunk of a swh-storage database, recomputes checksums, and + reports errors in a separate database.""" + db: ScrubberDb storage: StorageInterface object_type: str + """``directory``/``revision``/``release``/``snapshot``""" start_object: str + """minimum value of the hexdigest of the object's sha1.""" end_object: str + """maximum value of the hexdigest of the object's sha1.""" _datastore = None def datastore_info(self) -> Datastore: + """Returns a :class:`Datastore` instance representing the swh-storage instance + being checked.""" if self._datastore is None: if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: @@ -55,7 +63,10 @@ ) return self._datastore - def check_storage(self): + def run(self): + """Checks all objects of type ``object_type`` with id between + ``start_object`` and ``end_object``.
+ """ if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: return self._check_postgresql(db) diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py --- a/swh/scrubber/tests/test_cli.py +++ b/swh/scrubber/tests/test_cli.py @@ -4,13 +4,13 @@ # See top-level LICENSE file for more information import tempfile -from unittest.mock import MagicMock +from unittest.mock import MagicMock, call from click.testing import CliRunner import yaml -from swh.scrubber.check_storage import storage_db from swh.scrubber.cli import scrubber_cli_group +from swh.scrubber.storage_checker import storage_db def invoke( @@ -59,7 +59,7 @@ def test_check_storage(mocker, scrubber_db, swh_storage): storage_checker = MagicMock() StorageChecker = mocker.patch( - "swh.scrubber.check_storage.StorageChecker", return_value=storage_checker + "swh.scrubber.storage_checker.StorageChecker", return_value=storage_checker ) get_scrubber_db = mocker.patch( "swh.scrubber.get_scrubber_db", return_value=scrubber_db @@ -78,6 +78,7 @@ start_object="0" * 40, end_object="f" * 40, ) + assert storage_checker.method_calls == [call.run()] def test_check_journal( @@ -85,7 +86,7 @@ ): journal_checker = MagicMock() JournalChecker = mocker.patch( - "swh.scrubber.check_journal.JournalChecker", return_value=journal_checker + "swh.scrubber.journal_checker.JournalChecker", return_value=journal_checker ) get_scrubber_db = mocker.patch( "swh.scrubber.get_scrubber_db", return_value=scrubber_db @@ -111,3 +112,4 @@ "stop_on_eof": True, }, ) + assert journal_checker.method_calls == [call.run()] diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py --- a/swh/scrubber/tests/test_journal_kafka.py +++ b/swh/scrubber/tests/test_journal_kafka.py @@ -12,7 +12,7 @@ from swh.journal.writer import get_journal_writer from swh.model import swhids from swh.model.tests import swh_model_data -from swh.scrubber.check_journal import JournalChecker +from
swh.scrubber.journal_checker import JournalChecker def journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group): @@ -47,7 +47,7 @@ journal_client=journal_client_config( kafka_server, kafka_prefix, kafka_consumer_group ), - ).check_journal() + ).run() assert list(scrubber_db.corrupt_object_iter()) == [] @@ -68,7 +68,7 @@ journal_client=journal_client_config( kafka_server, kafka_prefix, kafka_consumer_group ), - ).check_journal() + ).run() after_date = datetime.datetime.now(tz=datetime.timezone.utc) corrupt_objects = list(scrubber_db.corrupt_object_iter()) @@ -107,7 +107,7 @@ journal_client=journal_client_config( kafka_server, kafka_prefix, kafka_consumer_group ), - ).check_journal() + ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 2 diff --git a/swh/scrubber/tests/test_storage_postgresql.py b/swh/scrubber/tests/test_storage_postgresql.py --- a/swh/scrubber/tests/test_storage_postgresql.py +++ b/swh/scrubber/tests/test_storage_postgresql.py @@ -12,7 +12,7 @@ from swh.journal.serializers import kafka_to_value from swh.model import swhids from swh.model.tests import swh_model_data -from swh.scrubber.check_storage import StorageChecker +from swh.scrubber.storage_checker import StorageChecker from swh.storage.backfill import byte_ranges # decorator to make swh.storage.backfill use less ranges, so tests run faster @@ -36,7 +36,7 @@ object_type=object_type, start_object="00" * 20, end_object="ff" * 20, - ).check_storage() + ).run() assert list(scrubber_db.corrupt_object_iter()) == [] @@ -56,7 +56,7 @@ object_type=object_type, start_object="00" * 20, end_object="ff" * 20, - ).check_storage() + ).run() after_date = datetime.datetime.now(tz=datetime.timezone.utc) corrupt_objects = list(scrubber_db.corrupt_object_iter()) @@ -92,7 +92,7 @@ object_type="snapshot", start_object="00" * 20, end_object="ff" * 20, - ).check_storage() + ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert
len(corrupt_objects) == 2