diff --git a/vlorentz/recover_corrupt_objects.py b/vlorentz/recover_corrupt_objects.py
index 61cd4ac..cdb3780 100644
--- a/vlorentz/recover_corrupt_objects.py
+++ b/vlorentz/recover_corrupt_objects.py
@@ -1,182 +1,189 @@
 #!/usr/bin/env python3
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """This is an executable script to update database rows that were corrupted
 when they were initially loaded.

-It expects as arguments:
+This script reads the storage and journal writer configs from
+``SWH_CONFIG_FILENAME``.

-1. a DSN to connect to the storage database
-2. the URL of a Kafka broker to bootstrap
-3. arbitrarily many paths to directories containing ``*.pickle`` and ``*.git_manifest``
-   objects, as generated by ``snippets/vlorentz/analyze_consistency_failures.py``.
+It expects, as arguments, arbitrarily many paths to directories containing
+``*.pickle`` and ``*.git_manifest`` objects, as generated by
+``snippets/vlorentz/analyze_consistency_failures.py``.
 """

 import glob
+import os
 import pathlib
 import pickle
 import sys
 from typing import Any, Dict, List, Type, Union

 import attr
 import dulwich.errors
 import dulwich.objects
 import tqdm
+import yaml

 from swh.loader.git import converters
 from swh.model import git_objects
 from swh.model.model import BaseModel, Directory, Revision, Release
 from swh.model.swhids import CoreSWHID, ObjectType
-from swh.core.api.classes import stream_results
+from swh.core.api.classes import stream_results, stream_results_optional
 from swh.storage.postgresql.storage import Storage
 from swh.storage.writer import JournalWriter

 SWHID_TYPE_TO_MODEL_CLASS: Dict[ObjectType, Any] = {
     ObjectType.DIRECTORY: Directory,
     ObjectType.RELEASE: Release,
     ObjectType.REVISION: Revision,
 }

 SWHID_TYPE_TO_DULWICH_CLASS: Dict[ObjectType, Any] = {
     ObjectType.DIRECTORY: dulwich.objects.Tree,
     ObjectType.RELEASE: dulwich.objects.Tag,
     ObjectType.REVISION: dulwich.objects.Commit,
 }

 SWHID_TYPE_TO_CONVERTER: Dict[ObjectType, Any] = {
     ObjectType.DIRECTORY: converters.dulwich_tree_to_directory,
     ObjectType.RELEASE: converters.dulwich_tag_to_release,
     ObjectType.REVISION: converters.dulwich_commit_to_revision,
 }


 def recover_model_object(
     storage: Storage,
     journal_writer: JournalWriter,
     swhid: CoreSWHID,
     obj: BaseModel,
 ) -> None:
     """Takes a fixed model ``obj`` as argument, and overwrites it in Kafka
     and PostgreSQL."""
     obj.check()
     assert obj.id == swhid.object_id

     # Overwrite the object in kafka
     getattr(journal_writer, f"{swhid.object_type.name.lower()}_add")([obj])

     # Overwrite the object in postgresql
     # In order to avoid duplicating the insertion code, we first DELETE the
     # row(s), then insert it as usual in the same transaction.
     with storage.db() as db:
         cur = db.cursor()
         if swhid.object_type == ObjectType.DIRECTORY:
             cur.execute("DELETE FROM directory WHERE id=%s", (swhid.object_id,))
-            cur.execute("SELECT * from directory")
             storage.directory_add([obj], db=db, cur=cur)
             entries = tuple(
                 stream_results(
                     storage.directory_get_entries, swhid.object_id, db=db, cur=cur
                 )
             )
             raw_manifest = storage.directory_get_raw_manifest(
                 [swhid.object_id], db=db, cur=cur
             )[swhid.object_id]
             assert set(obj.entries) == set(entries)
             assert obj.raw_manifest == raw_manifest
             assert obj.id == swhid.object_id
             obj.check()
         elif swhid.object_type == ObjectType.REVISION:
             cur.execute(
                 "DELETE FROM revision_history WHERE id=%s", (swhid.object_id,)
             )
             cur.execute("DELETE FROM revision WHERE id=%s", (swhid.object_id,))
             storage.revision_add([obj], db=db, cur=cur)
             assert storage.revision_get([swhid.object_id], db=db, cur=cur)[0] is not None
             assert [obj] == storage.revision_get([swhid.object_id], db=db, cur=cur)
         elif swhid.object_type == ObjectType.RELEASE:
             cur.execute("DELETE FROM release WHERE id=%s", (swhid.object_id,))
             storage.release_add([obj], db=db, cur=cur)
             assert [obj] == storage.release_get([swhid.object_id], db=db, cur=cur)
         else:
             assert False, swhid


 def recover_dict(
     storage: Storage, journal_writer: JournalWriter, swhid: CoreSWHID, d: Dict[str, Any]
 ) -> None:
     """Takes the dict representation of a model object, and passes it to
     ``recover_model_object``."""
     model_obj = SWHID_TYPE_TO_MODEL_CLASS[swhid.object_type].from_dict(d)
     recover_model_object(storage, journal_writer, swhid, model_obj)


 def recover_git_manifest(
     storage: Storage, journal_writer: JournalWriter, swhid: CoreSWHID, b: bytes
 ) -> None:
     """Takes a git manifest, parses it (with help from the existing object in
     the DB if Dulwich cannot parse it), then passes it to
     ``recover_model_object``."""
     (header, content) = b.split(b"\x00", 1)
     dulwich_cls = SWHID_TYPE_TO_DULWICH_CLASS[swhid.object_type]
     expected_header = dulwich_cls.type_name + f" {len(content)}".encode()
     assert header == expected_header, b
     try:
         dulwich_obj = dulwich_cls.from_string(content)
     except dulwich.errors.ObjectFormatException:
         # not parseable anymore (was probably loaded with libgit2).
         # Recover the parsed object from the DB, add the raw_manifest, and reinject
         if swhid.object_type == ObjectType.DIRECTORY:
             entries = tuple(
                 stream_results_optional(
                     storage.directory_get_entries, swhid.object_id
                 )
             )
             model_obj = Directory(id=swhid.object_id, entries=entries)
         elif swhid.object_type == ObjectType.RELEASE:
             (model_obj,) = storage.release_get([swhid.object_id])
         elif swhid.object_type == ObjectType.REVISION:
             (model_obj,) = storage.revision_get([swhid.object_id])
         else:
             assert False, swhid
         # The original object must be corrupt *somehow*, or we wouldn't be here
-        assert model_obj.id != model_obj.compute_hash(), f"{swhid}\n{b.decode()}\n{git_objects.revision_git_object(model_obj).decode()}"
+        # Pick the git_objects function matching the object's type; the old code
+        # always used revision_git_object, which broke for trees and tags
+        git_object_fn = getattr(
+            git_objects, f"{swhid.object_type.name.lower()}_git_object"
+        )
+        assert model_obj.id != model_obj.compute_hash(), (
+            f"{swhid}\n{b.decode()}\n{git_object_fn(model_obj).decode()}"
+        )
         model_obj = attr.evolve(model_obj, raw_manifest=b)
     else:
         model_obj = SWHID_TYPE_TO_CONVERTER[swhid.object_type](dulwich_obj)
     recover_model_object(storage, journal_writer, swhid, model_obj)


-def main(storage_dbconn: str, kafka_bootstrap: str, dirs: List[pathlib.Path]) -> None:
-    journal_writer = JournalWriter(
-        {
-            "cls": "kafka",
-            "brokers": [kafka_bootstrap],
-            "prefix": "swh.journal.objects",
-            "client_id": "vlorentz-recover_corrupt_objects",
-        }
-    )
-    # journal_writer = JournalWriter({"cls": "memory"})
+def main(
+    storage_dbconn: str,
+    journal_writer_conf: Dict[str, Any],
+    dirs: List[pathlib.Path],
+) -> None:
+    journal_writer = JournalWriter(journal_writer_conf)
     # must be a direct postgresql storage backend, because we need to control
     # transactions in order to DELETE + INSERT objects atomically
     storage = Storage(db=storage_dbconn, objstorage={"cls": "memory"})

     manifests = [
         file_path for dir_ in dirs for file_path in dir_.glob("*.git_manifest")
     ]
     for file_path in tqdm.tqdm(manifests, desc="(1/2) Recovering from manifests"):
         swhid = CoreSWHID.from_string(file_path.stem)
         with open(file_path, "rb") as fd:
             recover_git_manifest(storage, journal_writer, swhid, fd.read())

     pickles = [file_path for dir_ in dirs for file_path in dir_.glob("*.pickle")]
     for file_path in tqdm.tqdm(pickles, desc="(2/2) Recovering from pickles"):
         swhid = CoreSWHID.from_string(file_path.stem)
         with open(file_path, "rb") as fd:
             recover_dict(storage, journal_writer, swhid, pickle.load(fd))


 if __name__ == "__main__":
-    if len(sys.argv) >= 3:
-        (_, storage_dbconn, kafka_bootstrap, *dirs) = sys.argv
-        main(storage_dbconn, kafka_bootstrap, list(map(pathlib.Path, dirs)))
+    # Require at least one directory argument; sys.argv always contains the
+    # script name, so checking ">= 1" would make the usage message unreachable
+    if len(sys.argv) >= 2:
+        dirs = sys.argv[1:]
+
+        with open(os.environ["SWH_CONFIG_FILENAME"]) as fd:
+            config = yaml.safe_load(fd)
+        assert config["storage"]["cls"] in ("local", "postgresql")
+        try:
+            storage_dbconn = config["storage"]["db"]
+        except KeyError:
+            storage_dbconn = config["storage"]["args"]["db"]
+        try:
+            journal_writer_config = config["storage"]["journal_writer"]
+        except KeyError:
+            journal_writer_config = config["storage"]["args"]["journal_writer"]
+
+        journal_writer_config["args"]["client_id"] = (
+            "snippets.recover_corrupt_objects.saam"
+        )
+
+        main(storage_dbconn, journal_writer_config, list(map(pathlib.Path, dirs)))
     else:
         print(__doc__)
         exit(1)
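For reference, a minimal sketch of the YAML config the updated ``__main__`` block
reads from ``SWH_CONFIG_FILENAME``. The layout follows what the script itself looks
up (``storage.db`` and ``storage.journal_writer``, with ``storage.args.*`` as a
legacy fallback, and an ``args`` mapping under the journal writer, since the script
overwrites ``args.client_id``); the DSN and broker values are hypothetical
placeholders, not part of this patch::

    storage:
        cls: postgresql
        db: "service=swh-storage"            # hypothetical DSN
        journal_writer:
            cls: kafka
            args:
                brokers:
                    - kafka01.example.org:9092   # hypothetical broker
                prefix: swh.journal.objects
                client_id: placeholder           # overwritten by the script

The script would then be invoked on the directories produced by
``analyze_consistency_failures.py``, e.g.
``SWH_CONFIG_FILENAME=recover.yml ./recover_corrupt_objects.py dump1/ dump2/``;
each input file is named after a core SWHID, such as
``swh:1:rev:<40 hex digits>.git_manifest``.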