diff --git a/mypy.ini b/mypy.ini index 9170785..08cf8c8 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,18 +1,22 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) + +[mypy-dulwich.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-psycopg2.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True # [mypy-add_your_lib_here.*] # ignore_missing_imports = True diff --git a/requirements-swh.txt b/requirements-swh.txt index ae5fa2a..cdce91c 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,5 +1,6 @@ # Add here internal Software Heritage dependencies, one per line. swh.core[http] >= 0.3 # [http] is required by swh.core.pytest_plugin +swh.loader.git >= 1.4.0 swh.model >= 5.0.0 swh.storage >= 1.1.0 swh.journal >= 0.9.0 diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py index bbd5079..ca7a428 100644 --- a/swh/scrubber/cli.py +++ b/swh/scrubber/cli.py @@ -1,174 +1,197 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Optional import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group @swh_cli_group.group(name="scrubber", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def scrubber_cli_group(ctx, config_file: Optional[str]) -> None: """main command group of the datastore scrubber Expected config format:: scrubber_db: cls: local db: "service=..." # libpq DSN # for storage checkers + origin locator only: storage: cls: postgresql # cannot be remote for checkers, as they need direct # access to the pg DB db": "service=..." # libpq DSN objstorage: cls: memory # for journal checkers only: journal_client: # see https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html # for the full list of options sasl.mechanism: SCRAM-SHA-512 security.protocol: SASL_SSL sasl.username: ... sasl.password: ... group_id: ... privileged: True message.max.bytes: 524288000 brokers: - "broker1.journal.softwareheritage.org:9093 - "broker2.journal.softwareheritage.org:9093 - "broker3.journal.softwareheritage.org:9093 - "broker4.journal.softwareheritage.org:9093 - "broker5.journal.softwareheritage.org:9093 object_types: [directory, revision, snapshot, release] auto_offset_reset: earliest """ from swh.core import config from . import get_scrubber_db if not config_file: config_file = os.environ.get("SWH_CONFIG_FILENAME") if config_file: if not os.path.exists(config_file): raise ValueError("%s does not exist" % config_file) conf = config.read(config_file) else: conf = {} if "scrubber_db" not in conf: ctx.fail("You must have a scrubber_db configured in your config file.") ctx.ensure_object(dict) ctx.obj["config"] = conf ctx.obj["db"] = get_scrubber_db(**conf["scrubber_db"]) @scrubber_cli_group.group(name="check") @click.pass_context def scrubber_check_cli_group(ctx): """group of commands which read from data stores and report errors. 
""" pass @scrubber_check_cli_group.command(name="storage") @click.option( "--object-type", type=click.Choice( # use a hardcoded list to prevent having to load the # replay module at cli loading time [ "snapshot", "revision", "release", "directory", # TODO: # "raw_extrinsic_metadata", # "extid", ] ), ) @click.option("--start-object", default="00" * 20) @click.option("--end-object", default="ff" * 20) @click.pass_context def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str): """Reads a postgresql storage, and reports corrupt objects to the scrubber DB.""" conf = ctx.obj["config"] if "storage" not in conf: ctx.fail("You must have a storage configured in your config file.") from swh.storage import get_storage from .storage_checker import StorageChecker checker = StorageChecker( db=ctx.obj["db"], storage=get_storage(**conf["storage"]), object_type=object_type, start_object=start_object, end_object=end_object, ) checker.run() @scrubber_check_cli_group.command(name="journal") @click.pass_context def scrubber_check_journal(ctx) -> None: """Reads a complete kafka journal, and reports corrupt objects to the scrubber DB.""" conf = ctx.obj["config"] if "journal_client" not in conf: ctx.fail("You must have a journal_client configured in your config file.") from .journal_checker import JournalChecker checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],) checker.run() @scrubber_cli_group.command(name="locate") @click.option("--start-object", default="swh:1:cnt:" + "00" * 20) @click.option("--end-object", default="swh:1:snp:" + "ff" * 20) @click.pass_context def scrubber_locate_origins(ctx, start_object: str, end_object: str): """For each known corrupt object reported in the scrubber DB, looks up origins that may contain this object, and records them; so they can be used later for recovery.""" conf = ctx.obj["config"] if "storage" not in conf: ctx.fail("You must have a storage configured in your config file.") + if "graph" not in conf: + ctx.fail("You must have a graph configured in your config file.") from swh.graph.client import RemoteGraphClient from swh.model.model import CoreSWHID from swh.storage import get_storage from .origin_locator import OriginLocator locator = OriginLocator( db=ctx.obj["db"], storage=get_storage(**conf["storage"]), graph=RemoteGraphClient(**conf["graph"]), start_object=CoreSWHID.from_string(start_object), end_object=CoreSWHID.from_string(end_object), ) locator.run() + + +@scrubber_cli_group.command(name="fix") +@click.option("--start-object", default="swh:1:cnt:" + "00" * 20) +@click.option("--end-object", default="swh:1:snp:" + "ff" * 20) +@click.pass_context +def scrubber_fix_objects(ctx, start_object: str, end_object: str): + """For each known corrupt object reported in the scrubber DB, looks up origins + that may contain this object, and records them; so they can be used later + for recovery.""" + from swh.model.model import CoreSWHID + + from .fixer import Fixer + + fixer = Fixer( + db=ctx.obj["db"], + start_object=CoreSWHID.from_string(start_object), + end_object=CoreSWHID.from_string(end_object), + ) + + fixer.run() diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py index 2efa953..445ef22 100644 --- a/swh/scrubber/db.py +++ b/swh/scrubber/db.py @@ -1,145 +1,296 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information 
import dataclasses import datetime import functools -from typing import Iterator, List +from typing import Iterator, List, Optional import psycopg2 from swh.core.db import BaseDb from swh.model.swhids import CoreSWHID @dataclasses.dataclass(frozen=True) class Datastore: """Represents a datastore being scrubbed; eg. swh-storage or swh-journal.""" package: str """'storage', 'journal', or 'objstorage'.""" cls: str """'postgresql'/'cassandra' for storage, 'kafka' for journal, 'pathslicer'/'winery'/... for objstorage.""" instance: str """Human readable string.""" @dataclasses.dataclass(frozen=True) class CorruptObject: id: CoreSWHID datastore: Datastore first_occurrence: datetime.datetime object_: bytes +@dataclasses.dataclass(frozen=True) +class FixedObject: + id: CoreSWHID + object_: bytes + method: str + recovery_date: Optional[datetime.datetime] = None + + class ScrubberDb(BaseDb): current_version = 1 @functools.lru_cache(1000) def datastore_get_or_add(self, datastore: Datastore) -> int: """Creates a datastore if it does not exist, and returns its id.""" cur = self.cursor() cur.execute( """ INSERT INTO datastore (package, class, instance) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING RETURNING id """, (datastore.package, datastore.cls, datastore.instance), ) (id_,) = cur.fetchone() return id_ def corrupt_object_add( self, id: CoreSWHID, datastore: Datastore, serialized_object: bytes, ) -> None: datastore_id = self.datastore_get_or_add(datastore) cur = self.cursor() cur.execute( """ INSERT INTO corrupt_object (id, datastore, object) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING """, (str(id), datastore_id, serialized_object), ) def corrupt_object_iter(self) -> Iterator[CorruptObject]: """Yields all records in the 'corrupt_object' table.""" cur = self.cursor() cur.execute( """ SELECT co.id, co.first_occurrence, co.object, ds.package, ds.class, ds.instance FROM corrupt_object AS co INNER JOIN datastore AS ds ON (ds.id=co.datastore) """ ) for row in cur: (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row yield CorruptObject( id=CoreSWHID.from_string(id), first_occurrence=first_occurrence, object_=object_, datastore=Datastore( package=ds_package, cls=ds_class, instance=ds_instance ), ) - def corrupt_object_grab( - self, - cur, - start_id: CoreSWHID = None, - end_id: CoreSWHID = None, - limit: int = 100, + def _corrupt_object_list_from_cursor( + self, cur: psycopg2.extensions.cursor ) -> List[CorruptObject]: - """Yields a page of records in the 'corrupt_object' table.""" + results = [] + for row in cur: + (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row + results.append( + CorruptObject( + id=CoreSWHID.from_string(id), + first_occurrence=first_occurrence, + object_=object_, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), + ) + ) + + return results + + def corrupt_object_get( + self, start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100, + ) -> List[CorruptObject]: + """Yields a page of records in the 'corrupt_object' table, ordered by id. + + Arguments: + start_id: Only return objects after this id + end_id: Only return objects before this id + in_origin: An origin URL. 
If provided, only returns objects that may be + found in the given origin + """ + cur = self.cursor() cur.execute( """ SELECT co.id, co.first_occurrence, co.object, ds.package, ds.class, ds.instance FROM corrupt_object AS co INNER JOIN datastore AS ds ON (ds.id=co.datastore) WHERE co.id >= %s AND co.id <= %s ORDER BY co.id LIMIT %s """, (str(start_id), str(end_id), limit), ) + return self._corrupt_object_list_from_cursor(cur) - results = [] - for row in cur: - (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row - results.append( - CorruptObject( - id=CoreSWHID.from_string(id), - first_occurrence=first_occurrence, - object_=object_, - datastore=Datastore( - package=ds_package, cls=ds_class, instance=ds_instance - ), - ) - ) + def corrupt_object_grab_by_id( + self, + cur: psycopg2.extensions.cursor, + start_id: CoreSWHID, + end_id: CoreSWHID, + limit: int = 100, + ) -> List[CorruptObject]: + """Returns a page of records in the 'corrupt_object' table for a fixer, + ordered by id - return results + These records are not already fixed (ie. do not have a corresponding entry + in the 'fixed_object' table), and they are selected with an exclusive update + lock. - def object_origin_add(self, cur, swhid: CoreSWHID, origins: List[str]) -> None: + Arguments: + start_id: Only return objects after this id + end_id: Only return objects before this id + """ + cur.execute( + """ + SELECT + co.id, co.first_occurrence, co.object, + ds.package, ds.class, ds.instance + FROM corrupt_object AS co + INNER JOIN datastore AS ds ON (ds.id=co.datastore) + WHERE + co.id >= %(start_id)s + AND co.id <= %(end_id)s + AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id) + ORDER BY co.id + LIMIT %(limit)s + FOR UPDATE SKIP LOCKED + """, + dict(start_id=str(start_id), end_id=str(end_id), limit=limit,), + ) + return self._corrupt_object_list_from_cursor(cur) + + def corrupt_object_grab_by_origin( + self, + cur: psycopg2.extensions.cursor, + origin_url: str, + start_id: Optional[CoreSWHID] = None, + end_id: Optional[CoreSWHID] = None, + limit: int = 100, + ) -> List[CorruptObject]: + """Returns a page of records in the 'corrupt_object' table for a fixer, + ordered by id + + These records are not already fixed (ie. do not have a corresponding entry + in the 'fixed_object' table), and they are selected with an exclusive update + lock. 
+
+        Arguments:
+            origin_url: only returns objects that may be found in the given origin
+        """
+        cur.execute(
+            """
+            SELECT
+                co.id, co.first_occurrence, co.object,
+                ds.package, ds.class, ds.instance
+            FROM corrupt_object AS co
+            INNER JOIN datastore AS ds ON (ds.id=co.datastore)
+            INNER JOIN object_origin AS oo ON (oo.object_id=co.id)
+            WHERE
+                (co.id >= %(start_id)s OR %(start_id)s IS NULL)
+                AND (co.id <= %(end_id)s OR %(end_id)s IS NULL)
+                AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
+                AND oo.origin_url=%(origin_url)s
+            ORDER BY co.id
+            LIMIT %(limit)s
+            FOR UPDATE SKIP LOCKED
+            """,
+            dict(
+                start_id=None if start_id is None else str(start_id),
+                end_id=None if end_id is None else str(end_id),
+                origin_url=origin_url,
+                limit=limit,
+            ),
+        )
+        return self._corrupt_object_list_from_cursor(cur)
+
+    def object_origin_add(
+        self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str]
+    ) -> None:
        psycopg2.extras.execute_values(
            cur,
            """
            INSERT INTO object_origin (object_id, origin_url)
            VALUES %s
            ON CONFLICT DO NOTHING
            """,
            [(str(swhid), origin_url) for origin_url in origins],
        )
+
+    def object_origin_get(self, after: str = "", limit: int = 1000) -> List[str]:
+        """Returns origins with non-fixed corrupt objects, ordered by URL.
+
+        Arguments:
+            after: if given, only returns origins with an URL after this value
+        """
+        cur = self.cursor()
+        cur.execute(
+            """
+            SELECT DISTINCT origin_url
+            FROM object_origin
+            WHERE
+                origin_url > %(after)s
+                AND object_id IN (
+                    (SELECT id FROM corrupt_object)
+                    EXCEPT (SELECT id FROM fixed_object)
+                )
+            ORDER BY origin_url
+            LIMIT %(limit)s
+            """,
+            dict(after=after, limit=limit),
+        )
+
+        return [origin_url for (origin_url,) in cur]
+
+    def fixed_object_add(
+        self, cur: psycopg2.extensions.cursor, fixed_objects: List[FixedObject]
+    ) -> None:
+        psycopg2.extras.execute_values(
+            cur,
+            """
+            INSERT INTO fixed_object (id, object, method)
+            VALUES %s
+            ON CONFLICT DO NOTHING
+            """,
+            [
+                (str(fixed_object.id), fixed_object.object_, fixed_object.method)
+                for fixed_object in fixed_objects
+            ],
+        )
+
+    def fixed_object_iter(self) -> Iterator[FixedObject]:
+        cur = self.cursor()
+        cur.execute("SELECT id, object, method, recovery_date FROM fixed_object")
+        for (id, object_, method, recovery_date) in cur:
+            yield FixedObject(
+                id=CoreSWHID.from_string(id),
+                object_=object_,
+                method=method,
+                recovery_date=recovery_date,
+            )
diff --git a/swh/scrubber/fixer.py b/swh/scrubber/fixer.py
new file mode 100644
index 0000000..14667a9
--- /dev/null
+++ b/swh/scrubber/fixer.py
@@ -0,0 +1,205 @@
+# Copyright (C) 2021-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Reads all known corrupt objects from the swh-scrubber database,
+and tries to recover them.
+
+Currently, only recovery from Git origins is implemented"""
+
+import dataclasses
+import functools
+import logging
+import os
+from pathlib import Path
+import subprocess
+import tempfile
+from typing import Dict, Optional, Type, Union
+
+import dulwich
+import dulwich.objects
+import dulwich.repo
+import psycopg2
+
+from swh.journal.serializers import kafka_to_value, value_to_kafka
+from swh.loader.git import converters
+from swh.model.hashutil import hash_to_bytehex, hash_to_hex
+from swh.model.model import BaseModel, Directory, Release, Revision, Snapshot
+from swh.model.swhids import CoreSWHID, ObjectType
+
+from .db import CorruptObject, FixedObject, ScrubberDb
+from .utils import iter_corrupt_objects
+
+logger = logging.getLogger(__name__)
+
+ScrubbableObject = Union[Revision, Release, Snapshot, Directory]
+
+
+def get_object_from_clone(
+    clone_path: Path, swhid: CoreSWHID
+) -> Union[None, bytes, dulwich.objects.ShaFile]:
+    """Reads the original object matching the ``corrupt_object`` from the given clone
+    if it exists, and returns a Dulwich object if possible, or the raw manifest."""
+    try:
+        repo = dulwich.repo.Repo(str(clone_path))
+    except dulwich.errors.NotGitRepository:
+        return None
+
+    with repo:  # needed to avoid packfile fd leaks
+        try:
+            return repo[hash_to_bytehex(swhid.object_id)]
+        except KeyError:
+            return None
+        except dulwich.errors.ObjectFormatException:
+            # fallback to git if dulwich can't parse it.
+            # Unfortunately, Dulwich does not allow fetching an object without
+            # parsing it into a ShaFile subclass, so we have to manually get it
+            # by shelling out to git.
+            object_type = (
+                subprocess.check_output(
+                    [
+                        "git",
+                        "-C",
+                        clone_path,
+                        "cat-file",
+                        "-t",
+                        hash_to_hex(swhid.object_id),
+                    ]
+                )
+                .decode()
+                .strip()
+            )
+            manifest = subprocess.check_output(
+                [
+                    "git",
+                    "-C",
+                    clone_path,
+                    "cat-file",
+                    object_type,
+                    hash_to_hex(swhid.object_id),
+                ]
+            )
+            manifest = f"{object_type} {len(manifest)}\x00".encode() + manifest
+            logger.info("Dulwich failed to parse %r", manifest)
+            return manifest
+
+
+def get_fixed_object_from_clone(
+    clone_path: Path, corrupt_object: CorruptObject
+) -> Optional[FixedObject]:
+    """Reads the original object matching the ``corrupt_object`` from the given clone
+    if it exists, and returns a :class:`FixedObject` instance ready to be inserted
+    in the database."""
+    cloned_dulwich_obj_or_manifest = get_object_from_clone(
+        clone_path, corrupt_object.id
+    )
+    if cloned_dulwich_obj_or_manifest is None:
+        # Origin still exists, but object disappeared
+        logger.info("%s not found in origin", corrupt_object.id)
+        return None
+    elif isinstance(cloned_dulwich_obj_or_manifest, bytes):
+        # Dulwich could not parse it. Add as raw manifest to the existing object
+        d = kafka_to_value(corrupt_object.object_)
+        assert d.get("raw_manifest") is None, "Corrupt object has a raw_manifest"
+        d["raw_manifest"] = cloned_dulwich_obj_or_manifest
+
+        # Rebuild the object from the stored corrupt object + the raw manifest
+        # just recovered; then checksum it.
+ classes: Dict[ObjectType, Type[BaseModel]] = { + ObjectType.REVISION: Revision, + ObjectType.DIRECTORY: Directory, + ObjectType.RELEASE: Release, + } + cls = classes[corrupt_object.id.object_type] + recovered_obj = cls.from_dict(d) + recovered_obj.check() + + return FixedObject( + id=corrupt_object.id, + object_=value_to_kafka(d), + method="manifest_from_origin", + ) + else: + converter = { + ObjectType.REVISION: converters.dulwich_commit_to_revision, + ObjectType.DIRECTORY: converters.dulwich_tree_to_directory, + ObjectType.RELEASE: converters.dulwich_tag_to_release, + }[corrupt_object.id.object_type] + cloned_obj = converter(cloned_dulwich_obj_or_manifest) + + # Check checksum, among others + cloned_obj.check() + + return FixedObject( + id=corrupt_object.id, + object_=value_to_kafka(cloned_obj.to_dict()), + method="from_origin", + ) + + +@dataclasses.dataclass +class Fixer: + """Reads a chunk of corrupt objects in the swh-scrubber database, tries to recover + them through various means (brute-forcing fields and re-downloading from the origin) + recomputes checksums, and writes them back to the swh-scrubber database + if successful. + + """ + + db: ScrubberDb + """Database to read from and write to.""" + start_object: CoreSWHID = CoreSWHID.from_string("swh:1:cnt:" + "00" * 20) + """Minimum SWHID to check (in alphabetical order)""" + end_object: CoreSWHID = CoreSWHID.from_string("swh:1:snp:" + "ff" * 20) + """Maximum SWHID to check (in alphabetical order)""" + + def run(self): + # TODO: currently only support re-downloading from the origin: + # we should try brute-forcing for objects with no known origin (or when + # all origins fail) + after = "" + while True: + new_origins = self.db.object_origin_get(after=after) + if not new_origins: + break + for origin_url in new_origins: + self.recover_objects_from_origin(origin_url) + after = new_origins[-1] + + def recover_objects_from_origin(self, origin_url): + """Clones an origin, and cherry-picks original objects that are known to be + corrupt in the database.""" + with tempfile.TemporaryDirectory(prefix=__name__ + ".") as tempdir: + clone_path = Path(tempdir) / "repository.git" + try: + subprocess.run( + ["git", "clone", "--bare", origin_url, clone_path], + env={"PATH": os.environ["PATH"], "GIT_TERMINAL_PROMPT": "0"}, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + ) + except Exception: + logger.exception("Failed to clone %s", origin_url) + return + + iter_corrupt_objects( + self.db, + self.start_object, + self.end_object, + origin_url, + functools.partial(self.recover_corrupt_object, clone_path=clone_path), + ) + + def recover_corrupt_object( + self, + corrupt_object: CorruptObject, + cur: psycopg2.extensions.cursor, + clone_path: Path, + ) -> None: + fixed_object = get_fixed_object_from_clone(clone_path, corrupt_object) + + if fixed_object is not None: + self.db.fixed_object_add(cur, [fixed_object]) diff --git a/swh/scrubber/origin_locator.py b/swh/scrubber/origin_locator.py index aa19010..c12cdbe 100644 --- a/swh/scrubber/origin_locator.py +++ b/swh/scrubber/origin_locator.py @@ -1,87 +1,91 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Lists corrupt objects in the scrubber database, and lists candidate origins to recover them from.""" import dataclasses import itertools import 
logging from typing import Iterable, Union import psycopg2 from swh.core.utils import grouper from swh.graph.client import GraphArgumentException, RemoteGraphClient from swh.model.model import Directory, Release, Revision, Snapshot from swh.model.swhids import CoreSWHID, ExtendedSWHID from swh.storage.interface import StorageInterface from .db import CorruptObject, ScrubberDb from .utils import iter_corrupt_objects logger = logging.getLogger(__name__) ScrubbableObject = Union[Revision, Release, Snapshot, Directory] def get_origins( graph: RemoteGraphClient, storage: StorageInterface, swhid: CoreSWHID ) -> Iterable[str]: try: origin_swhids = [ ExtendedSWHID.from_string(line) for line in graph.leaves(str(swhid), direction="backward") if line.startswith("swh:1:ori:") ] except GraphArgumentException: return for origin_swhid_group in grouper(origin_swhids, 10): origin_swhid_group = list(origin_swhid_group) for (origin, origin_swhid) in zip( storage.origin_get_by_sha1( [origin_swhid.object_id for origin_swhid in origin_swhid_group] ), origin_swhid_group, ): if origin is None: logger.error("%s found in graph but missing from storage", origin_swhid) else: yield origin["url"] @dataclasses.dataclass class OriginLocator: """Reads a chunk of corrupt objects in the swh-scrubber database, then writes to the same database a list of origins they might be recovered from.""" db: ScrubberDb """Database to read from and write to.""" graph: RemoteGraphClient storage: StorageInterface """Used to resolve origin SHA1s to URLs.""" start_object: CoreSWHID """Minimum SWHID to check (in alphabetical order)""" end_object: CoreSWHID """Maximum SWHID to check (in alphabetical order)""" def run(self): iter_corrupt_objects( - self.db, self.start_object, self.end_object, self.handle_corrupt_object + self.db, + self.start_object, + self.end_object, + None, + self.handle_corrupt_object, ) def handle_corrupt_object( self, corrupt_object: CorruptObject, cur: psycopg2.extensions.cursor ) -> None: origins = get_origins(self.graph, self.storage, corrupt_object.id) # Keep only 100 origins, to avoid flooding the DB. # It is very unlikely an object disappred from 100 somwhat-randomly sampled # origins. first_origins = list(itertools.islice(origins, 0, 100)) self.db.object_origin_add(cur, corrupt_object.id, first_origins) diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql index 5cfab88..1d4f423 100644 --- a/swh/scrubber/sql/30-schema.sql +++ b/swh/scrubber/sql/30-schema.sql @@ -1,37 +1,50 @@ create domain swhid as text check (value ~ '^swh:[0-9]+:.*'); create table datastore ( id bigserial not null, package datastore_type not null, class text, instance text ); comment on table datastore is 'Each row identifies a data store being scrubbed'; comment on column datastore.id is 'Internal identifier of the datastore'; comment on column datastore.package is 'Name of the component using this datastore (storage/journal/objstorage)'; comment on column datastore.class is 'For datastores with multiple backends, name of the backend (postgresql/cassandra for storage, kafka for journal, pathslicer/azure/winery/... for objstorage)'; comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg. 
its URL or DSN.'; create table corrupt_object ( id swhid not null, datastore int not null, object bytea not null, first_occurrence timestamptz not null default now() ); comment on table corrupt_object is 'Each row identifies an object that was found to be corrupt'; comment on column corrupt_object.datastore is 'Datastore the corrupt object was found in.'; comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)'; comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time'; create table object_origin ( object_id swhid not null, origin_url text not null, last_attempt timestamptz -- NULL if not tried yet ); comment on table object_origin is 'Maps objects to origins they might be found in.'; + +create table fixed_object +( + id swhid not null, + object bytea not null, + method text, + recovery_date timestamptz not null default now() +); + +comment on table fixed_object is 'Each row identifies an object that was found to be corrupt, along with the original version of the object'; +comment on column fixed_object.object is 'The recovered object itself, as a msgpack-encoded dict'; +comment on column fixed_object.recovery_date is 'Moment the object was recovered.'; +comment on column fixed_object.method is 'How the object was recovered. For example: "from_origin", "negative_utc", "capitalized_revision_parent".'; diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql index 5acd9c8..cdea5fe 100644 --- a/swh/scrubber/sql/60-indexes.sql +++ b/swh/scrubber/sql/60-indexes.sql @@ -1,24 +1,29 @@ -- datastore create unique index concurrently datastore_pkey on datastore(id); alter table datastore add primary key using index datastore_pkey; create unique index concurrently datastore_package_class_instance on datastore(package, class, instance); -- corrupt_object alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid; alter table corrupt_object validate constraint corrupt_object_datastore_fkey; create unique index concurrently corrupt_object_pkey on corrupt_object(id, datastore); alter table corrupt_object add primary key using index corrupt_object_pkey; -- object_origin create unique index concurrently object_origin_pkey on object_origin (object_id, origin_url); create index concurrently object_origin_by_origin on object_origin (origin_url, object_id); -- FIXME: not valid, because corrupt_object(id) is not unique -- alter table object_origin add constraint object_origin_object_fkey foreign key (object_id) references corrupt_object(id) not valid; -- alter table object_origin validate constraint object_origin_object_fkey; + +-- fixed_object + +create unique index concurrently fixed_object_pkey on fixed_object(id); +alter table fixed_object add primary key using index fixed_object_pkey; diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py index 37e39d8..b54a5c8 100644 --- a/swh/scrubber/tests/test_cli.py +++ b/swh/scrubber/tests/test_cli.py @@ -1,140 +1,159 @@ # Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile from unittest.mock import MagicMock, call from click.testing import CliRunner import yaml from swh.model.swhids import CoreSWHID 
from swh.scrubber.cli import scrubber_cli_group from swh.scrubber.storage_checker import storage_db def invoke( scrubber_db, args, storage=None, kafka_server=None, kafka_prefix=None, kafka_consumer_group=None, ): runner = CliRunner() config = { "scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn}, "graph": {"url": "http://graph.example.org:5009/"}, } if storage: with storage_db(storage) as db: config["storage"] = { "cls": "postgresql", "db": db.conn.dsn, "objstorage": {"cls": "memory"}, } assert ( (kafka_server is None) == (kafka_prefix is None) == (kafka_consumer_group is None) ) if kafka_server: config["journal_client"] = dict( cls="kafka", brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, ) with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: yaml.dump(config, config_fd) config_fd.seek(0) args = ["-C" + config_fd.name] + list(args) result = runner.invoke(scrubber_cli_group, args, catch_exceptions=False) return result def test_check_storage(mocker, scrubber_db, swh_storage): storage_checker = MagicMock() StorageChecker = mocker.patch( "swh.scrubber.storage_checker.StorageChecker", return_value=storage_checker ) get_scrubber_db = mocker.patch( "swh.scrubber.get_scrubber_db", return_value=scrubber_db ) result = invoke( scrubber_db, ["check", "storage", "--object-type=snapshot"], storage=swh_storage ) assert result.exit_code == 0, result.output assert result.output == "" get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn) StorageChecker.assert_called_once_with( db=scrubber_db, storage=StorageChecker.mock_calls[0][2]["storage"], object_type="snapshot", start_object="0" * 40, end_object="f" * 40, ) assert storage_checker.method_calls == [call.run()] def test_check_journal( mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group ): journal_checker = MagicMock() JournalChecker = mocker.patch( "swh.scrubber.journal_checker.JournalChecker", return_value=journal_checker ) get_scrubber_db = mocker.patch( "swh.scrubber.get_scrubber_db", return_value=scrubber_db ) result = invoke( scrubber_db, ["check", "journal"], kafka_server=kafka_server, kafka_prefix=kafka_prefix, kafka_consumer_group=kafka_consumer_group, ) assert result.exit_code == 0, result.output assert result.output == "" get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn) JournalChecker.assert_called_once_with( db=scrubber_db, journal_client={ "brokers": kafka_server, "cls": "kafka", "group_id": kafka_consumer_group, "prefix": kafka_prefix, "stop_on_eof": True, }, ) assert journal_checker.method_calls == [call.run()] def test_locate_origins(mocker, scrubber_db, swh_storage): origin_locator = MagicMock() OriginLocator = mocker.patch( "swh.scrubber.origin_locator.OriginLocator", return_value=origin_locator ) get_scrubber_db = mocker.patch( "swh.scrubber.get_scrubber_db", return_value=scrubber_db ) result = invoke(scrubber_db, ["locate"], storage=swh_storage) assert result.exit_code == 0, result.output assert result.output == "" get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn) OriginLocator.assert_called_once_with( db=scrubber_db, storage=OriginLocator.mock_calls[0][2]["storage"], graph=OriginLocator.mock_calls[0][2]["graph"], start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), ) assert origin_locator.method_calls == [call.run()] + + +def test_fix_objects(mocker, scrubber_db): + fixer = MagicMock() + Fixer = 
mocker.patch("swh.scrubber.fixer.Fixer", return_value=fixer) + get_scrubber_db = mocker.patch( + "swh.scrubber.get_scrubber_db", return_value=scrubber_db + ) + result = invoke(scrubber_db, ["fix"]) + assert result.exit_code == 0, result.output + assert result.output == "" + + get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn) + Fixer.assert_called_once_with( + db=scrubber_db, + start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), + ) + assert fixer.method_calls == [call.run()] diff --git a/swh/scrubber/tests/test_fixer.py b/swh/scrubber/tests/test_fixer.py new file mode 100644 index 0000000..ab8ebef --- /dev/null +++ b/swh/scrubber/tests/test_fixer.py @@ -0,0 +1,331 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +import logging +from pathlib import Path +import subprocess +from unittest.mock import MagicMock +import zlib + +import attr + +from swh.journal.serializers import kafka_to_value, value_to_kafka +from swh.model.hashutil import hash_to_bytes +from swh.model.model import Directory, DirectoryEntry +from swh.model.tests.swh_model_data import DIRECTORIES +from swh.scrubber.db import CorruptObject, Datastore, FixedObject, ScrubberDb +from swh.scrubber.fixer import Fixer + +(DIRECTORY,) = [dir_ for dir_ in DIRECTORIES if len(dir_.entries) > 1] + +# ORIGINAL_DIRECTORY represents a directory with entries in non-canonical order, +# and a consistent hash. Its entries' were canonically reordered, but the original +# order is still present in the raw manifest. +_DIR = Directory(entries=tuple(reversed(DIRECTORY.entries))) +ORIGINAL_DIRECTORY = Directory( + entries=( + DirectoryEntry( + name=b"dir1", + type="dir", + target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), + perms=0o040755, + ), + DirectoryEntry( + name=b"file1.ext", + type="file", + target=hash_to_bytes("86bc6b377e9d25f9d26777a4a28d08e63e7c5779"), + perms=0o644, + ), + DirectoryEntry( + name=b"subprepo1", + type="rev", + target=hash_to_bytes("c7f96242d73c267adc77c2908e64e0c1cb6a4431"), + perms=0o160000, + ), + ), + raw_manifest=( + b"tree 102\x00" + b"160000 subprepo1\x00\xc7\xf9bB\xd7<&z\xdcw\xc2\x90\x8ed\xe0\xc1\xcbjD1" + b"644 file1.ext\x00\x86\xbck7~\x9d%\xf9\xd2gw\xa4\xa2\x8d\x08\xe6>|Wy" + b"40755 dir1\x00K\x82]\xc6B\xcbn\xb9\xa0`\xe5K\xf8\xd6\x92\x88\xfb\xeeI\x04" + ), +) + +# A directory with its entries in canonical order, but a hash computed as if +# computed in the reverse order. 
+# This happens when entries get normalized (either by the loader or accidentally +# in swh-storage) +CORRUPT_DIRECTORY = attr.evolve(ORIGINAL_DIRECTORY, raw_manifest=None) + + +assert ORIGINAL_DIRECTORY != CORRUPT_DIRECTORY +assert ( + hash_to_bytes("61992617462fff81509bda4a24b54c96ea74a007") + == ORIGINAL_DIRECTORY.id + == CORRUPT_DIRECTORY.id +) +assert ( + hash_to_bytes("81fda5b242e65fc81201e590d0f0ce5f582fbcdd") + == CORRUPT_DIRECTORY.compute_hash() + != CORRUPT_DIRECTORY.id +) +assert ORIGINAL_DIRECTORY.entries == CORRUPT_DIRECTORY.entries + +DATASTORE = Datastore(package="storage", cls="postgresql", instance="service=swh") +CORRUPT_OBJECT = CorruptObject( + id=ORIGINAL_DIRECTORY.swhid(), + datastore=DATASTORE, + first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc), + object_=value_to_kafka(CORRUPT_DIRECTORY.to_dict()), +) + + +def test_no_object(scrubber_db: ScrubberDb, mocker) -> None: + """There is no object to recover -> nothing happens""" + fixer = Fixer(db=scrubber_db) + fixer.run() + + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT COUNT(*) FROM fixed_object") + assert cur.fetchone() == (0,) + + +def test_no_origin(scrubber_db: ScrubberDb, mocker) -> None: + """There is no origin to recover objects from -> nothing happens""" + scrubber_db.corrupt_object_add( + CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + ) + + fixer = Fixer(db=scrubber_db) + fixer.run() + + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT COUNT(*) FROM fixed_object") + assert cur.fetchone() == (0,) + + +def test_already_fixed(scrubber_db: ScrubberDb, mocker) -> None: + """All corrupt objects are already fixed -> nothing happens""" + fixed_object = FixedObject( + id=CORRUPT_OBJECT.id, + object_=value_to_kafka(ORIGINAL_DIRECTORY.to_dict()), + method="whatever means necessary", + ) + scrubber_db.corrupt_object_add( + CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + ) + with scrubber_db.cursor() as cur: + scrubber_db.object_origin_add(cur, CORRUPT_OBJECT.id, ["http://example.org/"]) + scrubber_db.fixed_object_add(cur, [fixed_object]) + + subprocess_run = mocker.patch("subprocess.run") + + scrubber_db = MagicMock(wraps=scrubber_db) + + fixer = Fixer(db=scrubber_db) + fixer.run() + + # Check the Fixer did not try to fix the object again + scrubber_db.fixed_object_add.assert_not_called() + subprocess_run.assert_not_called() + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT id, method FROM fixed_object") + assert list(cur) == [(str(fixed_object.id), fixed_object.method)] + + +def _run_fixer_with_clone( + scrubber_db: ScrubberDb, + mocker, + caplog, + corrupt_object: CorruptObject, + subprocess_run_side_effect, +) -> None: + """Helper for all tests that involve running the fixer with a clone: + adds a corrupt object and an origin to the DB, mocks subprocess.run with the + given function, and runs the fixer with caplog""" + scrubber_db.corrupt_object_add( + corrupt_object.id, corrupt_object.datastore, corrupt_object.object_ + ) + with scrubber_db.cursor() as cur: + scrubber_db.object_origin_add(cur, corrupt_object.id, ["http://example.org/"]) + + subprocess_run = mocker.patch( + "subprocess.run", side_effect=subprocess_run_side_effect + ) + + fixer = Fixer(db=scrubber_db) + with caplog.at_level(logging.CRITICAL): + with caplog.at_level(logging.INFO, logger="swh.scrubber.fixer"): + fixer.run() + + subprocess_run.assert_called() + + +def test_failed_clone(scrubber_db: ScrubberDb, mocker, caplog) -> None: + """Corrupt object 
found with an origin, but the origin's clone is broken somehow""" + scrubber_db = MagicMock(wraps=scrubber_db) + + _run_fixer_with_clone( + scrubber_db, + mocker, + caplog, + corrupt_object=CORRUPT_OBJECT, + subprocess_run_side_effect=subprocess.CalledProcessError(1, "foo"), + ) + + scrubber_db.fixed_object_add.assert_not_called() + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT id, method FROM fixed_object") + assert list(cur) == [] + + assert ( + "swh.scrubber.fixer", + logging.ERROR, + "Failed to clone http://example.org/", + ) in caplog.record_tuples + + +def test_empty_origin(scrubber_db: ScrubberDb, mocker, caplog) -> None: + """Corrupt object found with an origin, but the origin's clone is missing + the object""" + scrubber_db = MagicMock(wraps=scrubber_db) + real_subprocess_run = subprocess.run + + def subprocess_run(args, **kwargs): + (*head, path) = args + assert head == ["git", "clone", "--bare", "http://example.org/"] + real_subprocess_run(["git", "init", "--bare", path]) + + _run_fixer_with_clone( + scrubber_db, + mocker, + caplog, + corrupt_object=CORRUPT_OBJECT, + subprocess_run_side_effect=subprocess_run, + ) + + scrubber_db.fixed_object_add.assert_not_called() + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT id, method FROM fixed_object") + assert list(cur) == [] + + assert ( + "swh.scrubber.fixer", + logging.INFO, + "swh:1:dir:61992617462fff81509bda4a24b54c96ea74a007 not found in origin", + ) in caplog.record_tuples + + +def test_parseable_directory_from_origin( + scrubber_db: ScrubberDb, mocker, caplog +) -> None: + """Corrupt object found with an origin, and the object is found in the origin's + clone as expected.""" + scrubber_db = MagicMock(wraps=scrubber_db) + real_subprocess_run = subprocess.run + + def subprocess_run(args, **kwargs): + (*head, path) = args + assert head == ["git", "clone", "--bare", "http://example.org/"] + real_subprocess_run(["git", "init", "--bare", path]) + object_dir_path = Path(path) / "objects/61" + object_path = object_dir_path / "992617462fff81509bda4a24b54c96ea74a007" + object_dir_path.mkdir() + with open(object_path, "wb") as fd: + fd.write(zlib.compress(ORIGINAL_DIRECTORY.raw_manifest)) + + _run_fixer_with_clone( + scrubber_db, + mocker, + caplog, + corrupt_object=CORRUPT_OBJECT, + subprocess_run_side_effect=subprocess_run, + ) + + scrubber_db.fixed_object_add.assert_called_once() + fixed_objects = list(scrubber_db.fixed_object_iter()) + assert len(fixed_objects) == 1 + + assert fixed_objects[0].id == ORIGINAL_DIRECTORY.swhid() + assert fixed_objects[0].method == "from_origin" + assert ( + Directory.from_dict(kafka_to_value(fixed_objects[0].object_)) + == ORIGINAL_DIRECTORY + ) + + assert caplog.record_tuples == [] + + +def test_unparseable_directory(scrubber_db: ScrubberDb, mocker, caplog) -> None: + """Corrupt object found with an origin, and the object is found in the origin's + clone as expected; but Dulwich cannot parse it. 
+ It was probably loaded by an old version of the loader that was more permissive, + by using libgit2.""" + scrubber_db = MagicMock(wraps=scrubber_db) + real_subprocess_run = subprocess.run + + raw_manifest = b"this is not a parseable manifest" + raw_manifest = f"tree {len(raw_manifest)}\x00".encode() + raw_manifest + + original_directory = Directory( + entries=( + DirectoryEntry( + name=b"dir1", + type="dir", + target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), + perms=0o040755, + ), + ), + raw_manifest=raw_manifest, + ) + assert original_directory.id.hex() == "a518fa6b46bad74e95588d2bfdf4455398a2216a" + + corrupt_directory = attr.evolve(original_directory, raw_manifest=None) + corrupt_object = CorruptObject( + id=original_directory.swhid(), + datastore=DATASTORE, + object_=value_to_kafka(corrupt_directory.to_dict()), + first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc), + ) + + def subprocess_run(args, **kwargs): + (*head, path) = args + if head[0:2] != ["git", "clone"]: + return real_subprocess_run(args, **kwargs) + assert head == ["git", "clone", "--bare", "http://example.org/"] + real_subprocess_run(["git", "init", "--bare", path]) + object_dir_path = Path(path) / "objects/a5" + object_path = object_dir_path / "18fa6b46bad74e95588d2bfdf4455398a2216a" + object_dir_path.mkdir() + with open(object_path, "wb") as fd: + fd.write(zlib.compress(raw_manifest)) + + _run_fixer_with_clone( + scrubber_db, + mocker, + caplog, + corrupt_object=corrupt_object, + subprocess_run_side_effect=subprocess_run, + ) + + scrubber_db.fixed_object_add.assert_called_once() + fixed_objects = list(scrubber_db.fixed_object_iter()) + assert len(fixed_objects) == 1 + + assert fixed_objects[0].id == original_directory.swhid() + assert fixed_objects[0].method == "manifest_from_origin" + assert ( + Directory.from_dict(kafka_to_value(fixed_objects[0].object_)) + == original_directory + ) + + assert caplog.record_tuples == [ + ( + "swh.scrubber.fixer", + logging.INFO, + r"Dulwich failed to parse b'tree 32\x00this is not a parseable manifest'", + ) + ] diff --git a/swh/scrubber/utils.py b/swh/scrubber/utils.py index 2d28b51..5a4daaf 100644 --- a/swh/scrubber/utils.py +++ b/swh/scrubber/utils.py @@ -1,34 +1,46 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Callable +from typing import Callable, Optional import psycopg2 from swh.model.swhids import CoreSWHID from .db import CorruptObject, ScrubberDb def iter_corrupt_objects( db: ScrubberDb, start_object: CoreSWHID, end_object: CoreSWHID, + origin_url: Optional[str], cb: Callable[[CorruptObject, psycopg2.extensions.cursor], None], ) -> None: + """Fetches objects and calls ``cb`` on each of them. 
+
+    Objects are fetched with an update lock, with the same transaction as ``cb``,
+    which is automatically committed after ``cb`` runs."""
    while True:
        with db.conn, db.cursor() as cur:
-            corrupt_objects = db.corrupt_object_grab(cur, start_object, end_object,)
+            if origin_url:
+                corrupt_objects = db.corrupt_object_grab_by_origin(
+                    cur, origin_url, start_object, end_object
+                )
+            else:
+                corrupt_objects = db.corrupt_object_grab_by_id(
+                    cur, start_object, end_object
+                )
            if corrupt_objects and corrupt_objects[0].id == start_object:
                # TODO: don't needlessly fetch duplicate objects
                del corrupt_objects[0]
            if not corrupt_objects:
                # Nothing more to do
                break
            for corrupt_object in corrupt_objects:
                cb(corrupt_object, cur)
            db.conn.commit()  # XXX: is this redundant with db.conn.__exit__?

        start_object = corrupt_objects[-1].id
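Usage sketch: assuming a scrubber database that already holds corrupt objects and the origins recorded by ``swh scrubber locate`` (the ``service=swh-scrubber`` DSN below is a placeholder, not a value from this changeset), the new fixer can be driven programmatically the same way the ``fix`` subcommand wires it, and the recovered objects read back from the new ``fixed_object`` table:

    from swh.journal.serializers import kafka_to_value
    from swh.model.swhids import CoreSWHID
    from swh.scrubber import get_scrubber_db
    from swh.scrubber.fixer import Fixer

    # Placeholder DSN; any libpq connection string accepted by the "local" backend works.
    db = get_scrubber_db(cls="local", db="service=swh-scrubber")

    # Same bounds as the CLI defaults: scan the full SWHID range.
    fixer = Fixer(
        db=db,
        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
    )
    fixer.run()  # clones known origins and records recovered objects

    # Recovered objects are stored msgpack-encoded with the journal serializer.
    for fixed in db.fixed_object_iter():
        print(fixed.id, fixed.method, kafka_to_value(fixed.object_))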