diff --git a/swh/scrubber/check_journal.py b/swh/scrubber/check_journal.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/check_journal.py
@@ -0,0 +1,61 @@
+# Copyright (C) 2021-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Reads all objects from a kafka journal and recomputes their checksums."""
+
+import logging
+from typing import Any, Dict, List
+
+from swh.journal.client import get_journal_client
+from swh.journal.serializers import kafka_to_value
+from swh.model import model
+
+from .db import Datastore, ScrubberDb
+
+logger = logging.getLogger(__name__)
+
+
+class JournalChecker:
+    _datastore = None
+
+    def __init__(self, db: ScrubberDb, journal_client: Dict[str, Any]):
+        self.db = db
+        self.journal_client_config = journal_client
+        self.journal_client = get_journal_client(
+            **journal_client,
+            # Remove the default deserializer, so process_kafka_messages() gets each
+            # message verbatim and can archive it with as few modifications as possible.
+            value_deserializer=lambda obj_type, msg: msg,
+        )
+
+    def datastore_info(self) -> Datastore:
+        if self._datastore is None:
+            config = self.journal_client_config
+            if config["cls"] == "kafka":
+                self._datastore = Datastore(
+                    package="journal",
+                    cls="kafka",
+                    instance=(
+                        f"brokers={config['brokers']!r} prefix={config['prefix']!r}"
+                    ),
+                )
+            else:
+                raise NotImplementedError(
+                    f"JournalChecker(journal_client={self.journal_client_config!r})"
+                    f".datastore_info()"
+                )
+        return self._datastore
+
+    def check_journal(self):
+        self.journal_client.process(self.process_kafka_messages)
+
+    def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]):
+        for (object_type, messages) in all_messages.items():
+            cls = getattr(model, object_type.capitalize())
+            for message in messages:
+                object_ = cls.from_dict(kafka_to_value(message))
+                real_id = object_.compute_hash()
+                if object_.id != real_id:
+                    self.db.corrupt_object_add(self.datastore_info(), object_, message)
diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -4,6 +4,7 @@
 # See top-level LICENSE file for more information
 
 import os
+from typing import Optional
 
 import click
 
@@ -20,8 +21,41 @@
     help="Configuration file.",
 )
 @click.pass_context
-def scrubber_cli_group(ctx, config_file):
+def scrubber_cli_group(ctx, config_file: Optional[str]) -> None:
     """main command group of the datastore scrubber
+
+    Expected config format::
+
+        scrubber_db:
+            cls: local
+            db: "service=..."   # libpq DSN
+
+        # for storage checkers only:
+        storage:
+            cls: postgresql  # cannot be remote, as it needs direct access to the pg DB
+            db: "service=..."   # libpq DSN
+            objstorage:
+                cls: memory
+
+        # for journal checkers only:
+        journal_client:
+            # see https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html
+            # for the full list of options
+            sasl.mechanism: SCRAM-SHA-512
+            security.protocol: SASL_SSL
+            sasl.username: ...
+            sasl.password: ...
+            group_id: ...
+            privileged: True
+            message.max.bytes: 524288000
+            brokers:
+              - "broker1.journal.softwareheritage.org:9093"
+              - "broker2.journal.softwareheritage.org:9093"
+              - "broker3.journal.softwareheritage.org:9093"
+              - "broker4.journal.softwareheritage.org:9093"
+              - "broker5.journal.softwareheritage.org:9093"
+            object_types: [directory, revision, snapshot, release]
+            auto_offset_reset: earliest
     """
     from swh.core import config
 
@@ -74,6 +108,7 @@
 @click.option("--end-object", default="f" * 40)
 @click.pass_context
 def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str):
+    """Reads a PostgreSQL storage and reports corrupt objects to the scrubber DB."""
     conf = ctx.obj["config"]
     if "storage" not in conf:
         ctx.fail("You must have a storage configured in your config file.")
@@ -91,3 +126,19 @@
     )
 
     checker.check_storage()
+
+
+@scrubber_check_cli_group.command(name="journal")
+@click.pass_context
+def scrubber_check_journal(ctx) -> None:
+    """Reads a complete Kafka journal and reports corrupt objects to
+    the scrubber DB."""
+    conf = ctx.obj["config"]
+    if "journal_client" not in conf:
+        ctx.fail("You must have a journal_client configured in your config file.")
+
+    from .check_journal import JournalChecker
+
+    checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"])
+
+    checker.check_journal()
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -3,7 +3,6 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import copy
 import tempfile
 from unittest.mock import MagicMock
 
@@ -13,24 +12,41 @@
 from swh.scrubber.check_storage import storage_db
 from swh.scrubber.cli import scrubber_cli_group
 
-CLI_CONFIG = {
-    "storage": {
-        "cls": "postgresql",
-        "db": "",
-        "objstorage": {"cls": "memory"},
-    },
-    "scrubber_db": {"cls": "local", "db": ""},
-}
-
-def invoke(swh_storage, scrubber_db, args):
+
+def invoke(
+    scrubber_db,
+    args,
+    storage=None,
+    kafka_server=None,
+    kafka_prefix=None,
+    kafka_consumer_group=None,
+):
     runner = CliRunner()
-    config = copy.deepcopy(CLI_CONFIG)
-    with storage_db(swh_storage) as db:
-        config["storage"]["db"] = db.conn.dsn
+    config = {
+        "scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn},
+    }
+    if storage:
+        with storage_db(storage) as db:
+            config["storage"] = {
+                "cls": "postgresql",
+                "db": db.conn.dsn,
+                "objstorage": {"cls": "memory"},
+            }
 
-    config["scrubber_db"]["db"] = scrubber_db.conn.dsn
+    assert (
+        (kafka_server is None)
+        == (kafka_prefix is None)
+        == (kafka_consumer_group is None)
+    )
+    if kafka_server:
+        config["journal_client"] = dict(
+            cls="kafka",
+            brokers=kafka_server,
+            group_id=kafka_consumer_group,
+            prefix=kafka_prefix,
+            stop_on_eof=True,
+        )
 
     with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
         yaml.dump(config, config_fd)
@@ -40,7 +56,7 @@
     return result
 
 
-def test_check_storage(swh_storage, mocker, scrubber_db):
+def test_check_storage(mocker, scrubber_db, swh_storage):
     storage_checker = MagicMock()
     StorageChecker = mocker.patch(
         "swh.scrubber.check_storage.StorageChecker", return_value=storage_checker
@@ -49,7 +65,7 @@
         "swh.scrubber.get_scrubber_db", return_value=scrubber_db
     )
     result = invoke(
-        swh_storage, scrubber_db, ["check", "storage", "--object-type=snapshot"]
+        scrubber_db, ["check", "storage", "--object-type=snapshot"], storage=swh_storage
     )
     assert result.exit_code == 0, result.output
     assert result.output == ""
@@ -62,3 +78,36 @@
         start_object="0" * 40,
         end_object="f" * 40,
     )
+
+
+def test_check_journal(
+    mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
+):
+    journal_checker = MagicMock()
+    JournalChecker = mocker.patch(
+        "swh.scrubber.check_journal.JournalChecker", return_value=journal_checker
+    )
+    get_scrubber_db = mocker.patch(
+        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+    )
+    result = invoke(
+        scrubber_db,
+        ["check", "journal"],
+        kafka_server=kafka_server,
+        kafka_prefix=kafka_prefix,
+        kafka_consumer_group=kafka_consumer_group,
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == ""
+
+    get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+    JournalChecker.assert_called_once_with(
+        db=scrubber_db,
+        journal_client={
+            "brokers": kafka_server,
+            "cls": "kafka",
+            "group_id": kafka_consumer_group,
+            "prefix": kafka_prefix,
+            "stop_on_eof": True,
+        },
+    )
diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/tests/test_journal_kafka.py
@@ -0,0 +1,120 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+
+import attr
+import pytest
+
+from swh.journal.serializers import kafka_to_value
+from swh.journal.writer import get_journal_writer
+from swh.model import swhids
+from swh.model.tests import swh_model_data
+from swh.scrubber.check_journal import JournalChecker
+
+
+def journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group):
+    return dict(
+        cls="kafka",
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_on_eof=True,
+    )
+
+
+def journal_writer(kafka_server, kafka_prefix):
+    return get_journal_writer(
+        cls="kafka",
+        brokers=[kafka_server],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+        anonymize=False,
+    )
+
+
+def test_no_corruption(scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group):
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("directory", swh_model_data.DIRECTORIES)
+    writer.write_additions("revision", swh_model_data.REVISIONS)
+    writer.write_additions("release", swh_model_data.RELEASES)
+    writer.write_additions("snapshot", swh_model_data.SNAPSHOTS)
+
+    JournalChecker(
+        db=scrubber_db,
+        journal_client=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+    ).check_journal()
+
+    assert list(scrubber_db.corrupt_object_iter()) == []
+
+
+@pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS)))
+def test_corrupt_snapshot(
+    scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, corrupt_idx
+):
+    snapshots = list(swh_model_data.SNAPSHOTS)
+    snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20)
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("snapshot", snapshots)
+
+    before_date = datetime.datetime.now(tz=datetime.timezone.utc)
+    JournalChecker(
+        db=scrubber_db,
+        journal_client=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+    ).check_journal()
+    after_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 1
+    assert corrupt_objects[0].id == swhids.CoreSWHID.from_string(
+        "swh:1:snp:0000000000000000000000000000000000000000"
+    )
+    assert corrupt_objects[0].datastore.package == "journal"
+    assert corrupt_objects[0].datastore.cls == "kafka"
+    assert (
+        corrupt_objects[0].datastore.instance
+        == f"brokers='{kafka_server}' prefix='{kafka_prefix}'"
+    )
+    assert (
+        before_date - datetime.timedelta(seconds=5)
+        <= corrupt_objects[0].first_occurrence
+        <= after_date + datetime.timedelta(seconds=5)
+    )
+    assert (
+        kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict()
+    )
+
+
+def test_corrupt_snapshots(
+    scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
+):
+    snapshots = list(swh_model_data.SNAPSHOTS)
+    for i in (0, 1):
+        snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20)
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("snapshot", snapshots)
+
+    JournalChecker(
+        db=scrubber_db,
+        journal_client=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+    ).check_journal()
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 2
+    assert {co.id for co in corrupt_objects} == {
+        swhids.CoreSWHID.from_string(swhid)
+        for swhid in [
+            "swh:1:snp:0000000000000000000000000000000000000000",
+            "swh:1:snp:0101010101010101010101010101010101010101",
+        ]
+    }
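
Note: the invariant this patch checks is that an object's stored id equals the hash recomputed from its content. A minimal sketch of that check, assuming swh.model's Snapshot auto-computes its id on construction and exposes compute_hash() (both used in the diff above):

    import attr

    from swh.model import model

    # A well-formed object: its id is the intrinsic hash of its content.
    snapshot = model.Snapshot(branches={})
    assert snapshot.id == snapshot.compute_hash()

    # Forge the id to simulate corruption, as test_corrupt_snapshot does above:
    corrupt = attr.evolve(snapshot, id=b"\x00" * 20)
    assert corrupt.id != corrupt.compute_hash()  # JournalChecker would report this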
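For reference, a hypothetical standalone run of the new checker, roughly what `swh scrubber check journal` does once configured; the DSN, broker, and group id values below are placeholders, and get_scrubber_db is the same factory the CLI uses:

    from swh.scrubber import get_scrubber_db
    from swh.scrubber.check_journal import JournalChecker

    db = get_scrubber_db(cls="local", db="service=swh-scrubber")  # placeholder DSN
    JournalChecker(
        db=db,
        journal_client={
            "cls": "kafka",
            "brokers": ["broker1.journal.softwareheritage.org:9093"],  # placeholder
            "group_id": "scrubber-journal-checker",  # placeholder
            "prefix": "swh.journal.objects",
            "stop_on_eof": True,
        },
    ).check_journal()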