Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/git_bare.py
Show All 19 Lines | |||||
""" | """ | ||||
import datetime | import datetime | ||||
import functools | import functools | ||||
import os.path | import os.path | ||||
import subprocess | import subprocess | ||||
import tarfile | import tarfile | ||||
import tempfile | import tempfile | ||||
from typing import Any, Callable, Dict | from typing import Any, Callable, Dict, List | ||||
import zlib | import zlib | ||||
from swh.core.utils import grouper | |||||
from swh.model import identifiers | from swh.model import identifiers | ||||
from swh.model.hashutil import hash_to_bytehex, hash_to_hex | from swh.model.hashutil import hash_to_bytehex, hash_to_hex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Person, | Person, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
Sha1Git, | Sha1Git, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.storage.algos.revisions_walker import DFSRevisionsWalker | from swh.storage.algos.revisions_walker import DFSRevisionsWalker | ||||
from swh.vault.cookers.base import BaseVaultCooker | from swh.vault.cookers.base import BaseVaultCooker | ||||
REVISION_BATCH_SIZE = 10000 | |||||
class GitBareCooker(BaseVaultCooker): | class GitBareCooker(BaseVaultCooker): | ||||
use_fsck = True | use_fsck = True | ||||
def cache_type_key(self) -> str: | def cache_type_key(self) -> str: | ||||
return self.obj_type | return self.obj_type | ||||
def check_exists(self): | def check_exists(self): | ||||
obj_type = self.obj_type.split("_")[0] | obj_type = self.obj_type.split("_")[0] | ||||
if obj_type == "revision": | if obj_type == "revision": | ||||
return not list(self.storage.revision_missing([self.obj_id])) | return not list(self.storage.revision_missing([self.obj_id])) | ||||
elif obj_type == "directory": | elif obj_type == "directory": | ||||
return not list(self.storage.directory_missing([self.obj_id])) | return not list(self.storage.directory_missing([self.obj_id])) | ||||
else: | else: | ||||
raise NotImplementedError(f"GitBareCooker for {obj_type}") | raise NotImplementedError(f"GitBareCooker for {obj_type}") | ||||
def obj_swhid(self) -> identifiers.CoreSWHID: | def obj_swhid(self) -> identifiers.CoreSWHID: | ||||
obj_type = self.obj_type.split("_")[0] | obj_type = self.obj_type.split("_")[0] | ||||
return identifiers.CoreSWHID( | return identifiers.CoreSWHID( | ||||
object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id, | object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id, | ||||
) | ) | ||||
def prepare_bundle(self): | def prepare_bundle(self): | ||||
with tempfile.TemporaryDirectory() as workdir: | with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: | ||||
olasd: Sneaky hunk from another commit, I guess (no need for re-review) | |||||
# Initialize a Git directory | # Initialize a Git directory | ||||
self.workdir = workdir | self.workdir = workdir | ||||
self.gitdir = os.path.join(workdir, "clone.git") | self.gitdir = os.path.join(workdir, "clone.git") | ||||
os.mkdir(self.gitdir) | os.mkdir(self.gitdir) | ||||
self.init_git() | self.init_git() | ||||
# Load and write all the objects to disk | # Load and write all the objects to disk | ||||
self.load_subgraph(self.obj_type.split("_")[0], self.obj_id) | self.load_subgraph(self.obj_type.split("_")[0], self.obj_id) | ||||
▲ Show 20 Lines • Show All 83 Lines • ▼ Show 20 Lines | def load_subgraph(self, obj_type, obj_id) -> None: | ||||
self.load_revision_subgraph(obj_id) | self.load_revision_subgraph(obj_id) | ||||
elif obj_type == "directory": | elif obj_type == "directory": | ||||
self.load_directory_subgraph(obj_id) | self.load_directory_subgraph(obj_id) | ||||
else: | else: | ||||
raise NotImplementedError(f"GitBareCooker.load_subgraph({obj_type!r}, ...)") | raise NotImplementedError(f"GitBareCooker.load_subgraph({obj_type!r}, ...)") | ||||
def load_revision_subgraph(self, obj_id: Sha1Git) -> None: | def load_revision_subgraph(self, obj_id: Sha1Git) -> None: | ||||
"""Fetches a revision and all its children, and writes them to disk""" | """Fetches a revision and all its children, and writes them to disk""" | ||||
loaded_from_graph = False | |||||
if self.graph: | |||||
# First, try to cook using swh-graph, as it is more efficient than | |||||
# swh-storage for querying the history | |||||
obj_swhid = identifiers.CoreSWHID( | |||||
object_type=identifiers.ObjectType.REVISION, object_id=obj_id, | |||||
) | |||||
revision_ids = ( | |||||
swhid.object_id | |||||
for swhid in map( | |||||
identifiers.CoreSWHID.from_string, | |||||
self.graph.visit_nodes(str(obj_swhid), edges="rev:rev"), | |||||
) | |||||
) | |||||
for revision_id_group in grouper(revision_ids, REVISION_BATCH_SIZE): | |||||
loaded_from_graph = True | |||||
self.load_revisions_and_directory_subgraphs(revision_id_group) | |||||
if not loaded_from_graph: | |||||
# If swh-graph is not available, or the revision is not yet in | |||||
# swh-graph, fall back to self.storage.revision_log. | |||||
walker = DFSRevisionsWalker(self.storage, obj_id) | walker = DFSRevisionsWalker(self.storage, obj_id) | ||||
for revision in walker: | for revision in walker: | ||||
self.write_revision_node(revision) | self.write_revision_node(revision) | ||||
self.load_directory_subgraph(revision["directory"]) | self.load_directory_subgraph(revision["directory"]) | ||||
def load_revisions_and_directory_subgraphs(self, obj_ids: List[Sha1Git]) -> None: | |||||
"""Given a list of revision ids, loads these revisions and their directories; | |||||
but not their parent revisions.""" | |||||
revisions = self.storage.revision_get(obj_ids) | |||||
for revision in revisions: | |||||
self.write_revision_node(revision.to_dict()) | |||||
self.load_directory_subgraph(revision.directory) | |||||
def write_revision_node(self, revision: Dict[str, Any]) -> bool: | def write_revision_node(self, revision: Dict[str, Any]) -> bool: | ||||
"""Writes a revision object to disk""" | """Writes a revision object to disk""" | ||||
git_object = identifiers.revision_git_object(revision) | git_object = identifiers.revision_git_object(revision) | ||||
return self.write_object(revision["id"], git_object) | return self.write_object(revision["id"], git_object) | ||||
@functools.lru_cache(10240) | @functools.lru_cache(10240) | ||||
def load_directory_subgraph(self, obj_id: Sha1Git) -> None: | def load_directory_subgraph(self, obj_id: Sha1Git) -> None: | ||||
"""Fetches a directory and all its children, and writes them to disk""" | """Fetches a directory and all its children, and writes them to disk""" | ||||
Show All 35 Lines |
Sneaky hunk from another commit, I guess (no need for re-review)