diff --git a/vlorentz/recover_corrupt_objects.py b/vlorentz/recover_corrupt_objects.py
index 5202f7d..b052bfb 100644
--- a/vlorentz/recover_corrupt_objects.py
+++ b/vlorentz/recover_corrupt_objects.py
@@ -1,199 +1,214 @@
 #!/usr/bin/env python3
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """This is an executable script to update database rows that were corrupted
 when they were initially loaded.
 
 This script reads the storage and journal writer configs from
 ``SWH_CONFIG_FILENAME``. It expects, as arguments, arbitrarily many paths to
 directories containing ``*.pickle`` and ``*.git_manifest`` files, as generated
 by ``snippets/vlorentz/analyze_consistency_failures.py``.
 """
 
 import os
 import pathlib
 import pickle
 import sys
 from typing import Any, Dict, List
 
 import attr
 import dulwich.errors
 import dulwich.objects
 import tqdm
 import yaml
 
 from swh.core.api.classes import stream_results
 from swh.loader.git import converters
 from swh.model import git_objects
 from swh.model.model import BaseModel, Directory, Revision, Release
 from swh.model.swhids import CoreSWHID, ObjectType
 from swh.storage.postgresql.storage import Storage
 from swh.storage.writer import JournalWriter
 
 SWHID_TYPE_TO_MODEL_CLASS: Dict[ObjectType, Any] = {
     ObjectType.DIRECTORY: Directory,
     ObjectType.RELEASE: Release,
     ObjectType.REVISION: Revision,
 }
 
 SWHID_TYPE_TO_DULWICH_CLASS: Dict[ObjectType, Any] = {
     ObjectType.DIRECTORY: dulwich.objects.Tree,
     ObjectType.RELEASE: dulwich.objects.Tag,
     ObjectType.REVISION: dulwich.objects.Commit,
 }
 
 SWHID_TYPE_TO_CONVERTER: Dict[ObjectType, Any] = {
     ObjectType.DIRECTORY: converters.dulwich_tree_to_directory,
     ObjectType.RELEASE: converters.dulwich_tag_to_release,
     ObjectType.REVISION: converters.dulwich_commit_to_revision,
 }
 
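 # The three tables above dispatch on the SWHID object type. As a rough
 # illustration of how they compose (a hypothetical sketch, not called
 # anywhere in this script), turning a raw git manifest payload back into a
 # model object boils down to:
 #
 #     dulwich_cls = SWHID_TYPE_TO_DULWICH_CLASS[swhid.object_type]
 #     dulwich_obj = dulwich_cls.from_string(payload)  # bytes after the NUL byte
 #     model_obj = SWHID_TYPE_TO_CONVERTER[swhid.object_type](dulwich_obj)
 #
 # which is what ``recover_git_manifest`` below does when Dulwich manages to
 # parse the manifest.
 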
 def recover_model_object(
     storage: Storage,
     journal_writer: JournalWriter,
     swhid: CoreSWHID,
     obj: BaseModel,
 ) -> None:
     """Takes a fixed model ``obj`` as argument, and overwrites it in Kafka
     and PostgreSQL."""
     obj.check()
     assert obj.id == swhid.object_id
 
     # Overwrite the object in Kafka
     getattr(journal_writer, f"{swhid.object_type.name.lower()}_add")([obj])
 
     # Overwrite the object in PostgreSQL.
     # In order to avoid duplicating the insertion code, we first DELETE the
     # row(s), then insert the object as usual, in the same transaction.
     with storage.db() as db:
         with db.transaction() as cur:
             if swhid.object_type == ObjectType.DIRECTORY:
                 cur.execute("DELETE FROM directory WHERE id=%s", (swhid.object_id,))
                 storage.directory_add([obj], db=db, cur=cur)
                 entries = tuple(
                     stream_results(
                         storage.directory_get_entries, swhid.object_id, db=db, cur=cur
                     )
                 )
                 raw_manifest = storage.directory_get_raw_manifest(
                     [swhid.object_id], db=db, cur=cur
                 )[swhid.object_id]
-                assert set(obj.entries) == set(entries)
-                assert obj.raw_manifest == raw_manifest
-                assert obj.id == swhid.object_id
+                assert set(obj.entries) == set(entries), swhid
+                assert obj.raw_manifest == raw_manifest, swhid
+                assert obj.id == swhid.object_id, swhid
                 obj.check()
             elif swhid.object_type == ObjectType.REVISION:
                 cur.execute(
                     "DELETE FROM revision_history WHERE id=%s", (swhid.object_id,)
                 )
                 cur.execute("DELETE FROM revision WHERE id=%s", (swhid.object_id,))
                 storage.revision_add([obj], db=db, cur=cur)
-                assert (
-                    storage.revision_get([swhid.object_id], db=db, cur=cur)[0]
-                    is not None
-                )
-                assert [obj] == storage.revision_get([swhid.object_id], db=db, cur=cur)
+                (ret,) = storage.revision_get([swhid.object_id], db=db, cur=cur)
+                assert ret is not None, swhid
+                if obj.author is not None:
+                    assert ret.author.fullname == obj.author.fullname, swhid
+                    ret = attr.evolve(ret, author=obj.author)
+                else:
+                    assert ret.author is None, swhid
+                if obj.committer is not None:
+                    assert ret.committer.fullname == obj.committer.fullname, swhid
+                    ret = attr.evolve(ret, committer=obj.committer)
+                else:
+                    assert ret.committer is None, swhid
+                assert ret == obj, f"{swhid}: Returned {ret!r} != {obj!r}"
             elif swhid.object_type == ObjectType.RELEASE:
                 cur.execute("DELETE FROM release WHERE id=%s", (swhid.object_id,))
                 storage.release_add([obj], db=db, cur=cur)
-                assert [obj] == storage.release_get([swhid.object_id], db=db, cur=cur)
+                (ret,) = storage.release_get([swhid.object_id], db=db, cur=cur)
+                assert ret is not None, swhid
+                if obj.author is not None:
+                    assert ret.author.fullname == obj.author.fullname, swhid
+                    ret = attr.evolve(ret, author=obj.author)
+                else:
+                    assert ret.author is None, swhid
+                assert ret == obj, f"{swhid}: Returned {ret!r} != {obj!r}"
             else:
                 assert False, swhid
 
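 # A note on the ``attr.evolve`` calls above: only the ``fullname`` of authors
 # and committers is compared against the database, because the name/email
 # split returned by the storage may legitimately differ from the one carried
 # by the recovered object. A minimal sketch of the mismatch, with hypothetical
 # values:
 #
 #     from swh.model.model import Person
 #     a = Person.from_fullname(b"Jane Doe <jane@example.org>")
 #     b = Person(fullname=b"Jane Doe <jane@example.org>", name=None, email=None)
 #     assert a.fullname == b.fullname  # same fullname...
 #     assert a != b                    # ...but different objects
 #
 # so ``ret`` is evolved to reuse ``obj``'s persons before the final equality
 # check.
 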
 def recover_dict(
     storage: Storage, journal_writer: JournalWriter, swhid: CoreSWHID, d: Dict[str, Any]
 ) -> None:
     """Takes the dict representation of a model object, and passes it to
     ``recover_model_object``."""
     model_obj = SWHID_TYPE_TO_MODEL_CLASS[swhid.object_type].from_dict(d)
     recover_model_object(storage, journal_writer, swhid, model_obj)
 
 
 def recover_git_manifest(
     storage: Storage, journal_writer: JournalWriter, swhid: CoreSWHID, b: bytes
 ) -> None:
     """Takes a git manifest, parses it (with help from the existing object in
     the DB if Dulwich cannot parse it), then passes it to
     ``recover_model_object``."""
     (header, content) = b.split(b"\x00", 1)
     dulwich_cls = SWHID_TYPE_TO_DULWICH_CLASS[swhid.object_type]
     expected_header = dulwich_cls.type_name + f" {len(content)}".encode()
     assert header == expected_header, b
     try:
         dulwich_obj = dulwich_cls.from_string(content)
     except dulwich.errors.ObjectFormatException:
         # Not parseable anymore (the object was probably loaded with libgit2).
         # Recover the parsed object from the DB, add the raw_manifest, and
         # reinject it.
         if swhid.object_type == ObjectType.DIRECTORY:
             entries = tuple(
                 stream_results(storage.directory_get_entries, swhid.object_id)
             )
             model_obj = Directory(id=swhid.object_id, entries=entries)
         elif swhid.object_type == ObjectType.RELEASE:
             (model_obj,) = storage.release_get([swhid.object_id])
         elif swhid.object_type == ObjectType.REVISION:
             (model_obj,) = storage.revision_get([swhid.object_id])
         else:
             assert False, swhid
         # The original object must be corrupt *somehow*, or we wouldn't be here
         assert model_obj.id != model_obj.compute_hash(), (
             f"{swhid}\n"
             f"{b.decode()}\n"
             f"{git_objects.revision_git_object(model_obj).decode()}"
         )
         model_obj = attr.evolve(model_obj, raw_manifest=b)
     else:
         model_obj = SWHID_TYPE_TO_CONVERTER[swhid.object_type](dulwich_obj)
     recover_model_object(storage, journal_writer, swhid, model_obj)
 
 
 def main(
     storage_dbconn: str,
     journal_writer_conf: Dict[str, Any],
     dirs: List[pathlib.Path],
 ) -> None:
     journal_writer = JournalWriter(journal_writer_conf)
     # Must be a direct PostgreSQL storage backend, because we need to control
     # transactions in order to DELETE + INSERT objects atomically.
     storage = Storage(db=storage_dbconn, objstorage={"cls": "memory"})
 
     manifests = [
         file_path for dir_ in dirs for file_path in dir_.glob("*.git_manifest")
     ]
     for file_path in tqdm.tqdm(manifests, desc="(1/2) Recovering from manifests"):
         swhid = CoreSWHID.from_string(file_path.stem)
         with open(file_path, "rb") as fd:
             recover_git_manifest(storage, journal_writer, swhid, fd.read())
 
     pickles = [file_path for dir_ in dirs for file_path in dir_.glob("*.pickle")]
     for file_path in tqdm.tqdm(pickles, desc="(2/2) Recovering from pickles"):
         swhid = CoreSWHID.from_string(file_path.stem)
         with open(file_path, "rb") as fd:
             recover_dict(storage, journal_writer, swhid, pickle.load(fd))
 
 
 if __name__ == "__main__":
     if len(sys.argv) >= 2:
         dirs = sys.argv[1:]
         with open(os.environ["SWH_CONFIG_FILENAME"], "r") as fd:
             config = yaml.safe_load(fd)
         assert config["storage"]["cls"] in ("local", "postgresql")
         try:
             storage_dbconn = config["storage"]["db"]
         except KeyError:
             storage_dbconn = config["storage"]["args"]["db"]
         try:
             journal_writer_config = config["storage"]["journal_writer"]
         except KeyError:
             journal_writer_config = config["storage"]["args"]["journal_writer"]
         journal_writer_config["args"][
             "client_id"
         ] = "snippets.recover_corrupt_objects.saam"
         main(storage_dbconn, journal_writer_config, list(map(pathlib.Path, dirs)))
     else:
         print(__doc__)
         exit(1)
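 
 # An illustrative ``SWH_CONFIG_FILENAME`` matching what the block above reads
 # (all values are placeholders, not a recommended configuration):
 #
 #     storage:
 #       cls: postgresql
 #       db: "service=swh-storage"
 #       journal_writer:
 #         cls: kafka
 #         args:
 #           brokers: ["kafka.example.org:9092"]
 #           prefix: swh.journal.objects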