
diff --git a/swh/scrubber/check_journal.py b/swh/scrubber/check_journal.py
new file mode 100644
index 0000000..30c1379
--- /dev/null
+++ b/swh/scrubber/check_journal.py
@@ -0,0 +1,61 @@
+# Copyright (C) 2021-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Reads all objects in a swh-storage instance and recomputes their checksums."""
+
+import logging
+from typing import Any, Dict, List
+
+from swh.journal.client import get_journal_client
+from swh.journal.serializers import kafka_to_value
+from swh.model import model
+
+from .db import Datastore, ScrubberDb
+
+logger = logging.getLogger(__name__)
+
+
+class JournalChecker:
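+    """Reads objects from a kafka journal and recomputes their hashes,
+    recording any mismatch in the scrubber database."""
+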
+    _datastore = None
+
+    def __init__(self, db: ScrubberDb, journal_client: Dict[str, Any]):
+        self.db = db
+        self.journal_client_config = journal_client
+        self.journal_client = get_journal_client(
+            **journal_client,
+            # Remove the default deserializer, so process_kafka_messages() gets
+            # each message verbatim and can archive it with as few modifications
+            # as possible.
+            value_deserializer=lambda obj_type, msg: msg,
+        )
+
+    def datastore_info(self) -> Datastore:
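+        """Returns a representation of the journal instance this checker
+        reads from."""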
+        if self._datastore is None:
+            config = self.journal_client_config
+            if config["cls"] == "kafka":
+                self._datastore = Datastore(
+                    package="journal",
+                    cls="kafka",
+                    instance=(
+                        f"brokers={config['brokers']!r} prefix={config['prefix']!r}"
+                    ),
+                )
+            else:
+                raise NotImplementedError(
+                    f"JournalChecker(journal_client={self.journal_client_config!r})"
+                    f".datastore()"
+                )
+        return self._datastore
+
+    def check_journal(self):
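+        """Runs the journal client, checking all the objects it returns."""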
+        self.journal_client.process(self.process_kafka_messages)
+
+    def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]):
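+        """Deserializes each message and recomputes the object's hash,
+        recording it as corrupt if the recomputed hash does not match its id."""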
+        for (object_type, messages) in all_messages.items():
+            cls = getattr(model, object_type.capitalize())
+            for message in messages:
+                object_ = cls.from_dict(kafka_to_value(message))
+                real_id = object_.compute_hash()
+                if object_.id != real_id:
+                    self.db.corrupt_object_add(self.datastore_info(), object_, message)
diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
index fa58ced..f218d26 100644
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -1,93 +1,144 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
+from typing import Optional
import click
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
@swh_cli_group.group(name="scrubber", context_settings=CONTEXT_SETTINGS)
@click.option(
"--config-file",
"-C",
default=None,
type=click.Path(exists=True, dir_okay=False,),
help="Configuration file.",
)
@click.pass_context
-def scrubber_cli_group(ctx, config_file):
+def scrubber_cli_group(ctx, config_file: Optional[str]) -> None:
"""main command group of the datastore scrubber
+
+ Expected config format::
+
+ scrubber_db:
+ cls: local
+ db: "service=..." # libpq DSN
+
+ # for storage checkers only:
+ storage:
+ cls: postgresql # cannot be remote, as it needs direct access to the pg DB
+ db": "service=..." # libpq DSN
+ objstorage:
+ cls: memory
+
+ # for journal checkers only:
+ journal_client:
+ # see https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html
+ # for the full list of options
+ sasl.mechanism: SCRAM-SHA-512
+ security.protocol: SASL_SSL
+ sasl.username: ...
+ sasl.password: ...
+ group_id: ...
+ privileged: True
+ message.max.bytes: 524288000
+ brokers:
+ - "broker1.journal.softwareheritage.org:9093
+ - "broker2.journal.softwareheritage.org:9093
+ - "broker3.journal.softwareheritage.org:9093
+ - "broker4.journal.softwareheritage.org:9093
+ - "broker5.journal.softwareheritage.org:9093
+ object_types: [directory, revision, snapshot, release]
+ auto_offset_reset: earliest
"""
    from swh.core import config
    from . import get_scrubber_db
    if not config_file:
        config_file = os.environ.get("SWH_CONFIG_FILENAME")
    if config_file:
        if not os.path.exists(config_file):
            raise ValueError("%s does not exist" % config_file)
        conf = config.read(config_file)
    else:
        conf = {}
    if "scrubber_db" not in conf:
        ctx.fail("You must have a scrubber_db configured in your config file.")
    ctx.ensure_object(dict)
    ctx.obj["config"] = conf
    ctx.obj["db"] = get_scrubber_db(**conf["scrubber_db"])
@scrubber_cli_group.group(name="check")
@click.pass_context
def scrubber_check_cli_group(ctx):
"""group of commands which read from data stores and report errors.
"""
pass
@scrubber_check_cli_group.command(name="storage")
@click.option(
"--object-type",
type=click.Choice(
# use a hardcoded list to prevent having to load the
# replay module at cli loading time
[
"snapshot",
"revision",
"release",
"directory",
# TODO:
# "raw_extrinsic_metadata",
# "extid",
]
),
)
@click.option("--start-object", default="0" * 40)
@click.option("--end-object", default="f" * 40)
@click.pass_context
def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str):
+ """Reads a postgresql storage, and reports corrupt objects to the scrubber DB."""
conf = ctx.obj["config"]
if "storage" not in conf:
ctx.fail("You must have a storage configured in your config file.")
from swh.storage import get_storage
from .check_storage import StorageChecker
checker = StorageChecker(
db=ctx.obj["db"],
storage=get_storage(**conf["storage"]),
object_type=object_type,
start_object=start_object,
end_object=end_object,
)
checker.check_storage()
+
+
+@scrubber_check_cli_group.command(name="journal")
+@click.pass_context
+def scrubber_check_journal(ctx) -> None:
+ """Reads a complete kafka journal, and reports corrupt objects to
+ the scrubber DB."""
+ conf = ctx.obj["config"]
+ if "journal_client" not in conf:
+ ctx.fail("You must have a journal_client configured in your config file.")
+
+ from .check_journal import JournalChecker
+
+ checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],)
+
+ checker.check_journal()
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
index 50889a5..29f10ec 100644
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -1,64 +1,113 @@
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import copy
import tempfile
from unittest.mock import MagicMock
from click.testing import CliRunner
import yaml
from swh.scrubber.check_storage import storage_db
from swh.scrubber.cli import scrubber_cli_group
-CLI_CONFIG = {
-    "storage": {
-        "cls": "postgresql",
-        "db": "<replaced at runtime>",
-        "objstorage": {"cls": "memory"},
-    },
-    "scrubber_db": {"cls": "local", "db": "<replaced at runtime>"},
-}
-
-def invoke(swh_storage, scrubber_db, args):
+def invoke(
+    scrubber_db,
+    args,
+    storage=None,
+    kafka_server=None,
+    kafka_prefix=None,
+    kafka_consumer_group=None,
+):
    runner = CliRunner()
-    config = copy.deepcopy(CLI_CONFIG)
-    with storage_db(swh_storage) as db:
-        config["storage"]["db"] = db.conn.dsn
+    config = {
+        "scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn},
+    }
+    if storage:
+        with storage_db(storage) as db:
+            config["storage"] = {
+                "cls": "postgresql",
+                "db": db.conn.dsn,
+                "objstorage": {"cls": "memory"},
+            }
-    config["scrubber_db"]["db"] = scrubber_db.conn.dsn
+    assert (
+        (kafka_server is None)
+        == (kafka_prefix is None)
+        == (kafka_consumer_group is None)
+    )
+    if kafka_server:
+        config["journal_client"] = dict(
+            cls="kafka",
+            brokers=kafka_server,
+            group_id=kafka_consumer_group,
+            prefix=kafka_prefix,
+            stop_on_eof=True,
+        )
    with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
        yaml.dump(config, config_fd)
        config_fd.seek(0)
        args = ["-C" + config_fd.name] + list(args)
        result = runner.invoke(scrubber_cli_group, args, catch_exceptions=False)
    return result
-def test_check_storage(swh_storage, mocker, scrubber_db):
+def test_check_storage(mocker, scrubber_db, swh_storage):
    storage_checker = MagicMock()
    StorageChecker = mocker.patch(
        "swh.scrubber.check_storage.StorageChecker", return_value=storage_checker
    )
    get_scrubber_db = mocker.patch(
        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
    )
    result = invoke(
-        swh_storage, scrubber_db, ["check", "storage", "--object-type=snapshot"]
+        scrubber_db, ["check", "storage", "--object-type=snapshot"], storage=swh_storage
    )
    assert result.exit_code == 0, result.output
    assert result.output == ""
    get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
    StorageChecker.assert_called_once_with(
        db=scrubber_db,
        storage=StorageChecker.mock_calls[0][2]["storage"],
        object_type="snapshot",
        start_object="0" * 40,
        end_object="f" * 40,
    )
+
+
+def test_check_journal(
+    mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
+):
+    journal_checker = MagicMock()
+    JournalChecker = mocker.patch(
+        "swh.scrubber.check_journal.JournalChecker", return_value=journal_checker
+    )
+    get_scrubber_db = mocker.patch(
+        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+    )
+    result = invoke(
+        scrubber_db,
+        ["check", "journal"],
+        kafka_server=kafka_server,
+        kafka_prefix=kafka_prefix,
+        kafka_consumer_group=kafka_consumer_group,
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == ""
+
+    get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+    JournalChecker.assert_called_once_with(
+        db=scrubber_db,
+        journal_client={
+            "brokers": kafka_server,
+            "cls": "kafka",
+            "group_id": kafka_consumer_group,
+            "prefix": kafka_prefix,
+            "stop_on_eof": True,
+        },
+    )
diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py
new file mode 100644
index 0000000..5f455f2
--- /dev/null
+++ b/swh/scrubber/tests/test_journal_kafka.py
@@ -0,0 +1,120 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+
+import attr
+import pytest
+
+from swh.journal.serializers import kafka_to_value
+from swh.journal.writer import get_journal_writer
+from swh.model import swhids
+from swh.model.tests import swh_model_data
+from swh.scrubber.check_journal import JournalChecker
+
+
+def journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group):
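+    """Minimal journal client configuration for the test Kafka server."""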
+    return dict(
+        cls="kafka",
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_on_eof=True,
+    )
+
+
+def journal_writer(kafka_server, kafka_prefix):
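+    """Returns a journal writer, used to fill the test Kafka server with objects."""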
+    return get_journal_writer(
+        cls="kafka",
+        brokers=[kafka_server],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+        anonymize=False,
+    )
+
+
+def test_no_corruption(scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group):
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("directory", swh_model_data.DIRECTORIES)
+    writer.write_additions("revision", swh_model_data.REVISIONS)
+    writer.write_additions("release", swh_model_data.RELEASES)
+    writer.write_additions("snapshot", swh_model_data.SNAPSHOTS)
+
+    JournalChecker(
+        db=scrubber_db,
+        journal_client=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+    ).check_journal()
+
+    assert list(scrubber_db.corrupt_object_iter()) == []
+
+
+@pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS)))
+def test_corrupt_snapshot(
+    scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, corrupt_idx
+):
+    snapshots = list(swh_model_data.SNAPSHOTS)
+    snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20)
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("snapshot", snapshots)
+
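+    # Take timestamps on both sides of the check, so we can assert below that
+    # first_occurrence was set while the checker was running.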
+    before_date = datetime.datetime.now(tz=datetime.timezone.utc)
+    JournalChecker(
+        db=scrubber_db,
+        journal_client=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+    ).check_journal()
+    after_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 1
+    assert corrupt_objects[0].id == swhids.CoreSWHID.from_string(
+        "swh:1:snp:0000000000000000000000000000000000000000"
+    )
+    assert corrupt_objects[0].datastore.package == "journal"
+    assert corrupt_objects[0].datastore.cls == "kafka"
+    assert (
+        corrupt_objects[0].datastore.instance
+        == f"brokers='{kafka_server}' prefix='{kafka_prefix}'"
+    )
+    assert (
+        before_date - datetime.timedelta(seconds=5)
+        <= corrupt_objects[0].first_occurrence
+        <= after_date + datetime.timedelta(seconds=5)
+    )
+    assert (
+        kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict()
+    )
+
+
+def test_corrupt_snapshots(
+    scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
+):
+    snapshots = list(swh_model_data.SNAPSHOTS)
+    for i in (0, 1):
+        snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20)
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("snapshot", snapshots)
+
+    JournalChecker(
+        db=scrubber_db,
+        journal_client=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+    ).check_journal()
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 2
+    assert {co.id for co in corrupt_objects} == {
+        swhids.CoreSWHID.from_string(swhid)
+        for swhid in [
+            "swh:1:snp:0000000000000000000000000000000000000000",
+            "swh:1:snp:0101010101010101010101010101010101010101",
+        ]
+    }
