Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/git_bare.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
""" | """ | ||||
This cooker creates tarballs containing a bare .git directory, | This cooker creates tarballs containing a bare .git directory, | ||||
that can be unpacked and cloned like any git repository. | that can be unpacked and cloned like any git repository. | ||||
It works in three steps: | It works in three steps: | ||||
1. Write objects one by one in :file:`.git/objects/` | 1. Write objects one by one in :file:`.git/objects/` | ||||
2. Calls ``git repack`` to pack all these objects into git packfiles. | 2. Calls ``git repack`` to pack all these objects into git packfiles. | ||||
3. Creates a tarball of the resulting repository | 3. Creates a tarball of the resulting repository | ||||
To avoid downloading and writing the same objects twice, | It keeps a set of all written (or about-to-be-written) object hashes in memory | ||||
it checks the existence of the object file in the temporary directory. | to avoid downloading and writing the same objects twice. | ||||
To avoid sending a syscall every time, it also uses ``functools.lru_cache``, | |||||
as a first layer of cache before checking the file's existence. | |||||
""" | """ | ||||
import datetime | import datetime | ||||
import functools | |||||
import os.path | import os.path | ||||
import subprocess | import subprocess | ||||
import tarfile | import tarfile | ||||
import tempfile | import tempfile | ||||
from typing import Any, Callable, Dict, List | from typing import Any, Dict, Iterable, List, Set | ||||
import zlib | import zlib | ||||
from swh.core.utils import grouper | |||||
from swh.graph.client import GraphArgumentException | from swh.graph.client import GraphArgumentException | ||||
from swh.model import identifiers | from swh.model import identifiers | ||||
from swh.model.hashutil import hash_to_bytehex, hash_to_hex | from swh.model.hashutil import hash_to_bytehex, hash_to_hex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Person, | Person, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
Sha1Git, | Sha1Git, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.storage.algos.revisions_walker import DFSRevisionsWalker | from swh.storage.algos.revisions_walker import DFSRevisionsWalker | ||||
from swh.vault.cookers.base import BaseVaultCooker | from swh.vault.cookers.base import BaseVaultCooker | ||||
REVISION_BATCH_SIZE = 10000 | REVISION_BATCH_SIZE = 10000 | ||||
DIRECTORY_BATCH_SIZE = 10000 | |||||
CONTENT_BATCH_SIZE = 100 | |||||
class GitBareCooker(BaseVaultCooker): | class GitBareCooker(BaseVaultCooker): | ||||
use_fsck = True | use_fsck = False | ||||
def cache_type_key(self) -> str: | def cache_type_key(self) -> str: | ||||
return self.obj_type | return self.obj_type | ||||
def check_exists(self): | def check_exists(self): | ||||
obj_type = self.obj_type.split("_")[0] | obj_type = self.obj_type.split("_")[0] | ||||
if obj_type == "revision": | if obj_type == "revision": | ||||
return not list(self.storage.revision_missing([self.obj_id])) | return not list(self.storage.revision_missing([self.obj_id])) | ||||
elif obj_type == "directory": | elif obj_type == "directory": | ||||
return not list(self.storage.directory_missing([self.obj_id])) | return not list(self.storage.directory_missing([self.obj_id])) | ||||
else: | else: | ||||
raise NotImplementedError(f"GitBareCooker for {obj_type}") | raise NotImplementedError(f"GitBareCooker for {obj_type}") | ||||
def obj_swhid(self) -> identifiers.CoreSWHID: | def obj_swhid(self) -> identifiers.CoreSWHID: | ||||
obj_type = self.obj_type.split("_")[0] | obj_type = self.obj_type.split("_")[0] | ||||
return identifiers.CoreSWHID( | return identifiers.CoreSWHID( | ||||
object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id, | object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id, | ||||
) | ) | ||||
def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None: | |||||
assert not isinstance(obj_ids, bytes) | |||||
revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen] | |||||
self._seen.update(revision_ids) | |||||
stack.extend(revision_ids) | |||||
def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]: | |||||
obj_ids = stack[-n:] | |||||
stack[-n:] = [] | |||||
return obj_ids | |||||
def prepare_bundle(self): | def prepare_bundle(self): | ||||
# Objects we will visit soon: | |||||
self._rev_stack: List[Sha1Git] = [] | |||||
self._dir_stack: List[Sha1Git] = [] | |||||
self._cnt_stack: List[Sha1Git] = [] | |||||
# Set of objects already in any of the stacks: | |||||
self._seen: Set[Sha1Git] = set() | |||||
with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: | with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: | ||||
olasd: Sneaky hunk from another commit, I guess (no need for re-review) | |||||
# Initialize a Git directory | # Initialize a Git directory | ||||
self.workdir = workdir | self.workdir = workdir | ||||
self.gitdir = os.path.join(workdir, "clone.git") | self.gitdir = os.path.join(workdir, "clone.git") | ||||
os.mkdir(self.gitdir) | os.mkdir(self.gitdir) | ||||
self.init_git() | self.init_git() | ||||
# Add the root object to the stack of objects to visit | |||||
self.push_subgraph(self.obj_type.split("_")[0], self.obj_id) | |||||
# Load and write all the objects to disk | # Load and write all the objects to disk | ||||
self.load_subgraph(self.obj_type.split("_")[0], self.obj_id) | self.load_objects() | ||||
# Write the root object as a ref. | # Write the root object as a ref. | ||||
# This must be done before repacking; git-repack ignores orphan objects. | # This must be done before repacking; git-repack ignores orphan objects. | ||||
self.write_refs() | self.write_refs() | ||||
self.repack() | self.repack() | ||||
self.write_archive() | self.write_archive() | ||||
▲ Show 20 Lines • Show All 65 Lines • ▼ Show 20 Lines | def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool: | ||||
# Git requires objects to be zlib-compressed; but repacking decompresses and | # Git requires objects to be zlib-compressed; but repacking decompresses and | ||||
# removes them, so we don't need to compress them too much. | # removes them, so we don't need to compress them too much. | ||||
data = zlib.compress(obj, level=1) | data = zlib.compress(obj, level=1) | ||||
with open(self._obj_path(obj_id), "wb") as fd: | with open(self._obj_path(obj_id), "wb") as fd: | ||||
fd.write(data) | fd.write(data) | ||||
return True | return True | ||||
def load_subgraph(self, obj_type, obj_id) -> None: | def push_subgraph(self, obj_type, obj_id) -> None: | ||||
if obj_type == "revision": | if obj_type == "revision": | ||||
self.load_revision_subgraph(obj_id) | self.push_revision_subgraph(obj_id) | ||||
elif obj_type == "directory": | elif obj_type == "directory": | ||||
self.load_directory_subgraph(obj_id) | self._push(self._dir_stack, [obj_id]) | ||||
else: | else: | ||||
raise NotImplementedError(f"GitBareCooker.load_subgraph({obj_type!r}, ...)") | raise NotImplementedError( | ||||
f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" | |||||
) | |||||
def load_objects(self) -> None: | |||||
while self._rev_stack or self._dir_stack or self._cnt_stack: | |||||
revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) | |||||
self.load_revisions(revision_ids) | |||||
directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE) | |||||
self.load_directories(directory_ids) | |||||
content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE) | |||||
self.load_contents(content_ids) | |||||
def load_revision_subgraph(self, obj_id: Sha1Git) -> None: | def push_revision_subgraph(self, obj_id: Sha1Git) -> None: | ||||
"""Fetches a revision and all its children, and writes them to disk""" | """Fetches a revision and all its children, and writes them to disk""" | ||||
loaded_from_graph = False | loaded_from_graph = False | ||||
if self.graph: | if self.graph: | ||||
# First, try to cook using swh-graph, as it is more efficient than | # First, try to cook using swh-graph, as it is more efficient than | ||||
# swh-storage for querying the history | # swh-storage for querying the history | ||||
obj_swhid = identifiers.CoreSWHID( | obj_swhid = identifiers.CoreSWHID( | ||||
object_type=identifiers.ObjectType.REVISION, object_id=obj_id, | object_type=identifiers.ObjectType.REVISION, object_id=obj_id, | ||||
) | ) | ||||
try: | try: | ||||
revision_ids = ( | revision_ids = ( | ||||
swhid.object_id | swhid.object_id | ||||
for swhid in map( | for swhid in map( | ||||
identifiers.CoreSWHID.from_string, | identifiers.CoreSWHID.from_string, | ||||
self.graph.visit_nodes(str(obj_swhid), edges="rev:rev"), | self.graph.visit_nodes(str(obj_swhid), edges="rev:rev"), | ||||
) | ) | ||||
) | ) | ||||
for revision_id_group in grouper(revision_ids, REVISION_BATCH_SIZE): | self._push(self._rev_stack, revision_ids) | ||||
self.load_revisions_and_directory_subgraphs(revision_id_group) | |||||
except GraphArgumentException: | except GraphArgumentException: | ||||
# Revision not found in the graph | # Revision not found in the graph | ||||
pass | pass | ||||
else: | else: | ||||
loaded_from_graph = True | loaded_from_graph = True | ||||
if not loaded_from_graph: | if not loaded_from_graph: | ||||
# If swh-graph is not available, or the revision is not yet in | # If swh-graph is not available, or the revision is not yet in | ||||
# swh-graph, fall back to self.storage.revision_log. | # swh-graph, fall back to self.storage.revision_log. | ||||
# self.storage.revision_log also gives us the full revisions, | |||||
# so we load them right now instead of just pushing them on the stack. | |||||
walker = DFSRevisionsWalker(self.storage, obj_id) | walker = DFSRevisionsWalker(self.storage, obj_id) | ||||
for revision in walker: | for revision in walker: | ||||
self.write_revision_node(revision) | self.write_revision_node(revision) | ||||
self.load_directory_subgraph(revision["directory"]) | self._push(self._dir_stack, [revision["directory"]]) | ||||
def load_revisions_and_directory_subgraphs(self, obj_ids: List[Sha1Git]) -> None: | def load_revisions(self, obj_ids: List[Sha1Git]) -> None: | ||||
"""Given a list of revision ids, loads these revisions and their directories; | """Given a list of revision ids, loads these revisions and their directories; | ||||
but not their parent revisions.""" | but not their parent revisions.""" | ||||
revisions = self.storage.revision_get(obj_ids) | revisions = self.storage.revision_get(obj_ids) | ||||
for revision in revisions: | for revision in revisions: | ||||
self.write_revision_node(revision.to_dict()) | self.write_revision_node(revision.to_dict()) | ||||
self.load_directory_subgraph(revision.directory) | self._push(self._dir_stack, (rev.directory for rev in revisions)) | ||||
def write_revision_node(self, revision: Dict[str, Any]) -> bool: | def write_revision_node(self, revision: Dict[str, Any]) -> bool: | ||||
"""Writes a revision object to disk""" | """Writes a revision object to disk""" | ||||
git_object = identifiers.revision_git_object(revision) | git_object = identifiers.revision_git_object(revision) | ||||
return self.write_object(revision["id"], git_object) | return self.write_object(revision["id"], git_object) | ||||
@functools.lru_cache(10240) | def load_directories(self, obj_ids: List[Sha1Git]) -> None: | ||||
def load_directory_subgraph(self, obj_id: Sha1Git) -> None: | for obj_id in obj_ids: | ||||
"""Fetches a directory and all its children, and writes them to disk""" | self.load_directory(obj_id) | ||||
if self.object_exists(obj_id): | |||||
# Checks if the object is already written on disk. | |||||
# This rarely happens thanks to @lru_cache() | |||||
return | |||||
directory = self.load_directory_node(obj_id) | |||||
entry_loaders: Dict[str, Callable[[Sha1Git], None]] = { | |||||
"file": self.load_content, | |||||
"dir": self.load_directory_subgraph, | |||||
"rev": self.load_revision_subgraph, | |||||
} | |||||
for entry in directory["entries"]: | |||||
entry_loader = entry_loaders[entry["type"]] | |||||
entry_loader(entry["target"]) | |||||
def load_directory_node(self, obj_id: Sha1Git) -> Dict[str, Any]: | def load_directory(self, obj_id: Sha1Git) -> None: | ||||
"""Fetches a directory, writes it to disk (non-recursively), and returns it.""" | # Load the directory | ||||
entries = list(self.storage.directory_ls(obj_id, recursive=False)) | entries = list(self.storage.directory_ls(obj_id, recursive=False)) | ||||
directory = {"id": obj_id, "entries": entries} | directory = {"id": obj_id, "entries": entries} | ||||
git_object = identifiers.directory_git_object(directory) | git_object = identifiers.directory_git_object(directory) | ||||
self.write_object(obj_id, git_object) | self.write_object(obj_id, git_object) | ||||
return directory | |||||
@functools.lru_cache(10240) | # Add children to the stack | ||||
def load_content(self, obj_id: Sha1Git) -> None: | entry_loaders: Dict[str, List[Sha1Git]] = { | ||||
if self.object_exists(obj_id): | "file": self._cnt_stack, | ||||
# Checks if the object is already written on disk. | "dir": self._dir_stack, | ||||
# This rarely happens thanks to @lru_cache() | "rev": self._rev_stack, | ||||
return | } | ||||
for entry in directory["entries"]: | |||||
stack = entry_loaders[entry["type"]] | |||||
self._push(stack, [entry["target"]]) | |||||
def load_contents(self, obj_ids: List[Sha1Git]) -> None: | |||||
for obj_id in obj_ids: | |||||
self.load_content(obj_id) | |||||
def load_content(self, obj_id: Sha1Git) -> None: | |||||
# TODO: add support of filtered objects, somehow? | # TODO: add support of filtered objects, somehow? | ||||
# It's tricky, because, by definition, we can't write a git object with | # It's tricky, because, by definition, we can't write a git object with | ||||
# the expected hash, so git-fsck *will* choke on it. | # the expected hash, so git-fsck *will* choke on it. | ||||
content_sha1 = self.storage.content_find({"sha1_git": obj_id})[0].sha1 | content_sha1 = self.storage.content_find({"sha1_git": obj_id})[0].sha1 | ||||
content = self.storage.content_get_data(content_sha1) | content = self.storage.content_get_data(content_sha1) | ||||
self.write_object(obj_id, f"blob {len(content)}\0".encode("ascii") + content) | self.write_object(obj_id, f"blob {len(content)}\0".encode("ascii") + content) |
Sneaky hunk from another commit, I guess (no need for re-review)