diff --git a/docs/README.rst b/docs/README.rst
--- a/docs/README.rst
+++ b/docs/README.rst
@@ -20,6 +20,10 @@
 There is one "checker" for each datastore package: storage (postgresql
 and cassandra), journal (kafka), and objstorage.
 
+The journal is "crawled" using its native streaming; the other datastores are
+crawled by hash range, reusing swh-storage's backfiller utilities. Progress is
+periodically checkpointed to the scrubber's database (``checked_range`` table).
+
 Recovery
 --------
 
diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py
--- a/swh/scrubber/storage_checker.py
+++ b/swh/scrubber/storage_checker.py
@@ -8,8 +8,9 @@
 import collections
 import contextlib
 import dataclasses
+import datetime
 import logging
-from typing import Iterable, Union
+from typing import Iterable, Optional, Tuple, Union
 
 from swh.core.statsd import Statsd
 from swh.journal.serializers import value_to_kafka
@@ -43,6 +44,43 @@
     storage.put_db(db)
 
 
+def _get_inclusive_range_swhids(
+    inclusive_range_start: Optional[bytes],
+    exclusive_range_end: Optional[bytes],
+    object_type: swhids.ObjectType,
+) -> Tuple[swhids.CoreSWHID, swhids.CoreSWHID]:
+    r"""
+    Given a ``[range_start, range_end)`` right-open interval of id prefixes
+    and an object type (as returned by :const:`swh.storage.backfill.RANGE_GENERATORS`),
+    returns a ``[range_start_swhid, range_end_swhid]`` closed interval of SWHIDs
+    suitable for the scrubber database.
+
+    >>> _get_inclusive_range_swhids(b"\x42", None, swhids.ObjectType.SNAPSHOT)
+    (CoreSWHID.from_string('swh:1:snp:4200000000000000000000000000000000000000'), CoreSWHID.from_string('swh:1:snp:ffffffffffffffffffffffffffffffffffffffff'))
+
+    >>> _get_inclusive_range_swhids(b"\x00", b"\x12\x34", swhids.ObjectType.REVISION)
+    (CoreSWHID.from_string('swh:1:rev:0000000000000000000000000000000000000000'), CoreSWHID.from_string('swh:1:rev:1233ffffffffffffffffffffffffffffffffffff'))
+
+    """  # noqa
+    range_start_swhid = swhids.CoreSWHID(
+        object_type=object_type,
+        object_id=(inclusive_range_start or b"").ljust(20, b"\x00"),
+    )
+    if exclusive_range_end is None:
+        inclusive_range_end = b"\xff" * 20
+    else:
+        # convert an exclusive end like "1230000000..." to an inclusive "122fffffff..."
+        inclusive_range_end = (
+            int.from_bytes(exclusive_range_end.ljust(20, b"\x00"), "big") - 1
+        ).to_bytes(20, "big")
+    range_end_swhid = swhids.CoreSWHID(
+        object_type=object_type,
+        object_id=inclusive_range_end,
+    )
+
+    return (range_start_swhid, range_end_swhid)
+
+
 @dataclasses.dataclass
 class StorageChecker:
     """Reads a chunk of a swh-storage database, recomputes checksums, and
@@ -98,9 +136,15 @@
         )
 
     def _check_postgresql(self, db):
+        object_type = getattr(swhids.ObjectType, self.object_type.upper())
         for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type](
             self.start_object, self.end_object
         ):
+            (range_start_swhid, range_end_swhid) = _get_inclusive_range_swhids(
+                range_start, range_end, object_type
+            )
+
+            start_time = datetime.datetime.now(tz=datetime.timezone.utc)
             logger.debug(
                 "Processing %s range %s to %s",
                 self.object_type,
@@ -122,6 +166,13 @@
             ):
                 self.check_object_references(objects)
 
+            self.db.checked_range_upsert(
+                self.datastore_info(),
+                range_start_swhid,
+                range_end_swhid,
+                start_time,
+            )
+
     def check_object_hashes(self, objects: Iterable[ScrubbableObject]):
         """Recomputes hashes, and reports mismatches."""
         count = 0
diff --git a/swh/scrubber/tests/test_storage_postgresql.py b/swh/scrubber/tests/test_storage_postgresql.py
--- a/swh/scrubber/tests/test_storage_postgresql.py
+++ b/swh/scrubber/tests/test_storage_postgresql.py
@@ -12,7 +12,8 @@
 from swh.journal.serializers import kafka_to_value
 from swh.model import model, swhids
 from swh.model.tests import swh_model_data
-from swh.scrubber.storage_checker import StorageChecker
+from swh.scrubber.db import Datastore
+from swh.scrubber.storage_checker import StorageChecker, storage_db
 from swh.storage.backfill import byte_ranges
 
 CONTENT1 = model.Content.from_data(b"foo")
@@ -58,6 +59,16 @@
 )
 
 
+@pytest.fixture
+def datastore(swh_storage):
+    with storage_db(swh_storage) as db:
+        return Datastore(
+            package="storage",
+            cls="postgresql",
+            instance=db.conn.dsn,
+        )
+
+
 # decorator to make swh.storage.backfill use fewer ranges, so tests run faster
 patch_byte_ranges = unittest.mock.patch(
     "swh.storage.backfill.byte_ranges",
@@ -65,13 +76,98 @@
 )
 
 
+def _short_ranges(type_):
+    return [
+        (
+            f"swh:1:{type_}:0000000000000000000000000000000000000000",
+            f"swh:1:{type_}:1fffffffffffffffffffffffffffffffffffffff",
+        ),
+        (
+            f"swh:1:{type_}:2000000000000000000000000000000000000000",
+            f"swh:1:{type_}:3fffffffffffffffffffffffffffffffffffffff",
+        ),
+        (
+            f"swh:1:{type_}:4000000000000000000000000000000000000000",
+            f"swh:1:{type_}:5fffffffffffffffffffffffffffffffffffffff",
+        ),
+        (
+            f"swh:1:{type_}:6000000000000000000000000000000000000000",
+            f"swh:1:{type_}:7fffffffffffffffffffffffffffffffffffffff",
+        ),
+        (
+            f"swh:1:{type_}:8000000000000000000000000000000000000000",
+            f"swh:1:{type_}:9fffffffffffffffffffffffffffffffffffffff",
+        ),
+        (
+            f"swh:1:{type_}:a000000000000000000000000000000000000000",
+            f"swh:1:{type_}:bfffffffffffffffffffffffffffffffffffffff",
+        ),
+        (
+            f"swh:1:{type_}:c000000000000000000000000000000000000000",
+            f"swh:1:{type_}:dfffffffffffffffffffffffffffffffffffffff",
+        ),
+        (
+            f"swh:1:{type_}:e000000000000000000000000000000000000000",
+            f"swh:1:{type_}:ffffffffffffffffffffffffffffffffffffffff",
+        ),
+    ]
+
+
+def _long_ranges(type_):
+    return [
+        (
+            f"swh:1:{type_}:0000000000000000000000000000000000000000",
+            f"swh:1:{type_}:3fffffffffffffffffffffffffffffffffffffff",
+        ),
+        (
+            f"swh:1:{type_}:4000000000000000000000000000000000000000",
+            f"swh:1:{type_}:7fffffffffffffffffffffffffffffffffffffff",
+        ),
( + f"swh:1:{type_}:8000000000000000000000000000000000000000", + f"swh:1:{type_}:bfffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:c000000000000000000000000000000000000000", + f"swh:1:{type_}:ffffffffffffffffffffffffffffffffffffffff", + ), + ] + + +EXPECTED_RANGES = [ + *_short_ranges("dir"), + *_long_ranges("rel"), + *_short_ranges("rev"), + *_long_ranges("snp"), +] + + +def assert_checked_ranges( + scrubber_db, datastore, expected_ranges, before_date=None, after_date=None +): + if before_date is not None: + assert all( + before_date < date < after_date + for (_, _, date) in scrubber_db.checked_range_iter(datastore) + ) + + checked_ranges = [ + (str(start), str(end)) + for (start, end, date) in scrubber_db.checked_range_iter(datastore) + ] + checked_ranges.sort(key=str) + + assert checked_ranges == expected_ranges + + @patch_byte_ranges -def test_no_corruption(scrubber_db, swh_storage): +def test_no_corruption(scrubber_db, datastore, swh_storage): swh_storage.directory_add(swh_model_data.DIRECTORIES) swh_storage.revision_add(swh_model_data.REVISIONS) swh_storage.release_add(swh_model_data.RELEASES) swh_storage.snapshot_add(swh_model_data.SNAPSHOTS) + before_date = datetime.datetime.now(tz=datetime.timezone.utc) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, @@ -80,13 +176,18 @@ start_object="00" * 20, end_object="ff" * 20, ).run() + after_date = datetime.datetime.now(tz=datetime.timezone.utc) assert list(scrubber_db.corrupt_object_iter()) == [] + assert_checked_ranges( + scrubber_db, datastore, EXPECTED_RANGES, before_date, after_date + ) + @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS))) @patch_byte_ranges -def test_corrupt_snapshot(scrubber_db, swh_storage, corrupt_idx): +def test_corrupt_snapshot(scrubber_db, datastore, swh_storage, corrupt_idx): storage_dsn = swh_storage.get_db().conn.dsn snapshots = list(swh_model_data.SNAPSHOTS) snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20) @@ -120,9 +221,13 @@ kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict() ) + assert_checked_ranges( + scrubber_db, datastore, EXPECTED_RANGES, before_date, after_date + ) + @patch_byte_ranges -def test_corrupt_snapshots_same_batch(scrubber_db, swh_storage): +def test_corrupt_snapshots_same_batch(scrubber_db, datastore, swh_storage): snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) @@ -146,9 +251,11 @@ ] } + assert_checked_ranges(scrubber_db, datastore, _long_ranges("snp")) + @patch_byte_ranges -def test_corrupt_snapshots_different_batches(scrubber_db, swh_storage): +def test_corrupt_snapshots_different_batches(scrubber_db, datastore, swh_storage): snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i * 255]) * 20) @@ -186,9 +293,11 @@ ] } + assert_checked_ranges(scrubber_db, datastore, _long_ranges("snp")) + @patch_byte_ranges -def test_no_hole(scrubber_db, swh_storage): +def test_no_hole(scrubber_db, datastore, swh_storage): swh_storage.content_add([CONTENT1]) swh_storage.directory_add([DIRECTORY1, DIRECTORY2]) swh_storage.revision_add([REVISION1]) @@ -206,13 +315,15 @@ assert list(scrubber_db.missing_object_iter()) == [] + assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES) + @pytest.mark.parametrize( "missing_object", ["content1", "directory1", "directory2", "revision1", "release1"], ) @patch_byte_ranges -def 
+def test_one_hole(scrubber_db, datastore, swh_storage, missing_object):
     if missing_object == "content1":
         missing_swhid = CONTENT1.swhid()
         reference_swhids = [DIRECTORY1.swhid(), DIRECTORY2.swhid()]
@@ -260,9 +371,11 @@
         for mor in scrubber_db.missing_object_reference_iter(missing_swhid)
     } == {(missing_swhid, reference_swhid) for reference_swhid in reference_swhids}
 
+    assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES)
+
 
 @patch_byte_ranges
-def test_two_holes(scrubber_db, swh_storage):
+def test_two_holes(scrubber_db, datastore, swh_storage):
     # missing content and revision
     swh_storage.directory_add([DIRECTORY1, DIRECTORY2])
     swh_storage.release_add([RELEASE1])
@@ -289,3 +402,5 @@
         mor.reference_id
         for mor in scrubber_db.missing_object_reference_iter(REVISION1.swhid())
     } == {RELEASE1.swhid()}
+
+    assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES)
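
For reference (not part of the diff): the checkpoints written by ``checked_range_upsert``
are what make a crawl resumable. Below is a minimal sketch of how a re-run could skip
ranges that already have a checkpoint, assuming only what the diff itself shows:
``checked_range_iter(datastore)`` yields ``(start_swhid, end_swhid, date)`` tuples (as
exercised by ``assert_checked_ranges`` above), and ranges are regenerated by the same
``RANGE_GENERATORS``. The helper ``iter_unchecked_ranges`` is hypothetical, not part of
this changeset::

    from typing import Iterator, Optional, Tuple

    from swh.model import swhids
    from swh.scrubber.storage_checker import _get_inclusive_range_swhids
    from swh.storage import backfill


    def iter_unchecked_ranges(
        scrubber_db,       # the scrubber database wrapper (scrubber_db fixture above)
        datastore,         # a swh.scrubber.db.Datastore identifying the storage
        object_type: str,  # e.g. "snapshot", same values as StorageChecker.object_type
        start_object: str,
        end_object: str,
    ) -> Iterator[Tuple[Optional[bytes], Optional[bytes]]]:
        """Yield only the backfill ranges with no checked_range checkpoint yet.

        Hypothetical helper: it assumes the re-run uses the same RANGE_GENERATORS
        configuration as the run that wrote the checkpoints, so that range
        boundaries line up exactly.
        """
        swhid_type = getattr(swhids.ObjectType, object_type.upper())
        # SWHID intervals already recorded for this datastore, keyed as strings
        already_checked = {
            (str(start), str(end))
            for (start, end, _date) in scrubber_db.checked_range_iter(datastore)
        }
        for range_start, range_end in backfill.RANGE_GENERATORS[object_type](
            start_object, end_object
        ):
            # reuse the diff's prefix-to-SWHID conversion so keys match the upserts
            start_swhid, end_swhid = _get_inclusive_range_swhids(
                range_start, range_end, swhid_type
            )
            if (str(start_swhid), str(end_swhid)) not in already_checked:
                yield (range_start, range_end)

Whether a re-run should also re-check stale ranges (e.g. those whose recorded ``date``
is older than some threshold) is a policy decision left to the caller.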