diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py
--- a/swh/vault/cookers/__init__.py
+++ b/swh/vault/cookers/__init__.py
@@ -14,6 +14,7 @@
 from swh.vault import get_vault
 from swh.vault.cookers.base import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH
 from swh.vault.cookers.directory import DirectoryCooker
+from swh.vault.cookers.revision_bare import RevisionGitBareCooker
 from swh.vault.cookers.revision_flat import RevisionFlatCooker
 from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker
 
@@ -21,6 +22,7 @@
 
 COOKER_TYPES = {
     "directory": DirectoryCooker,
     "revision_flat": RevisionFlatCooker,
     "revision_gitfast": RevisionGitfastCooker,
+    "revision_bare": RevisionGitBareCooker,
 }
diff --git a/swh/vault/cookers/revision_bare.py b/swh/vault/cookers/revision_bare.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/cookers/revision_bare.py
@@ -0,0 +1,233 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from __future__ import annotations
+
+import collections
+import functools
+from itertools import chain, filterfalse, tee
+import logging
+import tarfile
+import tempfile
+import time
+from typing import Any, Dict, List, Set, cast
+
+from dulwich.object_store import BaseObjectStore
+from dulwich.objects import Blob, Commit, ShaFile, Tree
+from dulwich.repo import Repo
+
+from swh.model.hashutil import hash_to_bytehex
+from swh.vault.cookers.base import BaseVaultCooker
+from swh.vault.cookers.utils import revision_log
+from swh.vault.to_disk import get_filtered_files_content
+
+logger = logging.getLogger(__name__)
+
+
+def partition(pred, iterable):
+    """Use a predicate to partition entries into false entries and true entries."""
+    t1, t2 = tee(iterable)
+    return filterfalse(pred, t1), filter(pred, t2)
+
+
+def revision_to_dulwich_commit(revision: Dict[str, Any], tree: Tree) -> Commit:
+    """Build a dulwich Commit from a revision dict and its root Tree."""
+    commit = Commit()
+    commit.parents = [hash_to_bytehex(rev) for rev in revision["parents"]]
+    # dulwich expects the tree's hex id here, not the Tree object itself
+    commit.tree = tree.id
+    commit.author = revision["author"]["fullname"]
+    commit.committer = revision["committer"]["fullname"]
+    commit.author_time = revision["date"]["timestamp"]["seconds"]
+    commit.commit_time = revision["committer_date"]["timestamp"]["seconds"]
+    # offsets are stored in minutes, dulwich wants seconds
+    commit.author_timezone = revision["date"]["offset"] * 60
+    commit.commit_timezone = revision["committer_date"]["offset"] * 60
+    # commit.encoding = b"UTF-8"
+    commit.message = revision["message"]
+    if (
+        len(revision["extra_headers"]) > 0
+        and revision["extra_headers"][0][0] == b"gpgsig"
+    ):
+        commit.gpgsig = revision["extra_headers"][0][1]
+    commit.check()
+    return commit
+
+
+class RevisionGitBareCooker(BaseVaultCooker):
+    """Cooker to create a revision_bare bundle."""
+
+    CACHE_TYPE_KEY = "revision_bare"
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.known_ids: Set[bytes] = set()
+
+    def check_exists(self) -> bool:
+        return not next(iter(self.storage.revision_missing([self.obj_id])), None)
+
+    @functools.lru_cache(maxsize=4096)
+    def directory_entries(self, dir_id: bytes) -> List[Dict[str, Any]]:
+        """Get the entries of a directory from its object id.
+
+        This function has a cache to avoid doing multiple requests to retrieve
+        the same entries, as doing the call to storage is expensive.
+        """
+        # materialize the result so the lru_cache holds a reusable list
+        # rather than a spent iterator
+        return list(self.storage.directory_ls(dir_id))
+
+    def is_file_known(self, file_entry: Dict[str, Any]) -> bool:
+        return file_entry["target"] in self.known_ids
+
+    def process_tree(self, root_tree_id: bytes) -> Dict[bytes, ShaFile]:
+        """Build the dulwich Trees and Blobs below root_tree_id that have not
+        been seen yet, returned as a mapping keyed by object id."""
+        root_tree: Tree = Tree()
+        dir_stack: List[bytes] = [root_tree_id]
+        new_objs: Dict[bytes, ShaFile] = {root_tree_id: root_tree}
+
+        while dir_stack:
+            dir_id = dir_stack.pop()
+
+            entries: Dict[str, List[Dict[str, Any]]] = collections.defaultdict(list)
+            for entry in self.directory_entries(dir_id):
+                entries[entry["type"]].append(entry)
+
+            parent_tree = cast(Tree, new_objs[dir_id])
+            for entry in entries["dir"]:
+                target = entry["target"]
+                parent_tree.add(entry["name"], entry["perms"], hash_to_bytehex(target))
+
+                if target in self.known_ids:
+                    continue
+                self.known_ids.add(target)
+
+                dir_stack.append(target)
+                new_objs[target] = Tree()
+
+            for entry in entries["rev"]:
+                target = entry["target"]
+                parent_tree.add(entry["name"], entry["perms"], hash_to_bytehex(target))
+
+                if target in self.known_ids:
+                    continue
+                self.known_ids.add(target)
+
+            new_files, old_files = partition(self.is_file_known, entries["file"])
+            new_files = list(get_filtered_files_content(self.storage, new_files))
+
+            for entry in new_files:
+                target = entry["target"]
+                self.known_ids.add(target)
+                new_objs[target] = Blob.from_string(entry["content"])
+
+            for entry in chain(old_files, new_files):
+                parent_tree.add(
+                    entry["name"], entry["perms"], hash_to_bytehex(entry["target"])
+                )
+
+        return new_objs
+
+    def prepare_bundle(self) -> None:
+        with tempfile.TemporaryDirectory(
+            prefix=f"vault-{self.obj_id[:4].hex()}-", suffix=".git"
+        ) as repo_path:
+            logger.debug("Repository path: %s", repo_path)
+
+            repo: Repo = Repo.init_bare(repo_path, mkdir=False)
+            objstore: BaseObjectStore = repo.object_store
+            repo.refs[b"refs/heads/master"] = hash_to_bytehex(self.obj_id)
+
+            new_objs: Dict[bytes, ShaFile]
+            root_tree: Tree
+            commit: Commit
+
+            revlog: List[Dict[str, Any]] = list(revision_log(self.storage, self.obj_id))
+            logger.debug(
+                f"Revs: #{len(revlog)} {self.obj_id.hex()} -> {revlog[-1]['id'].hex()}"
+            )
+            for i, revision in enumerate(revlog):
+                progress_msg = "Computing revision %s/%s" % (i + 1, len(revlog))
+                self.backend.set_progress(self.obj_type, self.obj_id, progress_msg)
+
+                rev_id: bytes = revision["id"]
+                root_dir_id: bytes = revision["directory"]
+
+                assert rev_id not in self.known_ids, (
+                    "duplicate revision %s" % rev_id.hex()
+                )
+                self.known_ids.add(rev_id)
+
+                if root_dir_id in self.known_ids:
+                    # Duplicate root trees can happen, e.g. when commits are reverted.
+                    # We retrieve the already processed tree from the object store.
+                    new_objs = {}
+                    root_tree = cast(Tree, objstore[root_dir_id])
+                else:
+                    # Add each directory and each content to the repo
+                    new_objs = self.process_tree(root_dir_id)
+                    root_tree = cast(Tree, new_objs[root_dir_id])
+
+                commit = revision_to_dulwich_commit(revision, root_tree)
+                new_objs[rev_id] = commit
+
+                # Correctness test for each commit
+                assert (
+                    hash_to_bytehex(rev_id) == commit.id
+                ), "commit hashes differ: %s != %s" % (rev_id.hex(), commit.id.decode())
+
+                for obj in new_objs.values():
+                    objstore.add_object(obj)
+
+                logger.debug(
+                    f"rev {rev_id[:3].hex()} tree {root_dir_id[:3].hex()} "
+                    f"new objs {len(new_objs):3d}"
+                )
+
+            with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar:
+                tar.add(repo_path, arcname=self.obj_id.hex())
+
+
+def main():
+    # For local testing purposes
+    import io
+    import unittest.mock
+
+    import yaml
+
+    from swh.storage import get_storage
+
+    config_yaml = """
+    storage:
+        cls: remote
+        url: http://127.1:5002/
+    vault:
+        cls: remote
+        url: http://127.1:5005/
+    celery:
+        task_broker: amqp://guest:guest@127.1//
+        task_modules:
+            - swh.vault.cooking_tasks
+        task_queues:
+            - swh.vault.cooking_tasks.SWHBatchCookingTask
+            - swh.vault.cooking_tasks.SWHCookingTask
+
+    max_bundle_size: 536870912"""
+    logging.basicConfig(level=logging.DEBUG)
+
+    # Real config, but test only based on storage (thus also objstorage)
+    config = yaml.safe_load(config_yaml)
+    vault = unittest.mock.MagicMock()
+    storage = get_storage(**config["storage"])
+    # vault.storage = storage
+    rev_id = bytes.fromhex("275d1b52126674764f0f3d15c73c2add511bd310")
+    cooker = RevisionGitBareCooker("revision", rev_id, backend=vault, storage=storage)
+    cooker.fileobj = io.BytesIO()
+
+    assert cooker.check_exists()
+    cooker.prepare_bundle()
+
+    with open(
+        f"/tmp/bundle_revbare_{rev_id.hex()}_{int(time.time())}.tar.gz", "wb"
+    ) as f:
+        f.write(cooker.fileobj.getvalue())
+
+
+if __name__ == "__main__":
+    main()
diff --git a/swh/vault/to_disk.py b/swh/vault/to_disk.py
--- a/swh/vault/to_disk.py
+++ b/swh/vault/to_disk.py
@@ -6,7 +6,7 @@
 import collections
 import functools
 import os
-from typing import Any, Dict, Iterator, List
+from typing import Any, Dict, Iterable, Iterator
 
 from swh.model import hashutil
 from swh.model.from_disk import DentryPerms, mode_to_perms
@@ -22,7 +22,7 @@
 
 
 def get_filtered_files_content(
-    storage: StorageInterface, files_data: List[Dict]
+    storage: StorageInterface, files_data: Iterable[Dict]
 ) -> Iterator[Dict[str, Any]]:
     """Retrieve the files specified by files_data and apply filters for skipped
     and missing contents.
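
Note for reviewers unfamiliar with dulwich: the cooker relies on dulwich's in-memory object model (`Blob`/`Tree`/`Commit`, `check()`, `object_store.add_object`). Below is a minimal standalone sketch of that pattern, independent of this diff; the file name, identity, and timestamps are made up. It also illustrates why `revision_to_dulwich_commit` assigns `tree.id` rather than the `Tree` object: `Commit.tree` holds the tree's hex id.

```python
from dulwich.objects import Blob, Commit, Tree

# Build a one-file tree and a commit pointing at it, entirely in memory.
blob = Blob.from_string(b"hello\n")
tree = Tree()
tree.add(b"hello.txt", 0o100644, blob.id)  # (name, mode, hex sha of the entry)

commit = Commit()
commit.tree = tree.id  # dulwich stores the tree's hex id, not the Tree object
commit.author = commit.committer = b"Jane Doe <jane@example.com>"
commit.author_time = commit.commit_time = 1614787200  # seconds since epoch
commit.author_timezone = commit.commit_timezone = 0   # offset in seconds
commit.message = b"initial commit"
commit.check()  # raises ObjectFormatException if any field is malformed

print(commit.id.decode())  # hex sha computed from the serialized commit
```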
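
To sanity-check a bundle produced by the `main()` harness above, something along these lines should work; the bundle file name (including the timestamp suffix) and the output directory are hypothetical and need adjusting to the actual file written to `/tmp`:

```python
import tarfile

from dulwich.repo import Repo

rev_hex = "275d1b52126674764f0f3d15c73c2add511bd310"  # revision used in main()
bundle = f"/tmp/bundle_revbare_{rev_hex}_1614787200.tar.gz"  # made-up timestamp

with tarfile.open(bundle, mode="r:gz") as tar:
    tar.extractall("/tmp/bundle_out")

# prepare_bundle() archives the bare repository under the revision's hex id
repo = Repo(f"/tmp/bundle_out/{rev_hex}")
head = repo.refs[b"refs/heads/master"]
assert head.decode() == rev_hex  # the ref points at the cooked revision
print(repo[head].message)
```

Since the extracted directory is an ordinary bare repository, `git clone /tmp/bundle_out/<rev_hex>` also yields a working checkout.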