diff --git a/swh/scrubber/check_journal.py b/swh/scrubber/check_journal.py
new file mode 100644
index 0000000..30c1379
--- /dev/null
+++ b/swh/scrubber/check_journal.py
@@ -0,0 +1,61 @@
+# Copyright (C) 2021-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Reads all objects from a kafka journal and recomputes their checksums."""
+
+import logging
+from typing import Any, Dict, List
+
+from swh.journal.client import get_journal_client
+from swh.journal.serializers import kafka_to_value
+from swh.model import model
+
+from .db import Datastore, ScrubberDb
+
+logger = logging.getLogger(__name__)
+
+
+class JournalChecker:
+ _datastore = None
+
+ def __init__(self, db: ScrubberDb, journal_client: Dict[str, Any]):
+ self.db = db
+ self.journal_client_config = journal_client
+ self.journal_client = get_journal_client(
+ **journal_client,
+ # Remove the default deserializer, so process_kafka_messages() gets the
+ # message verbatim and can archive it with as few modifications as possible.
+ value_deserializer=lambda obj_type, msg: msg,
+ )
+
+ def datastore_info(self) -> Datastore:
+ if self._datastore is None:
+ config = self.journal_client_config
+ if config["cls"] == "kafka":
+ self._datastore = Datastore(
+ package="journal",
+ cls="kafka",
+ instance=(
+ f"brokers={config['brokers']!r} prefix={config['prefix']!r}"
+ ),
+ )
+ else:
+ raise NotImplementedError(
+ f"StorageChecker(journal_client={self.journal_client_config!r})"
+ f".datastore()"
+ )
+ return self._datastore
+
+ def check_journal(self):
+ self.journal_client.process(self.process_kafka_messages)
+
+ def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]):
+ for (object_type, messages) in all_messages.items():
+ cls = getattr(model, object_type.capitalize())
+ for message in messages:
+ object_ = cls.from_dict(kafka_to_value(message))
+ real_id = object_.compute_hash()
+ if object_.id != real_id:
+ self.db.corrupt_object_add(self.datastore_info(), object_, message)
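For reviewers, a minimal usage sketch of the new class (not part of the diff): it assumes a scrubber DB obtained through swh.scrubber.get_scrubber_db() and a kafka journal_client configuration like the one documented in cli.py below; the DSN, broker, group id and prefix values are placeholders.

    from swh.scrubber import get_scrubber_db
    from swh.scrubber.check_journal import JournalChecker

    # Placeholder connection settings; real values come from the YAML config
    # shown in the cli.py docstring below.
    scrubber_db = get_scrubber_db(cls="local", db="service=swh-scrubber")
    checker = JournalChecker(
        db=scrubber_db,
        journal_client={
            "cls": "kafka",
            "brokers": ["broker1.journal.softwareheritage.org:9093"],
            "group_id": "swh.scrubber.journal-checker",
            "prefix": "swh.journal.objects",
            "stop_on_eof": True,
        },
    )
    # Consumes the configured topics; any object whose recomputed hash does
    # not match its id is recorded via db.corrupt_object_add().
    checker.check_journal()
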
diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
index fa58ced..f218d26 100644
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -1,93 +1,144 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
+from typing import Optional
import click
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
@swh_cli_group.group(name="scrubber", context_settings=CONTEXT_SETTINGS)
@click.option(
"--config-file",
"-C",
default=None,
type=click.Path(exists=True, dir_okay=False,),
help="Configuration file.",
)
@click.pass_context
-def scrubber_cli_group(ctx, config_file):
+def scrubber_cli_group(ctx, config_file: Optional[str]) -> None:
"""main command group of the datastore scrubber
+
+ Expected config format::
+
+ scrubber_db:
+ cls: local
+ db: "service=..." # libpq DSN
+
+ # for storage checkers only:
+ storage:
+ cls: postgresql # cannot be remote, as it needs direct access to the pg DB
+ db: "service=..." # libpq DSN
+ objstorage:
+ cls: memory
+
+ # for journal checkers only:
+ journal_client:
+ # see https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html
+ # for the full list of options
+ sasl.mechanism: SCRAM-SHA-512
+ security.protocol: SASL_SSL
+ sasl.username: ...
+ sasl.password: ...
+ group_id: ...
+ privileged: True
+ message.max.bytes: 524288000
+ brokers:
+ - "broker1.journal.softwareheritage.org:9093"
+ - "broker2.journal.softwareheritage.org:9093"
+ - "broker3.journal.softwareheritage.org:9093"
+ - "broker4.journal.softwareheritage.org:9093"
+ - "broker5.journal.softwareheritage.org:9093"
+ object_types: [directory, revision, snapshot, release]
+ auto_offset_reset: earliest
"""
from swh.core import config
from . import get_scrubber_db
if not config_file:
config_file = os.environ.get("SWH_CONFIG_FILENAME")
if config_file:
if not os.path.exists(config_file):
raise ValueError("%s does not exist" % config_file)
conf = config.read(config_file)
else:
conf = {}
if "scrubber_db" not in conf:
ctx.fail("You must have a scrubber_db configured in your config file.")
ctx.ensure_object(dict)
ctx.obj["config"] = conf
ctx.obj["db"] = get_scrubber_db(**conf["scrubber_db"])
@scrubber_cli_group.group(name="check")
@click.pass_context
def scrubber_check_cli_group(ctx):
"""group of commands which read from data stores and report errors.
"""
pass
@scrubber_check_cli_group.command(name="storage")
@click.option(
"--object-type",
type=click.Choice(
# use a hardcoded list to prevent having to load the
# replay module at cli loading time
[
"snapshot",
"revision",
"release",
"directory",
# TODO:
# "raw_extrinsic_metadata",
# "extid",
]
),
)
@click.option("--start-object", default="0" * 40)
@click.option("--end-object", default="f" * 40)
@click.pass_context
def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str):
+ """Reads a postgresql storage, and reports corrupt objects to the scrubber DB."""
conf = ctx.obj["config"]
if "storage" not in conf:
ctx.fail("You must have a storage configured in your config file.")
from swh.storage import get_storage
from .check_storage import StorageChecker
checker = StorageChecker(
db=ctx.obj["db"],
storage=get_storage(**conf["storage"]),
object_type=object_type,
start_object=start_object,
end_object=end_object,
)
checker.check_storage()
+
+
+@scrubber_check_cli_group.command(name="journal")
+@click.pass_context
+def scrubber_check_journal(ctx) -> None:
+ """Reads a complete kafka journal, and reports corrupt objects to
+ the scrubber DB."""
+ conf = ctx.obj["config"]
+ if "journal_client" not in conf:
+ ctx.fail("You must have a journal_client configured in your config file.")
+
+ from .check_journal import JournalChecker
+
+ checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],)
+
+ checker.check_journal()
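With a configuration file containing the scrubber_db and journal_client sections documented in the group docstring above, the new subcommand can then be launched either with the -C option or through the SWH_CONFIG_FILENAME environment variable; for example (the file path is a placeholder):

    swh scrubber check journal -C /etc/softwareheritage/scrubber.yml

    # or equivalently
    SWH_CONFIG_FILENAME=/etc/softwareheritage/scrubber.yml swh scrubber check journal
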
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
index 50889a5..29f10ec 100644
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -1,64 +1,113 @@
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import copy
import tempfile
from unittest.mock import MagicMock
from click.testing import CliRunner
import yaml
from swh.scrubber.check_storage import storage_db
from swh.scrubber.cli import scrubber_cli_group
-CLI_CONFIG = {
- "storage": {
- "cls": "postgresql",
- "db": "<replaced at runtime>",
- "objstorage": {"cls": "memory"},
- },
- "scrubber_db": {"cls": "local", "db": "<replaced at runtime>"},
-}
-
-def invoke(swh_storage, scrubber_db, args):
+def invoke(
+ scrubber_db,
+ args,
+ storage=None,
+ kafka_server=None,
+ kafka_prefix=None,
+ kafka_consumer_group=None,
+):
runner = CliRunner()
- config = copy.deepcopy(CLI_CONFIG)
- with storage_db(swh_storage) as db:
- config["storage"]["db"] = db.conn.dsn
+ config = {
+ "scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn},
+ }
+ if storage:
+ with storage_db(storage) as db:
+ config["storage"] = {
+ "cls": "postgresql",
+ "db": db.conn.dsn,
+ "objstorage": {"cls": "memory"},
+ }
- config["scrubber_db"]["db"] = scrubber_db.conn.dsn
+ assert (
+ (kafka_server is None)
+ == (kafka_prefix is None)
+ == (kafka_consumer_group is None)
+ )
+ if kafka_server:
+ config["journal_client"] = dict(
+ cls="kafka",
+ brokers=kafka_server,
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ stop_on_eof=True,
+ )
with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
yaml.dump(config, config_fd)
config_fd.seek(0)
args = ["-C" + config_fd.name] + list(args)
result = runner.invoke(scrubber_cli_group, args, catch_exceptions=False)
return result
-def test_check_storage(swh_storage, mocker, scrubber_db):
+def test_check_storage(mocker, scrubber_db, swh_storage):
storage_checker = MagicMock()
StorageChecker = mocker.patch(
"swh.scrubber.check_storage.StorageChecker", return_value=storage_checker
)
get_scrubber_db = mocker.patch(
"swh.scrubber.get_scrubber_db", return_value=scrubber_db
)
result = invoke(
- swh_storage, scrubber_db, ["check", "storage", "--object-type=snapshot"]
+ scrubber_db, ["check", "storage", "--object-type=snapshot"], storage=swh_storage
)
assert result.exit_code == 0, result.output
assert result.output == ""
get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
StorageChecker.assert_called_once_with(
db=scrubber_db,
storage=StorageChecker.mock_calls[0][2]["storage"],
object_type="snapshot",
start_object="0" * 40,
end_object="f" * 40,
)
+
+
+def test_check_journal(
+ mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
+):
+ journal_checker = MagicMock()
+ JournalChecker = mocker.patch(
+ "swh.scrubber.check_journal.JournalChecker", return_value=journal_checker
+ )
+ get_scrubber_db = mocker.patch(
+ "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+ )
+ result = invoke(
+ scrubber_db,
+ ["check", "journal"],
+ kafka_server=kafka_server,
+ kafka_prefix=kafka_prefix,
+ kafka_consumer_group=kafka_consumer_group,
+ )
+ assert result.exit_code == 0, result.output
+ assert result.output == ""
+
+ get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+ JournalChecker.assert_called_once_with(
+ db=scrubber_db,
+ journal_client={
+ "brokers": kafka_server,
+ "cls": "kafka",
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ "stop_on_eof": True,
+ },
+ )
diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py
new file mode 100644
index 0000000..5f455f2
--- /dev/null
+++ b/swh/scrubber/tests/test_journal_kafka.py
@@ -0,0 +1,120 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+
+import attr
+import pytest
+
+from swh.journal.serializers import kafka_to_value
+from swh.journal.writer import get_journal_writer
+from swh.model import swhids
+from swh.model.tests import swh_model_data
+from swh.scrubber.check_journal import JournalChecker
+
+
+def journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group):
+ return dict(
+ cls="kafka",
+ brokers=kafka_server,
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ stop_on_eof=True,
+ )
+
+
+def journal_writer(kafka_server, kafka_prefix):
+ return get_journal_writer(
+ cls="kafka",
+ brokers=[kafka_server],
+ client_id="kafka_writer",
+ prefix=kafka_prefix,
+ anonymize=False,
+ )
+
+
+def test_no_corruption(scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group):
+ writer = journal_writer(kafka_server, kafka_prefix)
+ writer.write_additions("directory", swh_model_data.DIRECTORIES)
+ writer.write_additions("revision", swh_model_data.REVISIONS)
+ writer.write_additions("release", swh_model_data.RELEASES)
+ writer.write_additions("snapshot", swh_model_data.SNAPSHOTS)
+
+ JournalChecker(
+ db=scrubber_db,
+ journal_client=journal_client_config(
+ kafka_server, kafka_prefix, kafka_consumer_group
+ ),
+ ).check_journal()
+
+ assert list(scrubber_db.corrupt_object_iter()) == []
+
+
+@pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS)))
+def test_corrupt_snapshot(
+ scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, corrupt_idx
+):
+ snapshots = list(swh_model_data.SNAPSHOTS)
+ snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20)
+
+ writer = journal_writer(kafka_server, kafka_prefix)
+ writer.write_additions("snapshot", snapshots)
+
+ before_date = datetime.datetime.now(tz=datetime.timezone.utc)
+ JournalChecker(
+ db=scrubber_db,
+ journal_client=journal_client_config(
+ kafka_server, kafka_prefix, kafka_consumer_group
+ ),
+ ).check_journal()
+ after_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+ corrupt_objects = list(scrubber_db.corrupt_object_iter())
+ assert len(corrupt_objects) == 1
+ assert corrupt_objects[0].id == swhids.CoreSWHID.from_string(
+ "swh:1:snp:0000000000000000000000000000000000000000"
+ )
+ assert corrupt_objects[0].datastore.package == "journal"
+ assert corrupt_objects[0].datastore.cls == "kafka"
+ assert (
+ corrupt_objects[0].datastore.instance
+ == f"brokers='{kafka_server}' prefix='{kafka_prefix}'"
+ )
+ assert (
+ before_date - datetime.timedelta(seconds=5)
+ <= corrupt_objects[0].first_occurrence
+ <= after_date + datetime.timedelta(seconds=5)
+ )
+ assert (
+ kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict()
+ )
+
+
+def test_corrupt_snapshots(
+ scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
+):
+ snapshots = list(swh_model_data.SNAPSHOTS)
+ for i in (0, 1):
+ snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20)
+
+ writer = journal_writer(kafka_server, kafka_prefix)
+ writer.write_additions("snapshot", snapshots)
+
+ JournalChecker(
+ db=scrubber_db,
+ journal_client=journal_client_config(
+ kafka_server, kafka_prefix, kafka_consumer_group
+ ),
+ ).check_journal()
+
+ corrupt_objects = list(scrubber_db.corrupt_object_iter())
+ assert len(corrupt_objects) == 2
+ assert {co.id for co in corrupt_objects} == {
+ swhids.CoreSWHID.from_string(swhid)
+ for swhid in [
+ "swh:1:snp:0000000000000000000000000000000000000000",
+ "swh:1:snp:0101010101010101010101010101010101010101",
+ ]
+ }
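As the assertions above show, a check run only writes its findings to the scrubber database. A short sketch of inspecting them afterwards (scrubber_db stands for the same ScrubberDb handle used in the tests; corrupt_object_iter() and the fields below are the ones the tests assert on):

    # Sketch: list everything a check run has recorded so far.
    for corrupt in scrubber_db.corrupt_object_iter():
        print(
            corrupt.id,                  # a swhids.CoreSWHID
            corrupt.datastore.package,   # e.g. "journal"
            corrupt.datastore.instance,  # e.g. "brokers=... prefix=..."
            corrupt.first_occurrence,    # timezone-aware datetime
        )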