diff --git a/vlorentz/recover_corrupt_objects.py b/vlorentz/recover_corrupt_objects.py
index b51e6e5..01ede38 100644
--- a/vlorentz/recover_corrupt_objects.py
+++ b/vlorentz/recover_corrupt_objects.py
@@ -1,234 +1,236 @@
 #!/usr/bin/env python3
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """This is an executable script to update database rows that were corrupted
 when they were initially loaded.
 
 This script reads the storage and journal writer configs from
 ``SWH_CONFIG_FILENAME``. It expects, as arguments, arbitrarily many paths to
 directories containing ``*.pickle`` and ``*.git_manifest`` objects, as
 generated by ``snippets/vlorentz/analyze_consistency_failures.py``.
 """
 
 import glob
 import logging
 import os
 import pathlib
 import pickle
 import sys
 from typing import Any, Dict, List, Type, Union
 
 import attr
 import dulwich.errors
 import dulwich.objects
 import tqdm
 import yaml
 
 from swh.loader.git import converters
 from swh.model import git_objects
 from swh.model.model import BaseModel, Directory, Revision, Release
 from swh.model.swhids import CoreSWHID, ObjectType
 from swh.core.api.classes import stream_results
 from swh.storage.postgresql.storage import Storage
 from swh.storage.writer import JournalWriter
 
 logger = logging.getLogger(__name__)
 
 SWHID_TYPE_TO_MODEL_CLASS: Dict[ObjectType, Any] = {
     ObjectType.DIRECTORY: Directory,
     ObjectType.RELEASE: Release,
     ObjectType.REVISION: Revision,
 }
 
 SWHID_TYPE_TO_DULWICH_CLASS: Dict[ObjectType, Any] = {
     ObjectType.DIRECTORY: dulwich.objects.Tree,
     ObjectType.RELEASE: dulwich.objects.Tag,
     ObjectType.REVISION: dulwich.objects.Commit,
 }
 
 SWHID_TYPE_TO_CONVERTER: Dict[ObjectType, Any] = {
     ObjectType.DIRECTORY: converters.dulwich_tree_to_directory,
     ObjectType.RELEASE: converters.dulwich_tag_to_release,
     ObjectType.REVISION: converters.dulwich_commit_to_revision,
 }
 
 
 def recover_model_object(
     storage: Storage,
     journal_writer: JournalWriter,
     swhid: CoreSWHID,
     obj: BaseModel,
 ) -> None:
     """Takes a fixed model ``obj`` as argument, and overwrites it in Kafka
     and PostgreSQL.
     """
     obj.check()
     assert obj.id == swhid.object_id
 
     # Overwrite the object in Kafka
     getattr(journal_writer, f"{swhid.object_type.name.lower()}_add")([obj])
 
     # Overwrite the object in PostgreSQL.
     # In order to avoid duplicating the insertion code, we first DELETE the
     # row(s), then insert the object as usual in the same transaction.
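     # (The DELETE is needed because the storage's *_add endpoints filter out
     # objects whose id is already in the database, so calling them alone on
     # the fixed object would be a no-op.)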
     with storage.db() as db:
         with db.transaction() as cur:
             if swhid.object_type == ObjectType.DIRECTORY:
                 cur.execute("DELETE FROM directory WHERE id=%s", (swhid.object_id,))
                 storage.directory_add([obj], db=db, cur=cur)
 
                 # Read the object back, to check it was written correctly
                 entries = tuple(
                     stream_results(
                         storage.directory_get_entries, swhid.object_id, db=db, cur=cur
                     )
                 )
                 raw_manifest = storage.directory_get_raw_manifest(
                     [swhid.object_id], db=db, cur=cur
                 )[swhid.object_id]
                 assert set(obj.entries) == set(entries), swhid
                 assert obj.raw_manifest == raw_manifest, swhid
                 assert obj.id == swhid.object_id, swhid
                 obj.check()
             elif swhid.object_type == ObjectType.REVISION:
                 cur.execute(
                     "DELETE FROM revision_history WHERE id=%s", (swhid.object_id,)
                 )
                 cur.execute("DELETE FROM revision WHERE id=%s", (swhid.object_id,))
                 storage.revision_add([obj], db=db, cur=cur)
 
                 # Read the object back; author/committer may be re-parsed
                 # differently by the storage, so compare only their fullname,
                 # then normalize before comparing the whole object
                 (ret,) = storage.revision_get([swhid.object_id], db=db, cur=cur)
                 assert ret is not None, swhid
                 if obj.author is not None:
                     assert ret.author.fullname == obj.author.fullname, swhid
                     ret = attr.evolve(ret, author=obj.author)
                 else:
                     assert ret.author is None, swhid
                 if obj.committer is not None:
                     assert ret.committer.fullname == obj.committer.fullname, swhid
                     ret = attr.evolve(ret, committer=obj.committer)
                 else:
                     assert ret.committer is None, swhid
                 assert ret == obj, f"{swhid}: Returned {ret!r} != {obj!r}"
             elif swhid.object_type == ObjectType.RELEASE:
                 cur.execute("DELETE FROM release WHERE id=%s", (swhid.object_id,))
                 storage.release_add([obj], db=db, cur=cur)
 
                 (ret,) = storage.release_get([swhid.object_id], db=db, cur=cur)
                 assert ret is not None, swhid
                 if obj.author is not None:
                     assert ret.author.fullname == obj.author.fullname, swhid
                     ret = attr.evolve(ret, author=obj.author)
                 else:
                     assert ret.author is None, swhid
                 assert ret == obj, f"{swhid}: Returned {ret!r} != {obj!r}"
             else:
                 assert False, swhid
 
 
 def recover_dict(
     storage: Storage, journal_writer: JournalWriter, swhid: CoreSWHID, d: Dict[str, Any]
 ) -> None:
     """Takes the dict representation of a model object, and passes it to
     ``recover_model_object``.
     """
     model_obj = SWHID_TYPE_TO_MODEL_CLASS[swhid.object_type].from_dict(d)
     recover_model_object(storage, journal_writer, swhid, model_obj)
 
 
 def recover_git_manifest(
     storage: Storage, journal_writer: JournalWriter, swhid: CoreSWHID, b: bytes
 ) -> None:
     """Takes a git manifest, parses it (with help from the existing object in
     the DB if Dulwich cannot parse it), then passes it to
     ``recover_model_object``.
     """
     # A git manifest is b"<type> <length>\x00<content>"
     (header, content) = b.split(b"\x00", 1)
     dulwich_cls = SWHID_TYPE_TO_DULWICH_CLASS[swhid.object_type]
     expected_header = dulwich_cls.type_name + f" {len(content)}".encode()
     assert header == expected_header, b
     try:
         dulwich_obj = dulwich_cls.from_string(content)
     except dulwich.errors.ObjectFormatException:
         # not parseable anymore (was probably loaded with libgit2).
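         # (libgit2 is more lenient than Dulwich, so some manifests it
         # accepted at load time cannot be re-parsed here. Keeping the raw
         # manifest on the model object preserves the original bytes, and
         # therefore the object's id.)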
         # Recover the parsed object from the DB, add the raw_manifest, and
         # reinject it
         if swhid.object_type == ObjectType.DIRECTORY:
-            entries = tuple(stream_results_optional(storage.directory_get_entries, swhid.object_id))
+            entries = tuple(
+                stream_results(storage.directory_get_entries, swhid.object_id)
+            )
             model_obj = Directory(id=swhid.object_id, entries=entries)
         elif swhid.object_type == ObjectType.RELEASE:
             (model_obj,) = storage.release_get([swhid.object_id])
         elif swhid.object_type == ObjectType.REVISION:
             (model_obj,) = storage.revision_get([swhid.object_id])
         else:
             assert False, swhid
 
         # The original object must be corrupt *somehow*, or we wouldn't be here
         assert model_obj.id != model_obj.compute_hash(), (
             f"{swhid}\n{b.decode()}\n"
             f"{git_objects.revision_git_object(model_obj).decode()}"
         )
         model_obj = attr.evolve(model_obj, raw_manifest=b)
     else:
         model_obj = SWHID_TYPE_TO_CONVERTER[swhid.object_type](dulwich_obj)
     recover_model_object(storage, journal_writer, swhid, model_obj)
 
 
 def main(
     storage_dbconn: str,
     journal_writer_conf: Dict[str, Any],
     dirs: List[pathlib.Path],
 ) -> None:
     journal_writer = JournalWriter(journal_writer_conf)
     # must be a direct PostgreSQL storage backend, because we need to control
     # transactions in order to DELETE + INSERT objects atomically
     storage = Storage(db=storage_dbconn, objstorage={"cls": "memory"})
 
     manifests = [
         file_path for dir_ in dirs for file_path in dir_.glob("*.git_manifest")
     ]
     logger.info("(1/2) recovering from manifests (count=%s)", len(manifests))
     for counter, file_path in enumerate(sorted(manifests)):
         if counter and counter % 100 == 0:
             logger.info("Recovered %s/%s manifests", counter, len(manifests))
         try:
             swhid = CoreSWHID.from_string(file_path.stem)
             with open(file_path, "rb") as fd:
                 recover_git_manifest(storage, journal_writer, swhid, fd.read())
         except Exception:
             logger.exception("Failed to recover from %s:", file_path)
 
     pickles = [file_path for dir_ in dirs for file_path in dir_.glob("*.pickle")]
     logger.info("(2/2) recovering from pickles (count=%s)", len(pickles))
     for counter, file_path in enumerate(sorted(pickles)):
         if counter and counter % 100 == 0:
             logger.info("Recovered %s/%s pickles", counter, len(pickles))
         try:
             swhid = CoreSWHID.from_string(file_path.stem)
             with open(file_path, "rb") as fd:
                 recover_dict(storage, journal_writer, swhid, pickle.load(fd))
         except Exception:
             logger.exception("Failed to recover from %s:", file_path)
 
 
 if __name__ == "__main__":
     if len(sys.argv) >= 2:
         logging.basicConfig(
             level=logging.INFO,
             format="%(asctime)s %(levelname)s:%(name)s %(message)s",
             datefmt="%Y-%m-%d %H:%M:%S",
         )
         dirs = sys.argv[1:]
         with open(os.environ["SWH_CONFIG_FILENAME"], "r") as fd:
             config = yaml.safe_load(fd)
         assert config["storage"]["cls"] in ("local", "postgresql")
         try:
             storage_dbconn = config["storage"]["db"]
         except KeyError:
             storage_dbconn = config["storage"]["args"]["db"]
         try:
             journal_writer_config = config["storage"]["journal_writer"]
         except KeyError:
             journal_writer_config = config["storage"]["args"]["journal_writer"]
         journal_writer_config["args"]["client_id"] = (
             "snippets.recover_corrupt_objects.saam"
         )
         main(storage_dbconn, journal_writer_config, list(map(pathlib.Path, dirs)))
     else:
         print(__doc__)
         exit(1)
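
For reference, here is a minimal sketch of the configuration the script expects to find in ``SWH_CONFIG_FILENAME``, inferred from the keys read in the ``__main__`` block above. It is a hypothetical example, not a reference configuration: the connection string, brokers, and prefix are placeholders to adapt to the local setup.

    #!/usr/bin/env python3
    # Hypothetical helper generating an example SWH_CONFIG_FILENAME for the
    # script above; all values are placeholders.
    import yaml

    config = {
        "storage": {
            "cls": "postgresql",
            "db": "service=swh-storage",  # placeholder libpq connection string
            "journal_writer": {
                "cls": "kafka",
                "args": {
                    "brokers": ["kafka.example.org:9092"],  # placeholder
                    "prefix": "swh.journal.objects",  # placeholder
                    # "client_id" is overwritten by the script itself
                },
            },
        },
    }

    with open("recover_corrupt_objects.yml", "w") as f:
        yaml.safe_dump(config, f)

The script would then be invoked as, e.g., ``SWH_CONFIG_FILENAME=recover_corrupt_objects.yml ./recover_corrupt_objects.py /path/to/consistency-failures/``.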