diff --git a/swh/vault/cookers/git_bare.py b/swh/vault/cookers/git_bare.py
--- a/swh/vault/cookers/git_bare.py
+++ b/swh/vault/cookers/git_bare.py
@@ -13,22 +13,18 @@
 2. Calls ``git repack`` to pack all these objects into git packfiles.
 3. Creates a tarball of the resulting repository
 
-To avoid downloading and writing the same objects twice,
-it checks the existence of the object file in the temporary directory.
-To avoid sending a syscall every time, it also uses ``functools.lru_cache``,
-as a first layer of cache before checking the file's existence.
+It keeps a set of all written (or about-to-be-written) object hashes in memory
+to avoid downloading and writing the same objects twice.
 """
 
 import datetime
-import functools
 import os.path
 import subprocess
 import tarfile
 import tempfile
-from typing import Any, Callable, Dict, List
+from typing import Any, Dict, Iterable, List, Set
 import zlib
 
-from swh.core.utils import grouper
 from swh.graph.client import GraphArgumentException
 from swh.model import identifiers
 from swh.model.hashutil import hash_to_bytehex, hash_to_hex
@@ -43,10 +39,12 @@
 from swh.vault.cookers.base import BaseVaultCooker
 
 REVISION_BATCH_SIZE = 10000
+DIRECTORY_BATCH_SIZE = 10000
+CONTENT_BATCH_SIZE = 100
 
 
 class GitBareCooker(BaseVaultCooker):
-    use_fsck = True
+    use_fsck = False
 
     def cache_type_key(self) -> str:
         return self.obj_type
@@ -66,7 +64,25 @@
             object_type=identifiers.ObjectType[obj_type.upper()],
             object_id=self.obj_id,
         )
 
+    def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None:
+        assert not isinstance(obj_ids, bytes)
+        revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen]
+        self._seen.update(revision_ids)
+        stack.extend(revision_ids)
+
+    def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]:
+        obj_ids = stack[-n:]
+        stack[-n:] = []
+        return obj_ids
+
     def prepare_bundle(self):
+        # Objects we will visit soon:
+        self._rev_stack: List[Sha1Git] = []
+        self._dir_stack: List[Sha1Git] = []
+        self._cnt_stack: List[Sha1Git] = []
+        # Set of objects already in any of the stacks:
+        self._seen: Set[Sha1Git] = set()
+
         with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir:
             # Initialize a Git directory
             self.workdir = workdir
@@ -74,8 +90,11 @@
             os.mkdir(self.gitdir)
             self.init_git()
 
+            # Add the root object to the stack of objects to visit
+            self.push_subgraph(self.obj_type.split("_")[0], self.obj_id)
+
             # Load and write all the objects to disk
-            self.load_subgraph(self.obj_type.split("_")[0], self.obj_id)
+            self.load_objects()
 
             # Write the root object as a ref.
             # This must be done before repacking; git-repack ignores orphan objects.
@@ -157,15 +176,28 @@
             fd.write(data)
         return True
 
-    def load_subgraph(self, obj_type, obj_id) -> None:
+    def push_subgraph(self, obj_type, obj_id) -> None:
         if obj_type == "revision":
-            self.load_revision_subgraph(obj_id)
+            self.push_revision_subgraph(obj_id)
         elif obj_type == "directory":
-            self.load_directory_subgraph(obj_id)
+            self._push(self._dir_stack, [obj_id])
         else:
-            raise NotImplementedError(f"GitBareCooker.load_subgraph({obj_type!r}, ...)")
+            raise NotImplementedError(
+                f"GitBareCooker.queue_subgraph({obj_type!r}, ...)"
+            )
+
+    def load_objects(self) -> None:
+        while self._rev_stack or self._dir_stack or self._cnt_stack:
+            revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE)
+            self.load_revisions(revision_ids)
+
+            directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE)
+            self.load_directories(directory_ids)
+
+            content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE)
+            self.load_contents(content_ids)
 
-    def load_revision_subgraph(self, obj_id: Sha1Git) -> None:
+    def push_revision_subgraph(self, obj_id: Sha1Git) -> None:
         """Fetches a revision and all its children, and writes them to disk"""
         loaded_from_graph = False
@@ -183,8 +215,7 @@
                     self.graph.visit_nodes(str(obj_swhid), edges="rev:rev"),
                 )
             )
-            for revision_id_group in grouper(revision_ids, REVISION_BATCH_SIZE):
-                self.load_revisions_and_directory_subgraphs(revision_id_group)
+            self._push(self._rev_stack, revision_ids)
         except GraphArgumentException:
             # Revision not found in the graph
             pass
@@ -194,56 +225,52 @@
         if not loaded_from_graph:
             # If swh-graph is not available, or the revision is not yet in
             # swh-graph, fall back to self.storage.revision_log.
+            # self.storage.revision_log also gives us the full revisions,
+            # so we load them right now instead of just pushing them on the stack.
             walker = DFSRevisionsWalker(self.storage, obj_id)
             for revision in walker:
                 self.write_revision_node(revision)
-                self.load_directory_subgraph(revision["directory"])
+                self._push(self._dir_stack, [revision["directory"]])
 
-    def load_revisions_and_directory_subgraphs(self, obj_ids: List[Sha1Git]) -> None:
+    def load_revisions(self, obj_ids: List[Sha1Git]) -> None:
         """Given a list of revision ids, loads these revisions and their
         directories; but not their parent revisions."""
         revisions = self.storage.revision_get(obj_ids)
         for revision in revisions:
             self.write_revision_node(revision.to_dict())
-            self.load_directory_subgraph(revision.directory)
+        self._push(self._dir_stack, (rev.directory for rev in revisions))
 
     def write_revision_node(self, revision: Dict[str, Any]) -> bool:
         """Writes a revision object to disk"""
         git_object = identifiers.revision_git_object(revision)
         return self.write_object(revision["id"], git_object)
 
-    @functools.lru_cache(10240)
-    def load_directory_subgraph(self, obj_id: Sha1Git) -> None:
-        """Fetches a directory and all its children, and writes them to disk"""
-        if self.object_exists(obj_id):
-            # Checks if the object is already written on disk.
-            # This rarely happens thanks to @lru_cache()
-            return
-        directory = self.load_directory_node(obj_id)
-        entry_loaders: Dict[str, Callable[[Sha1Git], None]] = {
-            "file": self.load_content,
-            "dir": self.load_directory_subgraph,
-            "rev": self.load_revision_subgraph,
-        }
-        for entry in directory["entries"]:
-            entry_loader = entry_loaders[entry["type"]]
-            entry_loader(entry["target"])
+    def load_directories(self, obj_ids: List[Sha1Git]) -> None:
+        for obj_id in obj_ids:
+            self.load_directory(obj_id)
 
-    def load_directory_node(self, obj_id: Sha1Git) -> Dict[str, Any]:
-        """Fetches a directory, writes it to disk (non-recursively), and returns it."""
+    def load_directory(self, obj_id: Sha1Git) -> None:
+        # Load the directory
         entries = list(self.storage.directory_ls(obj_id, recursive=False))
         directory = {"id": obj_id, "entries": entries}
         git_object = identifiers.directory_git_object(directory)
         self.write_object(obj_id, git_object)
-        return directory
 
-    @functools.lru_cache(10240)
-    def load_content(self, obj_id: Sha1Git) -> None:
-        if self.object_exists(obj_id):
-            # Checks if the object is already written on disk.
-            # This rarely happens thanks to @lru_cache()
-            return
+        # Add children to the stack
+        entry_loaders: Dict[str, List[Sha1Git]] = {
+            "file": self._cnt_stack,
+            "dir": self._dir_stack,
+            "rev": self._rev_stack,
+        }
+        for entry in directory["entries"]:
+            stack = entry_loaders[entry["type"]]
+            self._push(stack, [entry["target"]])
+
+    def load_contents(self, obj_ids: List[Sha1Git]) -> None:
+        for obj_id in obj_ids:
+            self.load_content(obj_id)
+
+    def load_content(self, obj_id: Sha1Git) -> None:
         # TODO: add support of filtered objects, somehow?
         # It's tricky, because, by definition, we can't write a git object with
         # the expected hash, so git-fsck *will* choke on it.