diff --git a/vlorentz/recover_corrupt_objects.py b/vlorentz/recover_corrupt_objects.py
new file mode 100644
index 0000000..059ca6a
--- /dev/null
+++ b/vlorentz/recover_corrupt_objects.py
@@ -0,0 +1,195 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This is an executable script to update database rows that were corrupted
+when they were initially loaded.
+
+It expects as arguments:
+
+1. a DSN to connect to the storage database
+2. the URL of a Kafka broker to bootstrap from
+3. arbitrarily many paths to directories containing ``*.pickle`` and ``*.git_manifest``
+   objects, as generated by ``snippets/vlorentz/analyze_consistency_failures.py``.
+"""
+
+import pathlib
+import pickle
+import sys
+from typing import Any, Dict, List
+
+import attr
+import dulwich.errors
+import dulwich.objects
+import tqdm
+
+from swh.core.api.classes import stream_results_optional
+from swh.loader.git import converters
+from swh.model import git_objects
+from swh.model.model import BaseModel, Directory, Release, Revision
+from swh.model.swhids import CoreSWHID, ObjectType
+from swh.storage.postgresql.storage import Storage
+from swh.storage.writer import JournalWriter
+
+SWHID_TYPE_TO_MODEL_CLASS: Dict[ObjectType, Any] = {
+    ObjectType.DIRECTORY: Directory,
+    ObjectType.RELEASE: Release,
+    ObjectType.REVISION: Revision,
+}
+
+
+SWHID_TYPE_TO_DULWICH_CLASS: Dict[ObjectType, Any] = {
+    ObjectType.DIRECTORY: dulwich.objects.Tree,
+    ObjectType.RELEASE: dulwich.objects.Tag,
+    ObjectType.REVISION: dulwich.objects.Commit,
+}
+
+
+SWHID_TYPE_TO_CONVERTER: Dict[ObjectType, Any] = {
+    ObjectType.DIRECTORY: converters.dulwich_tree_to_directory,
+    ObjectType.RELEASE: converters.dulwich_tag_to_release,
+    ObjectType.REVISION: converters.dulwich_commit_to_revision,
+}
+
+
+# Functions rendering a model object back to its git manifest; only used to
+# build the error message in recover_git_manifest below.
+SWHID_TYPE_TO_GIT_OBJECT: Dict[ObjectType, Any] = {
+    ObjectType.DIRECTORY: git_objects.directory_git_object,
+    ObjectType.RELEASE: git_objects.release_git_object,
+    ObjectType.REVISION: git_objects.revision_git_object,
+}
+
+
+def recover_model_object(
+    storage: Storage,
+    journal_writer: JournalWriter,
+    swhid: CoreSWHID,
+    obj: BaseModel,
+) -> None:
+    """Takes a fixed model object ``obj`` as argument, and overwrites it in
+    Kafka and PostgreSQL.
+    """
+    obj.check()
+    assert obj.id == swhid.object_id
+
+    # Overwrite the object in Kafka
+    getattr(journal_writer, f"{swhid.object_type.name.lower()}_add")([obj])
+
+    # Overwrite the object in PostgreSQL.
+    # In order to avoid duplicating the insertion code, we first DELETE the
+    # row(s), then insert the object as usual, in the same transaction.
+    with storage.db() as db:
+        cur = db.cursor()
+        if swhid.object_type == ObjectType.DIRECTORY:
+            cur.execute("DELETE FROM directory WHERE id=%s", (swhid.object_id,))
+            storage.directory_add([obj], db=db, cur=cur)
+        elif swhid.object_type == ObjectType.REVISION:
+            cur.execute("DELETE FROM revision_history WHERE id=%s", (swhid.object_id,))
+            cur.execute("DELETE FROM revision WHERE id=%s", (swhid.object_id,))
+            storage.revision_add([obj], db=db, cur=cur)
+        elif swhid.object_type == ObjectType.RELEASE:
+            cur.execute("DELETE FROM release WHERE id=%s", (swhid.object_id,))
+            storage.release_add([obj], db=db, cur=cur)
+        else:
+            assert False, swhid
+
+
+def recover_dict(
+    storage: Storage, journal_writer: JournalWriter, swhid: CoreSWHID, d: Dict[str, Any]
+) -> None:
+    """Takes the dict representation of a model object, and passes it to
+    ``recover_model_object``.
+ """ + model_obj = SWHID_TYPE_TO_MODEL_CLASS[swhid.object_type].from_dict(d) + + recover_model_object(storage, journal_writer, swhid, model_obj) + + +def recover_git_manifest( + storage: Storage, journal_writer: JournalWriter, swhid: CoreSWHID, b: bytes +) -> None: + """Takes a git manifest, parses it (with help from the existing object in the DB + if Dulwich cannot parse it), then passes it to ``recover_model_object``. + """ + (header, content) = b.split(b"\x00", 1) + dulwich_cls = SWHID_TYPE_TO_DULWICH_CLASS[swhid.object_type] + expected_header = dulwich_cls.type_name + f" {len(content)}".encode() + assert header == expected_header, b + try: + dulwich_obj = dulwich_cls.from_string(content) + except dulwich.errors.ObjectFormatException: + # not parseable anymore (was probably loaded with libgit2). + # Recover the parsed object from the DB, add the raw_manifest, and reinject + if swhid.object_type == ObjectType.DIRECTORY: + entries = tuple(stream_results_optional(storage.directory_get_entries, swhid.object_id)) + model_obj = Directory(id=swhid.object_id, entries=entries) + elif swhid.object_type == ObjectType.RELEASE: + (model_obj,) = storage.release_get([swhid.object_id]) + elif swhid.object_type == ObjectType.REVISION: + (model_obj,) = storage.revision_get([swhid.object_id]) + else: + assert False, swhid + + # The original object must be corrupt *somehow*, or we wouldn't be here + print(swhid, model_obj) + from swh.model import git_objects + assert model_obj.id != model_obj.compute_hash(), f"{swhid}\n{b.decode()}\n{git_objects.revision_git_object(model_obj).decode()}" + + model_obj = attr.evolve(model_obj, raw_manifest=b) + else: + model_obj = SWHID_TYPE_TO_CONVERTER[swhid.object_type](dulwich_obj) + + recover_model_object(storage, journal_writer, swhid, model_obj) + + +def main(storage_dbconn: str, kafka_bootstrap: str, dirs: List[pathlib.Path]) -> None: + journal_writer = JournalWriter( + { + "cls": "kafka", + "brokers": [kafka_bootstrap], + "prefix": "swh.journal.objects", + "client_id": "vlorentz-recover_corrupt_objects", + } + ) + # journal_writer = JournalWriter({"cls": "memory"}) + + # must be a direct postgresql storage backend, because we need to control + # transactions in order to DELETE + INSERT objects atomically + storage = Storage(db=storage_dbconn, objstorage={"cls": "memory"}) + + manifests = [ + file_path for dir_ in dirs for file_path in dir_.glob("*.git_manifest") + ] + for file_path in tqdm.tqdm(manifests, desc="(1/2) Recovering from manifests"): + swhid = CoreSWHID.from_string(file_path.stem) + with open(file_path, "rb") as fd: + recover_git_manifest(storage, journal_writer, swhid, fd.read()) + + pickles = [file_path for dir_ in dirs for file_path in dir_.glob("*.pickle")] + for file_path in tqdm.tqdm(pickles, desc="(2/2) Recovering from pickles"): + swhid = CoreSWHID.from_string(file_path.stem) + with open(file_path, "rb") as fd: + recover_dict(storage, journal_writer, swhid, pickle.load(fd)) + + +if __name__ == "__main__": + if len(sys.argv) >= 3: + (_, storage_dbconn, kafka_bootstrap, *dirs) = sys.argv + main(storage_dbconn, kafka_bootstrap, list(map(pathlib.Path, dirs))) + else: + print(__doc__) + exit(1)