# Copyright (C) 2016-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Cooker producing a bare git repository for a revision's history.

The cooker walks the revision log, rebuilds each revision's root directory
as dulwich Tree/Blob objects, re-creates the commit object, and tars the
resulting bare repository into the vault bundle.
"""

import collections
import functools
import io
import os
import tarfile
import tempfile
import time
import unittest.mock

from dulwich.objects import Blob, Commit, Tree
from dulwich.repo import Repo

from swh.model.hashutil import hash_to_bytehex, hash_to_hex
from swh.storage import get_storage
from swh.storage.algos.dir_iterators import dir_iterator
from swh.vault.cookers.base import BaseVaultCooker
from swh.vault.cookers.utils import revision_log
from swh.vault.to_disk import apply_chunked, get_filtered_files_content


def revision_to_commit_objdata(revision):
    """Serialize a revision dict to raw git commit object data via swh.model.

    Reference implementation kept to cross-check the dulwich-built commit.

    Args:
        revision: a revision dict as returned by swh.storage.

    Returns:
        A ``(id, data)`` tuple: the hex commit identifier and the raw
        commit manifest bytes.
    """
    from swh.model.identifiers import (
        format_author_data,
        format_manifest,
        hash_manifest,
        identifier_to_str,
    )

    headers = [(b"tree", identifier_to_str(revision["directory"]).encode())]
    for parent in revision["parents"]:
        if parent:
            headers.append((b"parent", identifier_to_str(parent).encode()))

    headers.append(
        (b"author", format_author_data(revision["author"], revision["date"]))
    )
    headers.append(
        (
            b"committer",
            format_author_data(revision["committer"], revision["committer_date"]),
        )
    )

    # Extra headers may live at the top level or (for older objects) inside
    # the revision metadata; prefer the top-level field when present.
    metadata = revision.get("metadata") or {}
    extra_headers = revision.get("extra_headers", ())
    if not extra_headers and "extra_headers" in metadata:
        extra_headers = metadata["extra_headers"]

    headers.extend(extra_headers)
    out = format_manifest(headers, revision["message"])

    id = identifier_to_str(hash_manifest("commit", headers, revision["message"]))

    return id, out


def revision_to_dulwich_commit(revision, tree):
    """Build a dulwich :class:`Commit` from a swh revision dict.

    Args:
        revision: a revision dict as returned by swh.storage.
        tree: the dulwich Tree (or tree id) of the revision's root directory.

    Returns:
        A dulwich ``Commit`` that has passed ``check()``.
    """
    commit = Commit()
    commit.parents = [hash_to_bytehex(rev) for rev in revision["parents"]]
    commit.tree = tree
    commit.author = revision["author"]["fullname"]
    commit.committer = revision["committer"]["fullname"]
    # swh timestamps carry {seconds, microseconds}; git only keeps seconds.
    commit.author_time = revision["date"]["timestamp"]["seconds"]
    commit.commit_time = revision["committer_date"]["timestamp"]["seconds"]
    # swh offsets are in minutes; dulwich expects seconds.
    commit.author_timezone = revision["date"]["offset"] * 60
    commit.commit_timezone = revision["committer_date"]["offset"] * 60
    commit.message = revision["message"]
    # Look the signature up by header name instead of assuming it is the
    # first extra header: the previous `revision["extra_headers"][0][1]`
    # was brittle and raised IndexError on unsigned revisions.
    extra_headers = revision.get("extra_headers") or ()
    gpgsig = next((v for k, v in extra_headers if k == b"gpgsig"), None)
    if gpgsig is not None:
        commit.gpgsig = gpgsig
    commit.check()
    return commit


class RevisionBareCooker(BaseVaultCooker):
    """Cooker to create a revision_bare bundle """

    CACHE_TYPE_KEY = "revision_bare"

    def check_exists(self):
        """Return True when the requested revision is present in storage."""
        return not list(self.storage.revision_missing([self.obj_id]))

    def prepare_bundle(self):
        """Rebuild a bare git repository for the revision log and write it,
        gzip-tarred, to ``self.fileobj``."""
        with tempfile.TemporaryDirectory(prefix="vault-revision-") as tmp_path:
            repo_path = os.path.join(tmp_path, "repo.git")
            print("Repository path:", repo_path)
            repo = Repo.init_bare(repo_path, mkdir=True)
            objstore = repo.object_store
            objs = set()

            # check_exists() ran before cooking, so the log is non-empty here.
            revlog = list(revision_log(self.storage, self.obj_id))
            print(
                f"Revision log: {len(revlog)} entries "
                f"from {revlog[0]['id'].hex()} to {revlog[-1]['id'].hex()}"
            )
            for revision in revlog:
                rev_id = revision["id"]
                root_dir_id = revision["directory"]

                # Group directory entries by type so each kind is handled in
                # a single pass below.
                entries = collections.defaultdict(list)
                for entry in dir_iterator(self.storage, root_dir_id):
                    entries[entry["type"]].append(entry)

                root_tree = Tree()
                trees = {root_dir_id: root_tree}

                # Sub-directories: dir_iterator yields parents before their
                # children, so `trees[dir_id]` always exists when reached.
                for entry in entries["dir"]:
                    dir_id, target, perms, name = (
                        entry[key] for key in ("dir_id", "target", "perms", "name")
                    )
                    trees[dir_id].add(name, perms, hash_to_bytehex(target))
                    trees[target] = Tree()
                objs.update(trees.values())

                # Submodules (revision entries) are referenced by id only;
                # no git object is written for them.
                for entry in entries["rev"]:
                    dir_id, target, perms, name = (
                        entry[key] for key in ("dir_id", "target", "perms", "name")
                    )
                    trees[dir_id].add(name, perms, hash_to_bytehex(target))

                # File contents are fetched in chunks to bound memory use.
                f = functools.partial(get_filtered_files_content, self.storage)
                files_data = apply_chunked(f, entries["file"], 1000)
                for entry in files_data:
                    dir_id, target, perms, name, content = (
                        entry[key]
                        for key in ("dir_id", "target", "perms", "name", "content")
                    )
                    blob = Blob.from_string(content)
                    trees[dir_id].add(name, perms, hash_to_bytehex(target))
                    objs.add(blob)

                # Sanity check: the rebuilt root tree must hash to the swh
                # root directory id. (The interactive input() the draft had
                # here would hang a vault worker; keep the diagnostic only.)
                if root_tree.id != hash_to_bytehex(root_dir_id):
                    print("Trees differ:", root_tree.id, hash_to_bytehex(root_dir_id))

                commit = revision_to_dulwich_commit(revision, root_tree)

                # Sanity check: the rebuilt commit must hash to the swh id.
                if commit.id != hash_to_bytehex(rev_id):
                    print("Commits differ:", commit.id, hash_to_bytehex(rev_id))
                objs.add(commit)

                print(
                    f"rev {rev_id.hex()[:8]} tree {root_dir_id.hex()[:8]} objs {len(objs)} "
                    f'file {len(entries["file"])} dir {len(entries["dir"])} subrev {len(entries["rev"])}'
                )

                for obj in objs:
                    objstore.add_object(obj)
                # NOTE(review): the ref ends up on the *last* log entry --
                # confirm revision_log ordering puts the requested revision
                # last; otherwise move this assignment after the loop.
                repo.refs[b"refs/heads/master"] = commit.id

            with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar:
                tar.add(repo_path, arcname=hash_to_hex(self.obj_id))


config_yaml = """storage:
  cls: remote
  url: http://127.1:5002/
