diff --git a/swh/vault/cookers/revision_bare.py b/swh/vault/cookers/revision_bare.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/cookers/revision_bare.py
@@ -0,0 +1,227 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import collections
+import functools
+from itertools import chain, tee, filterfalse
+import os
+import tarfile
+import tempfile
+
+from dulwich.objects import Blob, Commit, Tree
+from dulwich.repo import MemoryRepo, Repo
+
+from swh.model.hashutil import hash_to_bytehex
+from swh.model.identifiers import (
+    format_author_data,
+    format_manifest,
+    hash_manifest,
+    identifier_to_str,
+)
+from swh.storage.algos.dir_iterators import dir_iterator
+from swh.vault.cookers.base import BaseVaultCooker
+from swh.vault.cookers.utils import revision_log
+from swh.vault.to_disk import get_filtered_files_content
+
+# temp imports
+import time
+from itertools import islice
+import ipdb
+import codecs
+
+
+def partition(pred, iterable):
+    """Use a predicate to partition entries into false entries and true entries."""
+    t1, t2 = tee(iterable)
+    return filterfalse(pred, t1), filter(pred, t2)
+
+
+def revision_to_commit_objdata(revision):
+    """Format a revision dict as a git commit manifest and compute its id."""
+    headers = [(b"tree", identifier_to_str(revision["directory"]).encode())]
+    for parent in revision["parents"]:
+        if parent:
+            headers.append((b"parent", identifier_to_str(parent).encode()))
+
+    headers.append((b"author", format_author_data(revision["author"], revision["date"])))
+    headers.append(
+        (b"committer", format_author_data(revision["committer"], revision["committer_date"]))
+    )
+
+    # Handle extra headers
+    metadata = revision.get("metadata") or {}
+    extra_headers = revision.get("extra_headers", ())
+    if not extra_headers and "extra_headers" in metadata:
+        extra_headers = metadata["extra_headers"]
+
+    headers.extend(extra_headers)
+    out = format_manifest(headers, revision["message"])
+
+    commit_id = identifier_to_str(hash_manifest("commit", headers, revision["message"]))
+
+    return commit_id, out
+
+
+def revision_to_dulwich_commit(revision, tree):
+    """Build a dulwich Commit object from a revision dict and its root Tree."""
+    commit = Commit()
+    commit.parents = [hash_to_bytehex(rev) for rev in revision["parents"]]
+    commit.tree = tree
+    commit.author = revision["author"]["fullname"]
+    commit.committer = revision["committer"]["fullname"]
+    commit.author_time = revision["date"]["timestamp"]["seconds"]
+    commit.commit_time = revision["committer_date"]["timestamp"]["seconds"]
+    commit.author_timezone = revision["date"]["offset"] * 60
+    commit.commit_timezone = revision["committer_date"]["offset"] * 60
+    # commit.encoding = b"UTF-8"
+    commit.message = revision["message"]
+    # only a leading gpgsig extra header is carried over for now
+    if len(revision["extra_headers"]) > 0 and revision["extra_headers"][0][0] == b"gpgsig":
+        commit.gpgsig = revision["extra_headers"][0][1]
+    commit.check()
+    return commit
+
+
+class RevisionBareCooker(BaseVaultCooker):
+    """Cooker to create a revision_bare bundle"""
+
+    CACHE_TYPE_KEY = "revision_bare"
+
+    def check_exists(self):
+        return not list(self.storage.revision_missing([self.obj_id]))
+
+    def process_tree(self, root_tree_id: bytes, known_ids: set):
+        root_tree = Tree()
+        dir_stack = [root_tree_id]
+        new_objs = {root_tree_id: root_tree}
+        # mark the root as known so an identical root tree is not rebuilt later
+        known_ids.add(root_tree_id)
+
+        while dir_stack:
+            # print("stack:", ' '.join(dir_id[:3].hex() for dir_id in dir_stack))
+            dir_id = dir_stack.pop()
+
+            entries = collections.defaultdict(list)
+            for entry in self.storage.directory_ls(dir_id):
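+                # Bucket the directory entries by type ("dir", "file", "rev") so
+                # that sub-trees, blobs and submodule revisions can be handled
+                # separately below.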
+                entries[entry["type"]].append(entry)
+
+            for entry in entries["dir"]:
+                target = entry["target"]
+                perms = entry["perms"]
+                name = entry["name"]
+
+                new_objs[dir_id].add(name, perms, hash_to_bytehex(target))
+                if target in known_ids:
+                    continue  # this sub-tree was already emitted
+                known_ids.add(target)
+                dir_stack.append(target)
+                new_objs[target] = Tree()
+
+            for entry in entries["rev"]:
+                target = entry["target"]
+                perms = entry["perms"]
+                name = entry["name"]
+
+                new_objs[dir_id].add(name, perms, hash_to_bytehex(target))
+                if target in known_ids:
+                    continue  # this submodule revision was already recorded
+                known_ids.add(target)
+
+            is_file_known = (lambda e: e["target"] in known_ids)
+            new_files, old_files = partition(is_file_known, entries["file"])
+            new_files = get_filtered_files_content(self.storage, new_files)
+
+            for entry in chain(old_files, new_files):
+                target = entry["target"]
+                perms = entry["perms"]
+                name = entry["name"]
+                new_content = entry.get("content")
+
+                new_objs[dir_id].add(name, perms, hash_to_bytehex(target))
+                if new_content is None:
+                    continue  # already-known file: no content was fetched (empty blobs must still be written)
+                known_ids.add(target)
+                blob = Blob.from_string(new_content)
+                new_objs[target] = blob
+
+        return new_objs
+
+    def prepare_bundle(self):
+        # with tempfile.TemporaryDirectory(prefix=f"vault-{self.obj_id[:4].hex()}", suffix=".git") as tmp_path:
+        tmp_path = f"/tmp/vault_{self.obj_id[:4].hex()}_{int(time.time())}.git"
+        repo_path = tmp_path  # os.path.join(tmp_path, "repo.git")
+        print("Repository path:", repo_path)
+
+        repo = Repo.init_bare(repo_path, mkdir=True)
+        objstore = repo.object_store
+        repo.refs[b"refs/heads/master"] = hash_to_bytehex(self.obj_id)  # default always master
+
+        known_ids = set()
+
+        revlog = list(revision_log(self.storage, self.obj_id))
+        print(f"Revs: #{len(revlog)} {self.obj_id.hex()} -> {revlog[-1]['id'].hex()}")
+        for revision in revlog:
+            rev_id = revision["id"]
+            root_dir_id = revision["directory"]
+
+            if rev_id in known_ids:
+                print("Error: duplicate revision", rev_id.hex())
+                ipdb.set_trace()
+            known_ids.add(rev_id)
+
+            if root_dir_id in known_ids:
+                # the root tree was already written: only the commit is new
+                new_objs = {}
+                root_tree = objstore[root_dir_id]
+            else:
+                new_objs = self.process_tree(root_dir_id, known_ids)
+                root_tree = new_objs[root_dir_id]
+
+            commit = revision_to_dulwich_commit(revision, root_tree)
+            new_objs[rev_id] = commit
+
+            # correctness test for each commit
+            if hash_to_bytehex(rev_id) != commit.id:
+                print("Commits differ:", rev_id.hex(), commit.id.decode())
+                ipdb.set_trace()
+
+            for obj in new_objs.values():
+                objstore.add_object(obj)  # high disk IO
+
+            print(f"rev {rev_id[:3].hex()} tree {root_dir_id[:3].hex()} new objs {len(new_objs)}")
+
+        with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar:
+            tar.add(repo_path, arcname=self.obj_id.hex())
+
+
+config_yaml = """storage:
+  cls: remote
+  url: http://127.1:5002/
+vault:
+  cls: remote
+  url: http://127.1:5005/
+celery:
+  task_broker: amqp://guest:guest@127.1//
+  task_modules:
+    - swh.vault.cooking_tasks
+  task_queues:
+    - swh.vault.cooking_tasks.SWHBatchCookingTask
+    - swh.vault.cooking_tasks.SWHCookingTask
+
+max_bundle_size: 536870912"""
+
+
+def main():
+    import yaml
+    import unittest.mock
+    from swh.storage import get_storage
+    import io
+
+    config = yaml.safe_load(config_yaml)
+    vault = unittest.mock.MagicMock()  # get_vault(**config["vault"])
+    storage = get_storage(**config["storage"])
+    rev_id = bytes.fromhex("275d1b52126674764f0f3d15c73c2add511bd310")
+    cooker = RevisionBareCooker("revision", rev_id, backend=vault, storage=storage)
+    cooker.fileobj = io.BytesIO()
+
+    assert cooker.check_exists()
+    cooker.prepare_bundle()
+
+    with open(f"/tmp/bundle_revbare_{rev_id.hex()}_{int(time.time())}.tar.gz", "wb") as f:
+        f.write(cooker.fileobj.getvalue())
+
+
+if __name__ == "__main__":
+    main()
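Note (not part of the diff): a quick way to sanity-check a cooked bundle is to extract it and re-open the bare repository with dulwich. The sketch below is an illustrative assumption; check_bundle() and its bundle_path/rev_id parameters do not exist in the patch. It only re-checks what prepare_bundle() is meant to guarantee: refs/heads/master points at the cooked revision, and that commit and its root tree ended up in the object store.

import tarfile
import tempfile

from dulwich.repo import Repo

from swh.model.hashutil import hash_to_bytehex


def check_bundle(bundle_path: str, rev_id: bytes) -> None:
    # Extract the gzipped tarball written by main(); it contains a single bare
    # repository whose top-level directory is named after the revision id.
    with tempfile.TemporaryDirectory() as scratch:
        with tarfile.open(bundle_path, "r:gz") as tar:
            tar.extractall(scratch)
        repo = Repo(f"{scratch}/{rev_id.hex()}")
        # The master ref must point at the cooked revision...
        assert repo.refs[b"refs/heads/master"] == hash_to_bytehex(rev_id)
        # ...and the commit object plus its root tree must be present.
        commit = repo[hash_to_bytehex(rev_id)]
        assert commit.tree in repo.object_store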