diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
index ca7a428..b59ee9e 100644
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -1,197 +1,202 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 from typing import Optional

 import click

 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group


 @swh_cli_group.group(name="scrubber", context_settings=CONTEXT_SETTINGS)
 @click.option(
     "--config-file",
     "-C",
     default=None,
-    type=click.Path(exists=True, dir_okay=False,),
+    type=click.Path(
+        exists=True,
+        dir_okay=False,
+    ),
     help="Configuration file.",
 )
 @click.pass_context
 def scrubber_cli_group(ctx, config_file: Optional[str]) -> None:
     """main command group of the datastore scrubber

     Expected config format::

         scrubber_db:
             cls: local
             db: "service=..."    # libpq DSN

         # for storage checkers + origin locator only:
         storage:
             cls: postgresql     # cannot be remote for checkers, as they need direct
                                 # access to the pg DB
             db: "service=..."   # libpq DSN
             objstorage:
                 cls: memory

         # for journal checkers only:
         journal_client:
             # see https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html
             # for the full list of options
             sasl.mechanism: SCRAM-SHA-512
             security.protocol: SASL_SSL
             sasl.username: ...
             sasl.password: ...
             group_id: ...
             privileged: True
             message.max.bytes: 524288000
             brokers:
             - "broker1.journal.softwareheritage.org:9093"
             - "broker2.journal.softwareheritage.org:9093"
             - "broker3.journal.softwareheritage.org:9093"
             - "broker4.journal.softwareheritage.org:9093"
             - "broker5.journal.softwareheritage.org:9093"
             object_types: [directory, revision, snapshot, release]
             auto_offset_reset: earliest
     """
     from swh.core import config

     from . import get_scrubber_db

     if not config_file:
         config_file = os.environ.get("SWH_CONFIG_FILENAME")

     if config_file:
         if not os.path.exists(config_file):
             raise ValueError("%s does not exist" % config_file)
         conf = config.read(config_file)
     else:
         conf = {}

     if "scrubber_db" not in conf:
         ctx.fail("You must have a scrubber_db configured in your config file.")

     ctx.ensure_object(dict)
     ctx.obj["config"] = conf
     ctx.obj["db"] = get_scrubber_db(**conf["scrubber_db"])


 @scrubber_cli_group.group(name="check")
 @click.pass_context
 def scrubber_check_cli_group(ctx):
-    """group of commands which read from data stores and report errors.
-    """
+    """group of commands which read from data stores and report errors."""
     pass


 @scrubber_check_cli_group.command(name="storage")
 @click.option(
     "--object-type",
     type=click.Choice(
         # use a hardcoded list to prevent having to load the
         # replay module at cli loading time
         [
             "snapshot",
             "revision",
             "release",
             "directory",
             # TODO:
             # "raw_extrinsic_metadata",
             # "extid",
         ]
     ),
 )
 @click.option("--start-object", default="00" * 20)
 @click.option("--end-object", default="ff" * 20)
 @click.pass_context
 def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str):
     """Reads a postgresql storage, and reports corrupt objects to the scrubber DB."""
     conf = ctx.obj["config"]
     if "storage" not in conf:
         ctx.fail("You must have a storage configured in your config file.")

     from swh.storage import get_storage

     from .storage_checker import StorageChecker

     checker = StorageChecker(
         db=ctx.obj["db"],
         storage=get_storage(**conf["storage"]),
         object_type=object_type,
         start_object=start_object,
         end_object=end_object,
     )

     checker.run()


 @scrubber_check_cli_group.command(name="journal")
 @click.pass_context
 def scrubber_check_journal(ctx) -> None:
     """Reads a complete kafka journal, and reports corrupt objects to the scrubber DB."""
     conf = ctx.obj["config"]
     if "journal_client" not in conf:
         ctx.fail("You must have a journal_client configured in your config file.")

     from .journal_checker import JournalChecker

-    checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],)
+    checker = JournalChecker(
+        db=ctx.obj["db"],
+        journal_client=conf["journal_client"],
+    )

     checker.run()


 @scrubber_cli_group.command(name="locate")
 @click.option("--start-object", default="swh:1:cnt:" + "00" * 20)
 @click.option("--end-object", default="swh:1:snp:" + "ff" * 20)
 @click.pass_context
 def scrubber_locate_origins(ctx, start_object: str, end_object: str):
     """For each known corrupt object reported in the scrubber DB, looks up origins
     that may contain this object, and records them; so they can be used later
     for recovery."""
     conf = ctx.obj["config"]
     if "storage" not in conf:
         ctx.fail("You must have a storage configured in your config file.")
     if "graph" not in conf:
         ctx.fail("You must have a graph configured in your config file.")

     from swh.graph.client import RemoteGraphClient
     from swh.model.model import CoreSWHID
     from swh.storage import get_storage

     from .origin_locator import OriginLocator

     locator = OriginLocator(
         db=ctx.obj["db"],
         storage=get_storage(**conf["storage"]),
         graph=RemoteGraphClient(**conf["graph"]),
         start_object=CoreSWHID.from_string(start_object),
         end_object=CoreSWHID.from_string(end_object),
     )

     locator.run()


 @scrubber_cli_group.command(name="fix")
 @click.option("--start-object", default="swh:1:cnt:" + "00" * 20)
 @click.option("--end-object", default="swh:1:snp:" + "ff" * 20)
 @click.pass_context
 def scrubber_fix_objects(ctx, start_object: str, end_object: str):
     """For each known corrupt object reported in the scrubber DB, tries to
     recover a fixed version of the object, and records it to the 'fixed_object'
     table; so it can be used later for recovery."""
     from swh.model.model import CoreSWHID

     from .fixer import Fixer

     fixer = Fixer(
         db=ctx.obj["db"],
         start_object=CoreSWHID.from_string(start_object),
         end_object=CoreSWHID.from_string(end_object),
     )

     fixer.run()
diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
index 445ef22..1b9ddd4 100644
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -1,296 +1,306 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import dataclasses
 import datetime
 import functools
 from typing import Iterator, List, Optional

 import psycopg2
 import psycopg2.extras

 from swh.core.db import BaseDb
 from swh.model.swhids import CoreSWHID


 @dataclasses.dataclass(frozen=True)
 class Datastore:
     """Represents a datastore being scrubbed; eg. swh-storage or swh-journal."""

     package: str
     """'storage', 'journal', or 'objstorage'."""

     cls: str
     """'postgresql'/'cassandra' for storage, 'kafka' for journal,
     'pathslicer'/'winery'/... for objstorage."""

     instance: str
     """Human readable string."""


 @dataclasses.dataclass(frozen=True)
 class CorruptObject:
     id: CoreSWHID
     datastore: Datastore
     first_occurrence: datetime.datetime
     object_: bytes


 @dataclasses.dataclass(frozen=True)
 class FixedObject:
     id: CoreSWHID
     object_: bytes
     method: str
     recovery_date: Optional[datetime.datetime] = None


 class ScrubberDb(BaseDb):
     current_version = 1

     @functools.lru_cache(1000)
     def datastore_get_or_add(self, datastore: Datastore) -> int:
         """Creates a datastore if it does not exist, and returns its id."""
         cur = self.cursor()
         cur.execute(
             """
             INSERT INTO datastore (package, class, instance)
             VALUES (%s, %s, %s)
             ON CONFLICT DO NOTHING
             RETURNING id
             """,
             (datastore.package, datastore.cls, datastore.instance),
         )
         (id_,) = cur.fetchone()
         return id_

     def corrupt_object_add(
-        self, id: CoreSWHID, datastore: Datastore, serialized_object: bytes,
+        self,
+        id: CoreSWHID,
+        datastore: Datastore,
+        serialized_object: bytes,
     ) -> None:
         datastore_id = self.datastore_get_or_add(datastore)
         cur = self.cursor()
         cur.execute(
             """
             INSERT INTO corrupt_object (id, datastore, object)
             VALUES (%s, %s, %s)
             ON CONFLICT DO NOTHING
             """,
             (str(id), datastore_id, serialized_object),
         )

     def corrupt_object_iter(self) -> Iterator[CorruptObject]:
         """Yields all records in the 'corrupt_object' table."""
         cur = self.cursor()
         cur.execute(
             """
             SELECT
                 co.id, co.first_occurrence, co.object,
                 ds.package, ds.class, ds.instance
             FROM corrupt_object AS co
             INNER JOIN datastore AS ds ON (ds.id=co.datastore)
             """
         )

         for row in cur:
             (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
             yield CorruptObject(
                 id=CoreSWHID.from_string(id),
                 first_occurrence=first_occurrence,
                 object_=object_,
                 datastore=Datastore(
                     package=ds_package, cls=ds_class, instance=ds_instance
                 ),
             )

     def _corrupt_object_list_from_cursor(
         self, cur: psycopg2.extensions.cursor
     ) -> List[CorruptObject]:
         results = []
         for row in cur:
             (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
             results.append(
                 CorruptObject(
                     id=CoreSWHID.from_string(id),
                     first_occurrence=first_occurrence,
                     object_=object_,
                     datastore=Datastore(
                         package=ds_package, cls=ds_class, instance=ds_instance
                     ),
                 )
             )
         return results

     def corrupt_object_get(
-        self, start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100,
+        self,
+        start_id: CoreSWHID,
+        end_id: CoreSWHID,
+        limit: int = 100,
     ) -> List[CorruptObject]:
         """Returns a page of records in the 'corrupt_object' table, ordered by id.

         Arguments:
             start_id: Only return objects after this id
             end_id: Only return objects before this id
         """
         cur = self.cursor()
         cur.execute(
             """
             SELECT
                 co.id, co.first_occurrence, co.object,
                 ds.package, ds.class, ds.instance
             FROM corrupt_object AS co
             INNER JOIN datastore AS ds ON (ds.id=co.datastore)
             WHERE
                 co.id >= %s AND co.id <= %s
             ORDER BY co.id
             LIMIT %s
             """,
             (str(start_id), str(end_id), limit),
         )
         return self._corrupt_object_list_from_cursor(cur)

     def corrupt_object_grab_by_id(
         self,
         cur: psycopg2.extensions.cursor,
         start_id: CoreSWHID,
         end_id: CoreSWHID,
         limit: int = 100,
     ) -> List[CorruptObject]:
         """Returns a page of records in the 'corrupt_object' table for a fixer,
         ordered by id

         These records are not already fixed (ie. do not have a corresponding entry
         in the 'fixed_object' table), and they are selected with an exclusive update
         lock.

         Arguments:
             start_id: Only return objects after this id
             end_id: Only return objects before this id
         """
         cur.execute(
             """
             SELECT
                 co.id, co.first_occurrence, co.object,
                 ds.package, ds.class, ds.instance
             FROM corrupt_object AS co
             INNER JOIN datastore AS ds ON (ds.id=co.datastore)
             WHERE
                 co.id >= %(start_id)s
                 AND co.id <= %(end_id)s
                 AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
             ORDER BY co.id
             LIMIT %(limit)s
             FOR UPDATE SKIP LOCKED
             """,
-            dict(start_id=str(start_id), end_id=str(end_id), limit=limit,),
+            dict(
+                start_id=str(start_id),
+                end_id=str(end_id),
+                limit=limit,
+            ),
         )
         return self._corrupt_object_list_from_cursor(cur)

     def corrupt_object_grab_by_origin(
         self,
         cur: psycopg2.extensions.cursor,
         origin_url: str,
         start_id: Optional[CoreSWHID] = None,
         end_id: Optional[CoreSWHID] = None,
         limit: int = 100,
     ) -> List[CorruptObject]:
         """Returns a page of records in the 'corrupt_object' table for a fixer,
         ordered by id

         These records are not already fixed (ie. do not have a corresponding entry
         in the 'fixed_object' table), and they are selected with an exclusive update
         lock.

         Arguments:
             origin_url: only returns objects that may be found in the given origin
         """
         cur.execute(
             """
             SELECT
                 co.id, co.first_occurrence, co.object,
                 ds.package, ds.class, ds.instance
             FROM corrupt_object AS co
             INNER JOIN datastore AS ds ON (ds.id=co.datastore)
             INNER JOIN object_origin AS oo ON (oo.object_id=co.id)
             WHERE
                 (co.id >= %(start_id)s OR %(start_id)s IS NULL)
                 AND (co.id <= %(end_id)s OR %(end_id)s IS NULL)
                 AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
                 AND oo.origin_url=%(origin_url)s
             ORDER BY co.id
             LIMIT %(limit)s
             FOR UPDATE SKIP LOCKED
             """,
             dict(
                 start_id=None if start_id is None else str(start_id),
                 end_id=None if end_id is None else str(end_id),
                 origin_url=origin_url,
                 limit=limit,
             ),
         )
         return self._corrupt_object_list_from_cursor(cur)

     def object_origin_add(
         self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str]
     ) -> None:
         psycopg2.extras.execute_values(
             cur,
             """
             INSERT INTO object_origin (object_id, origin_url)
             VALUES %s
             ON CONFLICT DO NOTHING
             """,
             [(str(swhid), origin_url) for origin_url in origins],
         )

     def object_origin_get(self, after: str = "", limit: int = 1000) -> List[str]:
         """Returns origins with non-fixed corrupt objects, ordered by URL.

         Arguments:
             after: if given, only returns origins with a URL after this value
         """
         cur = self.cursor()
         cur.execute(
             """
             SELECT DISTINCT origin_url
             FROM object_origin
             WHERE
                 origin_url > %(after)s
                 AND object_id IN (
                     (SELECT id FROM corrupt_object)
                     EXCEPT
                     (SELECT id FROM fixed_object)
                 )
             ORDER BY origin_url
             LIMIT %(limit)s
             """,
             dict(after=after, limit=limit),
         )

         return [origin_url for (origin_url,) in cur]

     def fixed_object_add(
         self, cur: psycopg2.extensions.cursor, fixed_objects: List[FixedObject]
     ) -> None:
         psycopg2.extras.execute_values(
             cur,
             """
             INSERT INTO fixed_object (id, object, method)
             VALUES %s
             ON CONFLICT DO NOTHING
             """,
             [
                 (str(fixed_object.id), fixed_object.object_, fixed_object.method)
                 for fixed_object in fixed_objects
             ],
         )

     def fixed_object_iter(self) -> Iterator[FixedObject]:
         cur = self.cursor()
         cur.execute("SELECT id, object, method, recovery_date FROM fixed_object")
         for (id, object_, method, recovery_date) in cur:
             yield FixedObject(
                 id=CoreSWHID.from_string(id),
                 object_=object_,
                 method=method,
                 recovery_date=recovery_date,
             )
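Both `corrupt_object_grab_by_*` methods take the caller's cursor rather than
opening their own because the `FOR UPDATE SKIP LOCKED` row locks they acquire
only live until the enclosing transaction ends: grabbing a page and recording
the matching `fixed_object` rows must happen in one transaction. A sketch of
that pattern, with a hypothetical `recover()` standing in for the actual
recovery logic:

    from typing import Optional

    from swh.model.swhids import CoreSWHID
    from swh.scrubber.db import CorruptObject, FixedObject, ScrubberDb

    def recover(corrupt: CorruptObject) -> Optional[bytes]:
        ...  # hypothetical: rebuild a correct serialization, e.g. from a mirror

    def fix_page(db: ScrubberDb, start: CoreSWHID, end: CoreSWHID) -> None:
        with db.conn:  # single transaction; row locks are held until commit
            cur = db.cursor()
            for corrupt in db.corrupt_object_grab_by_id(cur, start, end, limit=100):
                fixed = recover(corrupt)
                if fixed is not None:
                    db.fixed_object_add(
                        cur,
                        [FixedObject(id=corrupt.id, object_=fixed, method="sketch")],
                    )

One caveat in `datastore_get_or_add`: with `ON CONFLICT DO NOTHING`,
PostgreSQL's `RETURNING` emits no row when the insert is skipped, so
`cur.fetchone()` returns None and the tuple unpacking fails for a datastore
row that already exists from an earlier run (the `lru_cache` only deduplicates
within one process). Assuming a unique constraint on (package, class,
instance), as the `ON CONFLICT` suggests, a follow-up SELECT of the existing
row would be needed to make the method truly "get or add".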
""" if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: return self._check_postgresql(db) else: raise NotImplementedError( f"StorageChecker(storage={self.storage!r}).check_storage()" ) def _check_postgresql(self, db): for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( self.start_object, self.end_object ): logger.info( "Processing %s range %s to %s", self.object_type, backfill._format_range_bound(range_start), backfill._format_range_bound(range_end), ) objects = backfill.fetch( db, self.object_type, start=range_start, end=range_end ) objects = list(objects) self.process_objects(objects) def process_objects(self, objects: Iterable[ScrubbableObject]): for object_ in objects: real_id = object_.compute_hash() if object_.id != real_id: self.db.corrupt_object_add( object_.swhid(), self.datastore_info(), value_to_kafka(object_.to_dict()), )