vault:
  cls: remote
  url: http://127.1:5005/
celery:
  task_broker: amqp://guest:guest@127.1//
  task_modules:
    - swh.vault.cooking_tasks
  task_queues:
    - swh.vault.cooking_tasks.SWHBatchCookingTask
    - swh.vault.cooking_tasks.SWHCookingTask

max_bundle_size: 536870912"""


def main():
    """Ad-hoc driver: cook a hard-coded revision against a local swh stack
    and dump the resulting bundle under /tmp."""
    import yaml

    config = yaml.safe_load(config_yaml)
    # A MagicMock stands in for get_vault(**config["vault"]) so the cooker
    # can run without a live vault backend.
    vault = unittest.mock.MagicMock()
    storage = get_storage(**config["storage"])
    vault.storage = storage
    rev_id = bytes.fromhex("275d1b52126674764f0f3d15c73c2add511bd310")
    cooker = RevisionBareCooker("revision", rev_id, backend=vault, storage=storage)
    cooker.fileobj = io.BytesIO()

    assert cooker.check_exists()
    cooker.prepare_bundle()

    # Fix: the bundle is gzip bytes, so the dump file must be opened for
    # binary writing -- the original bare open(path) was read-only text
    # mode and crashed on f.write(bytes).
    with open(f"/tmp/bundle_{rev_id.hex()}_{time.time()}", "wb") as f:
        f.write(cooker.fileobj.getvalue())


if __name__ == "__main__":
    main()