diff --git a/PKG-INFO b/PKG-INFO
index b9ab27a..40ef69a 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,61 +1,61 @@
 Metadata-Version: 2.1
 Name: swh.scrubber
-Version: 0.0.2
+Version: 0.0.3
 Summary: Software Heritage Datastore Scrubber
 Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-scrubber
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scrubber/
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 3 - Alpha
 Requires-Python: >=3.7
 Description-Content-Type: text/x-rst
 Provides-Extra: testing
 License-File: LICENSE
 License-File: AUTHORS
 
 Software Heritage - Datastore Scrubber
 ======================================
 
 Tools to periodically check data integrity in swh-storage and swh-objstorage,
 report errors, and (try to) fix them.
 
 This is a work in progress; some of the components described below do not
 exist yet (cassandra storage checker, objstorage checker, recovery, and
 reinjection).
 
 The Scrubber package is made of the following parts:
 
 Checking
 --------
 
 Highly parallel processes continuously read objects from a data store,
 compute checksums, and write any failure to a database, along with the
 data of the corrupt object.
 
 There is one "checker" for each datastore package: storage (postgresql
 and cassandra), journal (kafka), and objstorage.
 
 Recovery
 --------
 
 Then, from time to time, jobs go through the list of known corrupt objects
 and try to recover the original objects through various means:
 
 * Brute-forcing variations until they match their checksum
 * Recovering from another data store
 * As a last resort, recovering from known origins, if any
 
 Reinjection
 -----------
 
 Finally, when an original object is recovered, it is reinjected into
 the original data store, replacing the corrupt one.
diff --git a/requirements.txt b/requirements.txt
index 54ce666..e35fb35 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
+dulwich
diff --git a/swh.scrubber.egg-info/PKG-INFO b/swh.scrubber.egg-info/PKG-INFO
index b9ab27a..40ef69a 100644
--- a/swh.scrubber.egg-info/PKG-INFO
+++ b/swh.scrubber.egg-info/PKG-INFO
@@ -1,61 +1,61 @@
 Metadata-Version: 2.1
 Name: swh.scrubber
-Version: 0.0.2
+Version: 0.0.3
 Summary: Software Heritage Datastore Scrubber
 Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-scrubber
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scrubber/
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 3 - Alpha
 Requires-Python: >=3.7
 Description-Content-Type: text/x-rst
 Provides-Extra: testing
 License-File: LICENSE
 License-File: AUTHORS
 
 Software Heritage - Datastore Scrubber
 ======================================
 
 Tools to periodically check data integrity in swh-storage and swh-objstorage,
 report errors, and (try to) fix them.
 
 This is a work in progress; some of the components described below do not
 exist yet (cassandra storage checker, objstorage checker, recovery, and
 reinjection).
 
 The Scrubber package is made of the following parts:
 
 Checking
 --------
 
 Highly parallel processes continuously read objects from a data store,
 compute checksums, and write any failure to a database, along with the
 data of the corrupt object.
 
 There is one "checker" for each datastore package: storage (postgresql
 and cassandra), journal (kafka), and objstorage.
 
 Recovery
 --------
 
 Then, from time to time, jobs go through the list of known corrupt objects
 and try to recover the original objects through various means:
 
 * Brute-forcing variations until they match their checksum
 * Recovering from another data store
 * As a last resort, recovering from known origins, if any
 
 Reinjection
 -----------
 
 Finally, when an original object is recovered, it is reinjected into
 the original data store, replacing the corrupt one.
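The only new runtime requirement above is dulwich. The patch itself does not
show how it is used, but re-serializing git objects and recomputing their
checksums is the kind of primitive the recovery steps described in the README
depend on. A minimal illustrative sketch (not part of the patch)::

    # Illustrative only: recompute the git checksum of a candidate object.
    # A brute-force fixer can tweak a corrupt object's serialization until
    # this identifier matches the expected one.
    from dulwich.objects import Blob

    blob = Blob.from_string(b"hello\n")
    print(blob.id.decode())  # hex sha1 of the git-encoded blob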
diff --git a/swh.scrubber.egg-info/SOURCES.txt b/swh.scrubber.egg-info/SOURCES.txt
index c65cc27..964ff87 100644
--- a/swh.scrubber.egg-info/SOURCES.txt
+++ b/swh.scrubber.egg-info/SOURCES.txt
@@ -1,53 +1,54 @@
 .git-blame-ignore-revs
 .gitignore
 .pre-commit-config.yaml
 AUTHORS
 CODE_OF_CONDUCT.md
 CONTRIBUTORS
 LICENSE
 MANIFEST.in
 Makefile
 README.rst
 conftest.py
 mypy.ini
 pyproject.toml
 pytest.ini
 requirements-swh.txt
 requirements-test.txt
 requirements.txt
 setup.cfg
 setup.py
 tox.ini
 docs/.gitignore
 docs/Makefile
 docs/README.rst
 docs/conf.py
 docs/index.rst
 docs/_static/.placeholder
 docs/_templates/.placeholder
 swh/__init__.py
 swh.scrubber.egg-info/PKG-INFO
 swh.scrubber.egg-info/SOURCES.txt
 swh.scrubber.egg-info/dependency_links.txt
 swh.scrubber.egg-info/entry_points.txt
 swh.scrubber.egg-info/requires.txt
 swh.scrubber.egg-info/top_level.txt
 swh/scrubber/__init__.py
 swh/scrubber/cli.py
 swh/scrubber/db.py
 swh/scrubber/fixer.py
 swh/scrubber/journal_checker.py
 swh/scrubber/origin_locator.py
 swh/scrubber/py.typed
 swh/scrubber/storage_checker.py
 swh/scrubber/utils.py
 swh/scrubber/sql/20-enums.sql
 swh/scrubber/sql/30-schema.sql
 swh/scrubber/sql/60-indexes.sql
 swh/scrubber/tests/__init__.py
 swh/scrubber/tests/conftest.py
 swh/scrubber/tests/test_cli.py
 swh/scrubber/tests/test_fixer.py
+swh/scrubber/tests/test_init.py
 swh/scrubber/tests/test_journal_kafka.py
 swh/scrubber/tests/test_origin_locator.py
 swh/scrubber/tests/test_storage_postgresql.py
\ No newline at end of file
diff --git a/swh.scrubber.egg-info/requires.txt b/swh.scrubber.egg-info/requires.txt
index 6d9945b..a52c064 100644
--- a/swh.scrubber.egg-info/requires.txt
+++ b/swh.scrubber.egg-info/requires.txt
@@ -1,12 +1,13 @@
+dulwich
 swh.core[http]>=0.3
 swh.loader.git>=1.4.0
 swh.model>=5.0.0
 swh.storage>=1.1.0
 swh.journal>=0.9.0
 
 [testing]
 pytest
 pytest-mock
 pyyaml
 swh.graph
 types-pyyaml
diff --git a/swh/scrubber/__init__.py b/swh/scrubber/__init__.py
index 2527e35..1c4c0a2 100644
--- a/swh/scrubber/__init__.py
+++ b/swh/scrubber/__init__.py
@@ -1,23 +1,25 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
-    from .db import ScrubberDb
+    from swh.scrubber.db import ScrubberDb
 
 
 def get_scrubber_db(cls: str, **kwargs) -> ScrubberDb:
-    if cls != "local":
-        raise ValueError(f"Unknown scrubber db class '{cls}', use 'local' instead.")
+    if cls not in ("local", "postgresql"):
+        raise ValueError(
+            f"Unknown scrubber db class '{cls}', use 'postgresql' instead."
+        )
 
-    from .db import ScrubberDb
+    from swh.scrubber.db import ScrubberDb
 
     return ScrubberDb.connect(kwargs.pop("db"), **kwargs)
 
 
 get_datastore = get_scrubber_db
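With the ``__init__.py`` change above, ``get_scrubber_db`` accepts both the
legacy ``local`` alias and the new ``postgresql`` class, and absolute imports
replace the relative ones. A minimal usage sketch (the DSN is illustrative)::

    from swh.scrubber import get_scrubber_db

    # Both class names resolve to the same PostgreSQL-backed ScrubberDb;
    # "service=scrubber-db" is a placeholder libpq connection string.
    db = get_scrubber_db(cls="postgresql", db="service=scrubber-db")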
diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
index c319497..d11b7a0 100644
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -1,320 +1,320 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import dataclasses
 import datetime
 import functools
 from typing import Iterator, List, Optional
 
 import psycopg2
 
 from swh.core.db import BaseDb
 from swh.model.swhids import CoreSWHID
 
 
 @dataclasses.dataclass(frozen=True)
 class Datastore:
     """Represents a datastore being scrubbed; eg. swh-storage or swh-journal."""
 
     package: str
     """'storage', 'journal', or 'objstorage'."""
 
     cls: str
     """'postgresql'/'cassandra' for storage, 'kafka' for journal,
     'pathslicer'/'winery'/... for objstorage."""
 
     instance: str
     """Human readable string."""
 
 
 @dataclasses.dataclass(frozen=True)
 class CorruptObject:
     id: CoreSWHID
     datastore: Datastore
     first_occurrence: datetime.datetime
     object_: bytes
 
 
 @dataclasses.dataclass(frozen=True)
 class FixedObject:
     id: CoreSWHID
     object_: bytes
     method: str
     recovery_date: Optional[datetime.datetime] = None
 
 
 class ScrubberDb(BaseDb):
-    current_version = 1
+    current_version = 2
 
     @functools.lru_cache(1000)
     def datastore_get_or_add(self, datastore: Datastore) -> int:
         """Creates a datastore if it does not exist, and returns its id."""
         cur = self.cursor()
         cur.execute(
             """
             WITH inserted AS (
                 INSERT INTO datastore (package, class, instance)
                 VALUES (%(package)s, %(cls)s, %(instance)s)
                 ON CONFLICT DO NOTHING
                 RETURNING id
             )
             SELECT id
             FROM inserted
             UNION (
                 -- If the datastore already exists, we need to fetch its id
                 SELECT id
                 FROM datastore
                 WHERE
                     package=%(package)s
                     AND class=%(cls)s
                     AND instance=%(instance)s
             )
             LIMIT 1
             """,
             (dataclasses.asdict(datastore)),
         )
         (id_,) = cur.fetchone()
         return id_
 
     def corrupt_object_add(
         self,
         id: CoreSWHID,
         datastore: Datastore,
         serialized_object: bytes,
     ) -> None:
         datastore_id = self.datastore_get_or_add(datastore)
         cur = self.cursor()
         cur.execute(
             """
             INSERT INTO corrupt_object (id, datastore, object)
             VALUES (%s, %s, %s)
             ON CONFLICT DO NOTHING
             """,
             (str(id), datastore_id, serialized_object),
         )
 
     def corrupt_object_iter(self) -> Iterator[CorruptObject]:
         """Yields all records in the 'corrupt_object' table."""
         cur = self.cursor()
         cur.execute(
             """
             SELECT
                 co.id, co.first_occurrence, co.object,
                 ds.package, ds.class, ds.instance
             FROM corrupt_object AS co
             INNER JOIN datastore AS ds ON (ds.id=co.datastore)
             """
         )
 
         for row in cur:
             (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
             yield CorruptObject(
                 id=CoreSWHID.from_string(id),
                 first_occurrence=first_occurrence,
                 object_=object_,
                 datastore=Datastore(
                     package=ds_package, cls=ds_class, instance=ds_instance
                 ),
             )
 
     def _corrupt_object_list_from_cursor(
         self, cur: psycopg2.extensions.cursor
     ) -> List[CorruptObject]:
         results = []
         for row in cur:
             (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
             results.append(
                 CorruptObject(
                     id=CoreSWHID.from_string(id),
                     first_occurrence=first_occurrence,
                     object_=object_,
                     datastore=Datastore(
                         package=ds_package, cls=ds_class, instance=ds_instance
                     ),
                 )
             )
 
         return results
 
     def corrupt_object_get(
         self,
         start_id: CoreSWHID,
         end_id: CoreSWHID,
         limit: int = 100,
     ) -> List[CorruptObject]:
         """Returns a page of records in the 'corrupt_object' table, ordered by id.
 
         Arguments:
             start_id: Only return objects after this id
             end_id: Only return objects before this id
             in_origin: An origin URL. If provided, only returns objects that
                 may be found in the given origin
         """
         cur = self.cursor()
         cur.execute(
             """
             SELECT
                 co.id, co.first_occurrence, co.object,
                 ds.package, ds.class, ds.instance
             FROM corrupt_object AS co
             INNER JOIN datastore AS ds ON (ds.id=co.datastore)
             WHERE
                 co.id >= %s
                 AND co.id <= %s
             ORDER BY co.id
             LIMIT %s
             """,
             (str(start_id), str(end_id), limit),
         )
         return self._corrupt_object_list_from_cursor(cur)
 
     def corrupt_object_grab_by_id(
         self,
         cur: psycopg2.extensions.cursor,
         start_id: CoreSWHID,
         end_id: CoreSWHID,
         limit: int = 100,
     ) -> List[CorruptObject]:
         """Returns a page of records in the 'corrupt_object' table for a fixer,
         ordered by id.
 
         These records are not already fixed (ie. do not have a corresponding entry
         in the 'fixed_object' table), and they are selected with an exclusive update
         lock.
 
         Arguments:
             start_id: Only return objects after this id
             end_id: Only return objects before this id
         """
         cur.execute(
             """
             SELECT
                 co.id, co.first_occurrence, co.object,
                 ds.package, ds.class, ds.instance
             FROM corrupt_object AS co
             INNER JOIN datastore AS ds ON (ds.id=co.datastore)
             WHERE
                 co.id >= %(start_id)s
                 AND co.id <= %(end_id)s
                 AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
             ORDER BY co.id
             LIMIT %(limit)s
             FOR UPDATE SKIP LOCKED
             """,
             dict(
                 start_id=str(start_id),
                 end_id=str(end_id),
                 limit=limit,
             ),
         )
         return self._corrupt_object_list_from_cursor(cur)
 
     def corrupt_object_grab_by_origin(
         self,
         cur: psycopg2.extensions.cursor,
         origin_url: str,
         start_id: Optional[CoreSWHID] = None,
         end_id: Optional[CoreSWHID] = None,
         limit: int = 100,
     ) -> List[CorruptObject]:
         """Returns a page of records in the 'corrupt_object' table for a fixer,
         ordered by id.
 
         These records are not already fixed (ie. do not have a corresponding entry
         in the 'fixed_object' table), and they are selected with an exclusive update
         lock.
 
         Arguments:
             origin_url: only returns objects that may be found in the given origin
         """
         cur.execute(
             """
             SELECT
                 co.id, co.first_occurrence, co.object,
                 ds.package, ds.class, ds.instance
             FROM corrupt_object AS co
             INNER JOIN datastore AS ds ON (ds.id=co.datastore)
             INNER JOIN object_origin AS oo ON (oo.object_id=co.id)
             WHERE
                 (co.id >= %(start_id)s OR %(start_id)s IS NULL)
                 AND (co.id <= %(end_id)s OR %(end_id)s IS NULL)
                 AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
                 AND oo.origin_url=%(origin_url)s
             ORDER BY co.id
             LIMIT %(limit)s
             FOR UPDATE SKIP LOCKED
             """,
             dict(
                 start_id=None if start_id is None else str(start_id),
                 end_id=None if end_id is None else str(end_id),
                 origin_url=origin_url,
                 limit=limit,
             ),
         )
         return self._corrupt_object_list_from_cursor(cur)
 
     def object_origin_add(
         self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str]
     ) -> None:
         psycopg2.extras.execute_values(
             cur,
             """
             INSERT INTO object_origin (object_id, origin_url)
             VALUES %s
             ON CONFLICT DO NOTHING
             """,
             [(str(swhid), origin_url) for origin_url in origins],
         )
 
     def object_origin_get(self, after: str = "", limit: int = 1000) -> List[str]:
         """Returns origins with non-fixed corrupt objects, ordered by URL.
 
         Arguments:
             after: if given, only returns origins with a URL after this value
         """
         cur = self.cursor()
         cur.execute(
             """
             SELECT DISTINCT origin_url
             FROM object_origin
             WHERE
                 origin_url > %(after)s
                 AND object_id IN (
                     (SELECT id FROM corrupt_object)
                     EXCEPT
                     (SELECT id FROM fixed_object)
                 )
             ORDER BY origin_url
             LIMIT %(limit)s
             """,
             dict(after=after, limit=limit),
         )
 
         return [origin_url for (origin_url,) in cur]
 
     def fixed_object_add(
         self, cur: psycopg2.extensions.cursor, fixed_objects: List[FixedObject]
     ) -> None:
         psycopg2.extras.execute_values(
             cur,
             """
             INSERT INTO fixed_object (id, object, method)
             VALUES %s
             ON CONFLICT DO NOTHING
             """,
             [
                 (str(fixed_object.id), fixed_object.object_, fixed_object.method)
                 for fixed_object in fixed_objects
             ],
         )
 
     def fixed_object_iter(self) -> Iterator[FixedObject]:
         cur = self.cursor()
         cur.execute("SELECT id, object, method, recovery_date FROM fixed_object")
         for (id, object_, method, recovery_date) in cur:
             yield FixedObject(
                 id=CoreSWHID.from_string(id),
                 object_=object_,
                 method=method,
                 recovery_date=recovery_date,
             )
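The query helpers above page through corrupt objects by SWHID range. An
illustrative loop over a populated scrubber database, reusing the ``db``
handle from the earlier sketch::

    from swh.model.swhids import CoreSWHID

    # The full SWHID range, from the lowest content id to the highest
    # snapshot id, mirroring the defaults used by the CLI tests below.
    start = CoreSWHID.from_string("swh:1:cnt:" + "00" * 20)
    end = CoreSWHID.from_string("swh:1:snp:" + "ff" * 20)
    for corrupt in db.corrupt_object_get(start_id=start, end_id=end, limit=100):
        print(corrupt.id, corrupt.datastore.package)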
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
index b54a5c8..4b85237 100644
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -1,159 +1,159 @@
 # Copyright (C) 2020-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import tempfile
 from unittest.mock import MagicMock, call
 
 from click.testing import CliRunner
 import yaml
 
 from swh.model.swhids import CoreSWHID
 from swh.scrubber.cli import scrubber_cli_group
 from swh.scrubber.storage_checker import storage_db
 
 
 def invoke(
     scrubber_db,
     args,
     storage=None,
     kafka_server=None,
     kafka_prefix=None,
     kafka_consumer_group=None,
 ):
     runner = CliRunner()
 
     config = {
-        "scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn},
+        "scrubber_db": {"cls": "postgresql", "db": scrubber_db.conn.dsn},
         "graph": {"url": "http://graph.example.org:5009/"},
     }
     if storage:
         with storage_db(storage) as db:
             config["storage"] = {
                 "cls": "postgresql",
                 "db": db.conn.dsn,
                 "objstorage": {"cls": "memory"},
             }
 
     assert (
         (kafka_server is None)
         == (kafka_prefix is None)
         == (kafka_consumer_group is None)
     )
     if kafka_server:
         config["journal_client"] = dict(
             cls="kafka",
             brokers=kafka_server,
             group_id=kafka_consumer_group,
             prefix=kafka_prefix,
             stop_on_eof=True,
         )
 
     with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
         yaml.dump(config, config_fd)
         config_fd.seek(0)
         args = ["-C" + config_fd.name] + list(args)
         result = runner.invoke(scrubber_cli_group, args, catch_exceptions=False)
     return result
 
 
 def test_check_storage(mocker, scrubber_db, swh_storage):
     storage_checker = MagicMock()
     StorageChecker = mocker.patch(
         "swh.scrubber.storage_checker.StorageChecker", return_value=storage_checker
     )
     get_scrubber_db = mocker.patch(
         "swh.scrubber.get_scrubber_db", return_value=scrubber_db
     )
     result = invoke(
         scrubber_db, ["check", "storage", "--object-type=snapshot"], storage=swh_storage
     )
     assert result.exit_code == 0, result.output
     assert result.output == ""
 
-    get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+    get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn)
     StorageChecker.assert_called_once_with(
         db=scrubber_db,
         storage=StorageChecker.mock_calls[0][2]["storage"],
         object_type="snapshot",
         start_object="0" * 40,
         end_object="f" * 40,
     )
     assert storage_checker.method_calls == [call.run()]
 
 
 def test_check_journal(
     mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
 ):
     journal_checker = MagicMock()
     JournalChecker = mocker.patch(
         "swh.scrubber.journal_checker.JournalChecker", return_value=journal_checker
     )
     get_scrubber_db = mocker.patch(
         "swh.scrubber.get_scrubber_db", return_value=scrubber_db
     )
     result = invoke(
         scrubber_db,
         ["check", "journal"],
         kafka_server=kafka_server,
         kafka_prefix=kafka_prefix,
         kafka_consumer_group=kafka_consumer_group,
     )
     assert result.exit_code == 0, result.output
     assert result.output == ""
 
-    get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+    get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn)
     JournalChecker.assert_called_once_with(
         db=scrubber_db,
         journal_client={
             "brokers": kafka_server,
             "cls": "kafka",
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
             "stop_on_eof": True,
         },
     )
     assert journal_checker.method_calls == [call.run()]
 
 
 def test_locate_origins(mocker, scrubber_db, swh_storage):
     origin_locator = MagicMock()
     OriginLocator = mocker.patch(
         "swh.scrubber.origin_locator.OriginLocator", return_value=origin_locator
     )
     get_scrubber_db = mocker.patch(
         "swh.scrubber.get_scrubber_db", return_value=scrubber_db
     )
     result = invoke(scrubber_db, ["locate"], storage=swh_storage)
     assert result.exit_code == 0, result.output
     assert result.output == ""
 
-    get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+    get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn)
     OriginLocator.assert_called_once_with(
         db=scrubber_db,
         storage=OriginLocator.mock_calls[0][2]["storage"],
         graph=OriginLocator.mock_calls[0][2]["graph"],
         start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
         end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
     )
     assert origin_locator.method_calls == [call.run()]
 
 
 def test_fix_objects(mocker, scrubber_db):
     fixer = MagicMock()
     Fixer = mocker.patch("swh.scrubber.fixer.Fixer", return_value=fixer)
     get_scrubber_db = mocker.patch(
         "swh.scrubber.get_scrubber_db", return_value=scrubber_db
     )
     result = invoke(scrubber_db, ["fix"])
     assert result.exit_code == 0, result.output
     assert result.output == ""
 
-    get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+    get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn)
     Fixer.assert_called_once_with(
         db=scrubber_db,
         start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
         end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
     )
     assert fixer.method_calls == [call.run()]
diff --git a/swh/scrubber/tests/test_init.py b/swh/scrubber/tests/test_init.py
new file mode 100644
index 0000000..1f80823
--- /dev/null
+++ b/swh/scrubber/tests/test_init.py
@@ -0,0 +1,33 @@
+# Copyright (C) 2020-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any
+
+import pytest
+
+from swh.scrubber import get_scrubber_db
+
+
+@pytest.mark.parametrize("clz", ["local", "postgresql"])
+def test_get_scrubber_db(mocker, clz):
+    mock_scrubber = mocker.patch("swh.scrubber.db.ScrubberDb")
+
+    def test_connect(db_str: str, **kwargs) -> Any:
+        return "connection-result"
+
+    mock_scrubber.connect.side_effect = test_connect
+
+    actual_result = get_scrubber_db(clz, db="service=scrubber-db")
+
+    assert mock_scrubber.connect.called is True
+    assert actual_result == "connection-result"
+
+
+@pytest.mark.parametrize("clz", ["something", "anything"])
+def test_get_scrubber_db_raise(clz):
+    assert clz not in ["local", "postgresql"]
+
+    with pytest.raises(ValueError, match="Unknown"):
+        get_scrubber_db(clz, db="service=scrubber-db")
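Outside the test suite, the configuration the CLI loads via ``-C`` has the
same shape as the dict built in ``invoke()`` above. A minimal sketch with the
updated class name (the DSN and graph URL are illustrative placeholders)::

    import yaml

    config = {
        "scrubber_db": {"cls": "postgresql", "db": "service=scrubber-db"},
        "graph": {"url": "http://graph.example.org:5009/"},
    }
    print(yaml.dump(config))  # contents of the file passed with -C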