diff --git a/vlorentz/recheck_consistency.py b/vlorentz/recheck_consistency.py
new file mode 100644
index 0000000..fea6998
--- /dev/null
+++ b/vlorentz/recheck_consistency.py
@@ -0,0 +1,520 @@
"""
Kafka
-----

Automatically manages parallelism and restarts on error.

Syntax::

    ./recheck_consistency.py kafka


Postgresql
----------

Checks all objects between two given hashes (inclusive).
Needs manual splitting and error handling.

Syntax::

    ./recheck_consistency.py postgres {directory,release,revision} <from_id> <to_id>

For example, to handle the first 1/256th of the directories::

    ./recheck_consistency.py postgres directory 00 01
"""


import difflib
import hashlib
import json
import multiprocessing
import os
import pathlib
import pickle
import secrets
import subprocess
import sys
import traceback
from typing import Dict

import dulwich.client
import dulwich.errors
import dulwich.object_store
import dulwich.pack
import dulwich.repo
import tqdm

from swh.graph.client import RemoteGraphClient, GraphArgumentException
from swh.loader.git.converters import (
    dulwich_tree_to_directory,
    dulwich_commit_to_revision,
    dulwich_tag_to_release,
)
from swh.model import model
from swh.model.swhids import ExtendedSWHID
from swh.model.hashutil import hash_to_bytes, hash_to_hex, hash_to_bytehex
from swh.model.git_objects import (
    directory_git_object,
    release_git_object,
    revision_git_object,
)
from swh.storage import get_storage
from swh.core.api.classes import stream_results
from swh.core.utils import grouper

CLONES_BASE_DIR = pathlib.Path(
    "/srv/softwareheritage/cassandra-test-0/scratch/integrity_clones/"
).expanduser()

MANAGER = multiprocessing.Manager()

CLONED_ORIGINS_PATH = "analyze_consistency_failures/cloned_origins.json"
CLONED_ORIGINS: Dict[str, None]  # used like a set

if os.path.exists(CLONED_ORIGINS_PATH):
    with open(CLONED_ORIGINS_PATH, "rt") as fd:
        CLONED_ORIGINS = MANAGER.dict(json.load(fd))
        # CLONED_ORIGINS = {k: None for (k, v) in json.load(fd).items() if "linux" in k}
else:
    CLONED_ORIGINS = MANAGER.dict()
    # CLONED_ORIGINS = dict()

# main graph: complete, but possibly stale
graph = RemoteGraphClient("http://graph.internal.softwareheritage.org:5009/graph/")
# local graph: more up to date, but partial
graph2 = RemoteGraphClient("http://localhost:5009/graph/")

################################
# Local clones manipulation


def get_clone_path(origin_url):
    if "linux" in origin_url:
        # linux.git is very big and there are lots of forks... let's fetch them all
        # into the same clone or it's going to take forever to clone them all.
        return CLONES_BASE_DIR / "linux.git"
    else:
        origin_id = model.Origin(url=origin_url).swhid()
        dirname = f"{origin_id}_{origin_url.replace('/', '_')}"
        return CLONES_BASE_DIR / dirname
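

# For illustration only (hypothetical origin URL; the swh:1:ori:* hash of the
# URL is elided), get_clone_path() maps each origin to a stable directory:
#
#   >>> get_clone_path("https://example.org/foo/bar.git")
#   PosixPath('/srv/.../integrity_clones/swh:1:ori:..._https:__example.org_foo_bar.git')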


def clone(origin_url):
    print(f"Cloning {origin_url}")
    if "linux" in origin_url:
        # linux.git is very big and there are lots of forks... let's fetch them all
        # into the same clone or it's going to take forever to clone them all.
        if origin_url in CLONED_ORIGINS:
            return
        clone_path = get_clone_path(origin_url)
        if not clone_path.is_dir():
            # first linux fork we see: create the shared bare repo, so the
            # fetch below has somewhere to write to
            subprocess.run(
                ["git", "init", "--bare", clone_path],
                check=True,
                stdout=subprocess.DEVNULL,
            )
        subprocess.run(
            ["git", "-C", clone_path, "fetch", origin_url],
            env={**os.environ, "GIT_TERMINAL_PROMPT": "0"},
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            stdin=subprocess.DEVNULL,
        )
        CLONED_ORIGINS[origin_url] = None
    else:
        clone_path = get_clone_path(origin_url)
        if not clone_path.is_dir():
            # print("Cloning", origin_url)
            subprocess.run(
                ["git", "clone", "--bare", origin_url, clone_path],
                env={**os.environ, "GIT_TERMINAL_PROMPT": "0"},
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                stdin=subprocess.DEVNULL,
            )


def get_object_from_clone(origin_url, obj_id):
    clone_path = get_clone_path(origin_url)
    try:
        repo = dulwich.repo.Repo(str(clone_path))
    except dulwich.errors.NotGitRepository:
        return None

    with repo:  # needed to avoid packfile fd leaks
        try:
            return repo[hash_to_bytehex(obj_id)]
        except dulwich.errors.ObjectFormatException:
            # fall back to git if dulwich can't parse it
            object_type = (
                subprocess.check_output(
                    ["git", "-C", clone_path, "cat-file", "-t", hash_to_hex(obj_id)]
                )
                .decode()
                .strip()
            )
            manifest = subprocess.check_output(
                ["git", "-C", clone_path, "cat-file", object_type, hash_to_hex(obj_id)]
            )
            print(f"Dulwich failed to parse: {manifest!r}")
            traceback.print_exc()
            return None  # the caller reports this as found_but_unparseable


################################
# Object recovery from origins
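
# Overall flow of the recovery code below:
#   1. get_origins() asks swh-graph (the main instance, then the local one as a
#      fallback) which origins can reach the corrupt SWHID, caching the answer
#      on disk;
#   2. get_object_from_origins() clones/fetches those origins one by one until
#      one of them actually contains the object;
#   3. handle_mismatch() (further below) re-serializes the recovered object and
#      checks that it round-trips through swh.model before writing it out.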


def get_object_from_origins(swhid, stored_obj):
    print(f"Looking for {swhid}")
    obj_id = swhid.object_id
    (success, res) = get_origins(swhid, stored_obj)
    if not success:
        return (False, res)
    else:
        origins = res
    for origin_url in origins:
        if not origin_url.endswith(".git"):
            origin_url += ".git"
        if origin_url == "https://github.com/reingart/python.git":
            # Fails very often...
            raise Exception(f"refusing to fetch from known-flaky origin {origin_url}")
            # continue

        # Smart-protocol negotiation request (pkt-lines: 4-digit hex length
        # prefix, then "want"/"have" lines, a flush-pkt, and "done").
        # NOTE: currently unused; the code below fetches via a full clone
        # instead of requesting this single object.
        data = b"0032want " + hash_to_bytehex(obj_id) + b"\n"
        if swhid.object_type == model.ObjectType.REVISION:
            for parent in stored_obj.parents:
                data += b"0032have " + hash_to_bytehex(parent) + b"\n"
        elif swhid.object_type == model.ObjectType.RELEASE:
            data += b"0032have " + hash_to_bytehex(stored_obj.target) + b"\n"
        data += b"0000"
        data += b"0009done\n"

        clone_path = get_clone_path(origin_url)
        if not clone_path.is_dir():
            try:
                clone(origin_url)
            except subprocess.CalledProcessError:
                continue
        elif "linux" in origin_url:
            # the shared linux.git clone exists, but may not contain this fork yet
            try:
                clone(origin_url)
            except subprocess.CalledProcessError:
                continue

        try:
            cloned_obj = get_object_from_clone(origin_url, obj_id)
        except KeyError:
            # object not in this clone; try the next origin
            continue
        if cloned_obj is None:
            return (False, "found_but_unparseable")
        break
    else:
        return (
            False,
            f"unrecoverable_{swhid.object_type.value}_no-origin "
            f"(tried: {' '.join(origins)})",
        )

    return (True, origin_url, cloned_obj)


def get_origins(swhid, stored_obj):
    dir_ = f"graph_backward_leaves/{hash_to_hex(swhid.object_id)[0:2]}"
    os.makedirs(dir_, exist_ok=True)
    graph_cache_file = f"{dir_}/{swhid}.txt"
    if os.path.isfile(graph_cache_file):
        with open(graph_cache_file) as fd:
            origin_swhids = [
                ExtendedSWHID.from_string(line.strip()) for line in fd if line.strip()
            ]
    else:
        for _ in range(10):
            try:
                origin_swhids = [
                    ExtendedSWHID.from_string(line)
                    for line in graph.leaves(swhid, direction="backward")
                    if line.startswith("swh:1:ori:")
                ]
            except GraphArgumentException:
                # try again with the local graph (more up to date, but partial)
                try:
                    origin_swhids = [
                        ExtendedSWHID.from_string(line)
                        for line in graph2.leaves(swhid, direction="backward")
                        if line.startswith("swh:1:ori:")
                    ]
                except GraphArgumentException:
                    return (
                        False,
                        f"unrecoverable_{swhid.object_type.value}_not-in-swh-graph",
                    )
                except Exception:
                    pass  # transient failure of the local graph; retry
                else:
                    break
            except Exception:
                pass  # transient failure of the main graph; retry
            else:
                break
        else:
            return (False, f"unrecoverable_{swhid.object_type.value}_swh-graph-crashes")
        tmp_path = graph_cache_file + ".tmp" + secrets.token_hex(8)
        with open(tmp_path, "wt") as fd:
            fd.write("\n".join(map(str, origin_swhids)))
            fd.write("\n")
        os.rename(tmp_path, graph_cache_file)  # atomic
    origins = [
        origin["url"]
        for origin in storage.origin_get_by_sha1(
            [origin_swhid.object_id for origin_swhid in origin_swhids]
        )
        if origin is not None  # unknown origins come back as None
    ]

    # swh-graph returns results in a non-deterministic order, so sort to avoid
    # fetching lots of different forks of the same project.
    # And for big projects with lots of forks and/or broken commits,
    # manually prioritize the repo with the most commits.
    PRIORITIZED_ORIGINS = [
        "https://github.com/torvalds/linux.git",
        "https://github.com/git/git.git",
        "https://github.com/nixos/nixpkgs.git",
    ]
    origins.sort(key=lambda url: "" if url in PRIORITIZED_ORIGINS else url)

    return (True, origins)


################################
# Orchestration


def write_fixed_object(swhid, obj):
    dir_path = os.path.join("recheck_consistency", hash_to_hex(swhid.object_id)[0:2])
    os.makedirs(dir_path, exist_ok=True)
    with open(f"{dir_path}/{swhid}.pickle", "wb") as fd:
        pickle.dump(obj.to_dict(), fd)
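

# The pickles written by write_fixed_object() can be loaded back into model
# objects later, along the lines of (sketch; swhid_str is a hypothetical
# "swh:1:rev:..." string, whose first two hex digits give the fan-out dir):
#
#   swhid_str = "swh:1:rev:" + "ab" + "..."  # 40 hex digits total
#   with open(f"recheck_consistency/{swhid_str[10:12]}/{swhid_str}.pickle", "rb") as fd:
#       fixed = model.Revision.from_dict(pickle.load(fd))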


def handle_mismatch(object_type, swhid, stored_obj):
    obj_id = swhid.object_id
    res = get_object_from_origins(swhid, stored_obj)
    if res[0]:
        # successfully recovered
        (_, origin_url, cloned_dulwich_obj) = res
    else:
        (_, bucket) = res
        print(f"Failed to recover {swhid}. Cause: {bucket}")
        return

    object_header = (
        cloned_dulwich_obj.type_name
        + b" "
        + str(cloned_dulwich_obj.raw_length()).encode()
        + b"\x00"
    )
    cloned_manifest = object_header + cloned_dulwich_obj.as_raw_string()
    rehash = hashlib.sha1(cloned_manifest).digest()
    assert (
        obj_id == rehash
    ), f"Recovered object does not hash to the expected id: {obj_id.hex()} != {rehash.hex()}"

    if object_type == "revision":
        cloned_obj = dulwich_commit_to_revision(cloned_dulwich_obj)
        roundtripped_cloned_manifest = revision_git_object(cloned_obj)
    elif object_type == "directory":
        cloned_obj = dulwich_tree_to_directory(cloned_dulwich_obj)
        roundtripped_cloned_manifest = directory_git_object(cloned_obj)
    elif object_type == "release":
        cloned_obj = dulwich_tag_to_release(cloned_dulwich_obj)
        roundtripped_cloned_manifest = release_git_object(cloned_obj)
    else:
        assert False, object_type

    if roundtripped_cloned_manifest != cloned_manifest:
        print(f"manifest for {swhid} not round-tripped:")
        print(
            "\n".join(
                difflib.ndiff(
                    cloned_manifest.split(b"\x00", 1)[1]
                    .decode(errors="backslashreplace")
                    .split("\n"),
                    roundtripped_cloned_manifest.split(b"\x00", 1)[1]
                    .decode(errors="backslashreplace")
                    .split("\n"),
                )
            )
        )
        raise ValueError(f"manifest for {swhid} does not round-trip through swh.model")

    write_fixed_object(swhid, cloned_obj)
    print(f"Recovered {swhid}")


def process_objects(object_type, objects):
    for object_ in objects:
        real_id = object_.compute_hash()
        if object_.id != real_id:
            handle_mismatch(object_type, object_.swhid(), object_)

    # merge origins cloned by concurrent workers before writing the shared
    # state back (the file does not exist yet on the very first run)
    if os.path.exists(CLONED_ORIGINS_PATH):
        with open(CLONED_ORIGINS_PATH) as fd:
            CLONED_ORIGINS.update(json.load(fd))
    data = json.dumps(dict(CLONED_ORIGINS))

    tmp_path = CLONED_ORIGINS_PATH + ".tmp" + secrets.token_hex(8)
    with open(tmp_path, "wt") as fd:
        fd.write(data)
    os.rename(tmp_path, CLONED_ORIGINS_PATH)  # atomic


def process_dicts(all_dicts):
    for (object_type, dicts) in all_dicts.items():
        cls = getattr(model, object_type.capitalize())
        process_objects(object_type, map(cls.from_dict, dicts))


################################
# Reading existing data


def journal_main():
    from swh.journal.client import get_journal_client

    # import logging
    # logging.basicConfig(level=logging.DEBUG)

    config = {
        "sasl.mechanism": "SCRAM-SHA-512",
        "security.protocol": "SASL_SSL",
        "sasl.username": "swh-vlorentz",
        "sasl.password": os.environ["KAFKA_SASL_PASSWORD"],
        "privileged": True,
        "message.max.bytes": 524288000,
        # "debug": "consumer",
        # "debug": "all",
    }

    client = get_journal_client(
        "kafka",
        brokers=[f"broker{i}.journal.softwareheritage.org:9093" for i in range(1, 5)],
        group_id="swh-vlorentz-T75-recheck-consistency",
        # object_types=["directory", "snapshot"],
        object_types=["directory", "revision", "snapshot", "release"],
        auto_offset_reset="earliest",
        **config,
    )

    try:
        client.process(process_dicts)
    except KeyboardInterrupt:
        print("Called Ctrl-C, exiting.")
        sys.exit(0)
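

# postgres_main() takes 20-byte inclusive bounds. The CLI pads the hex
# arguments on the right with zeros, so for example "00" and "01" become
#   from_id = b"\x00" * 20
#   to_id   = b"\x01" + b"\x00" * 19
# i.e. every id whose first byte is 0x00 (1/256th of the id space), plus the
# single id 0x0100...00 at the upper bound.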


def postgres_main(object_type, from_id, to_id):
    # note: this local ``storage`` reads directly from the replica; the global
    # ``storage`` defined in __main__ (used by get_origins) stays the remote one
    storage = get_storage(
        cls="postgresql", db="service=swh-replica", objstorage={"cls": "memory"}
    )

    db = storage.get_db()
    cur = db.cursor()

    # estimate the table size from planner statistics, to size the progress bar
    cur.execute(
        """
        SELECT n_live_tup
        FROM pg_stat_user_tables
        WHERE relname = %s
        """,
        (object_type,),
    )
    ((total_objects,),) = cur.fetchall()

    range_size = int.from_bytes(to_id, byteorder="big") - int.from_bytes(
        from_id, byteorder="big"
    )
    max_range_size = int.from_bytes(b"\xff" * 20, byteorder="big") - int.from_bytes(
        b"\x00" * 20, byteorder="big"
    )
    range_coverage_ratio = range_size / max_range_size
    estimated_objects = total_objects * range_coverage_ratio

    # if this fires, the requested interval is suspiciously small
    assert estimated_objects >= 1, estimated_objects

    # make the range inclusive on the left: the main loop below uses "id > %s",
    # so step from_id back by one before starting (an id of exactly 20 zero
    # bytes would be missed, but cannot realistically occur)
    if from_id != b"\x00" * 20:
        from_id = (int.from_bytes(from_id, byteorder="big") - 1).to_bytes(
            20, byteorder="big"
        )

    with tqdm.tqdm(total=estimated_objects, unit_scale=True) as pbar:
        while True:
            # index-only scan
            cur.execute(
                f"""
                SELECT id
                FROM {object_type}
                WHERE id > %s AND id <= %s
                ORDER BY id
                LIMIT 10000
                """,
                (from_id, to_id),
            )
            ids = [id_ for (id_,) in cur.fetchall()]
            if not ids:
                break

            assert all(id_ > from_id for id_ in ids)
            from_id = max(ids)

            for id_group in grouper(ids, 100):
                id_group = list(id_group)
                if object_type == "directory":
                    manifests = storage.directory_get_raw_manifest(
                        id_group, db=db, cur=cur
                    )
                    objects = (
                        model.Directory(
                            id=id_,
                            entries=tuple(
                                stream_results(
                                    storage.directory_get_entries, id_, db=db, cur=cur
                                )
                            ),
                            raw_manifest=manifests[id_],
                        )
                        for id_ in id_group
                    )
                elif object_type == "release":
                    objects = storage.release_get(id_group)
                elif object_type == "revision":
                    objects = storage.revision_get(id_group)
                else:
                    assert False, object_type

                process_objects(object_type, objects)
                pbar.update(len(id_group))


if __name__ == "__main__":
    # global storage used by get_origins()
    storage = get_storage(
        "pipeline",
        steps=[
            dict(cls="retry"),
            dict(
                cls="remote", url="http://webapp1.internal.softwareheritage.org:5002/"
            ),
        ],
    )

    try:
        (_, mode, *args) = sys.argv
        if mode == "kafka":
            () = args
        elif mode == "postgres":
            (type_, from_id, to_id) = args
        else:
            raise ValueError()
    except ValueError:
        print(__doc__)
        sys.exit(1)

    if mode == "kafka":
        journal_main()
    elif mode == "postgres":
        postgres_main(
            type_,
            hash_to_bytes(from_id.ljust(40, "0")),
            hash_to_bytes(to_id.ljust(40, "0")),
        )
    else:
        assert False
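

# Example invocations (credentials and internal service URLs are
# deployment-specific). For the postgres mode, give each worker one bucket,
# e.g. "00 01", "01 02", ..., with 40 "f"s as the last upper bound; at worst,
# ids sitting exactly on a bucket boundary are checked twice, which is harmless:
#
#   KAFKA_SASL_PASSWORD=... ./recheck_consistency.py kafka
#   ./recheck_consistency.py postgres directory 00 01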