Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/git_bare.py
Show All 17 Lines | |||||
""" | """ | ||||
import datetime | import datetime | ||||
import os.path | import os.path | ||||
import re | import re | ||||
import subprocess | import subprocess | ||||
import tarfile | import tarfile | ||||
import tempfile | import tempfile | ||||
from typing import Any, Dict, Iterable, List, Set | from typing import Any, Dict, Iterable, List, Optional, Set | ||||
import zlib | import zlib | ||||
from swh.core.api.classes import stream_results | from swh.core.api.classes import stream_results | ||||
from swh.model import identifiers | from swh.model import identifiers | ||||
from swh.model.hashutil import hash_to_bytehex, hash_to_hex | from swh.model.hashutil import hash_to_bytehex, hash_to_hex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Person, | Person, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
Sha1Git, | Sha1Git, | ||||
TargetType, | |||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.storage.algos.revisions_walker import DFSRevisionsWalker | from swh.storage.algos.revisions_walker import DFSRevisionsWalker | ||||
from swh.storage.algos.snapshot import snapshot_get_all_branches | |||||
from swh.vault.cookers.base import BaseVaultCooker | from swh.vault.cookers.base import BaseVaultCooker | ||||
from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | ||||
REVISION_BATCH_SIZE = 10000 | REVISION_BATCH_SIZE = 10000 | ||||
DIRECTORY_BATCH_SIZE = 10000 | DIRECTORY_BATCH_SIZE = 10000 | ||||
CONTENT_BATCH_SIZE = 100 | CONTENT_BATCH_SIZE = 100 | ||||
class GitBareCooker(BaseVaultCooker): | class GitBareCooker(BaseVaultCooker): | ||||
use_fsck = True | use_fsck = True | ||||
def cache_type_key(self) -> str: | def cache_type_key(self) -> str: | ||||
return self.obj_type | return self.obj_type | ||||
def check_exists(self): | def check_exists(self): | ||||
obj_type = self.obj_type.split("_")[0] | obj_type = self.obj_type.split("_")[0] | ||||
if obj_type == "revision": | if obj_type == "revision": | ||||
return not list(self.storage.revision_missing([self.obj_id])) | return not list(self.storage.revision_missing([self.obj_id])) | ||||
elif obj_type == "directory": | elif obj_type == "directory": | ||||
return not list(self.storage.directory_missing([self.obj_id])) | return not list(self.storage.directory_missing([self.obj_id])) | ||||
if obj_type == "snapshot": | |||||
return not list(self.storage.snapshot_missing([self.obj_id])) | |||||
else: | else: | ||||
raise NotImplementedError(f"GitBareCooker for {obj_type}") | raise NotImplementedError(f"GitBareCooker for {obj_type}") | ||||
def obj_swhid(self) -> identifiers.CoreSWHID: | def obj_swhid(self) -> identifiers.CoreSWHID: | ||||
obj_type = self.obj_type.split("_")[0] | obj_type = self.obj_type.split("_")[0] | ||||
return identifiers.CoreSWHID( | return identifiers.CoreSWHID( | ||||
object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id, | object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id, | ||||
) | ) | ||||
Show All 12 Lines | class GitBareCooker(BaseVaultCooker): | ||||
def prepare_bundle(self): | def prepare_bundle(self): | ||||
# Objects we will visit soon: | # Objects we will visit soon: | ||||
self._rev_stack: List[Sha1Git] = [] | self._rev_stack: List[Sha1Git] = [] | ||||
self._dir_stack: List[Sha1Git] = [] | self._dir_stack: List[Sha1Git] = [] | ||||
self._cnt_stack: List[Sha1Git] = [] | self._cnt_stack: List[Sha1Git] = [] | ||||
# Set of objects already in any of the stacks: | # Set of objects already in any of the stacks: | ||||
self._seen: Set[Sha1Git] = set() | self._seen: Set[Sha1Git] = set() | ||||
self._walker_state: Optional[Any] = None | |||||
# Set of errors we expect git-fsck to raise at the end: | # Set of errors we expect git-fsck to raise at the end: | ||||
self._expected_fsck_errors = set() | self._expected_fsck_errors = set() | ||||
with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: | with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: | ||||
# Initialize a Git directory | # Initialize a Git directory | ||||
self.workdir = workdir | self.workdir = workdir | ||||
self.gitdir = os.path.join(workdir, "clone.git") | self.gitdir = os.path.join(workdir, "clone.git") | ||||
os.mkdir(self.gitdir) | os.mkdir(self.gitdir) | ||||
self.init_git() | self.init_git() | ||||
# Add the root object to the stack of objects to visit | # Add the root object to the stack of objects to visit | ||||
self.push_subgraph(self.obj_type.split("_")[0], self.obj_id) | self.push_subgraph(self.obj_type.split("_")[0], self.obj_id) | ||||
# Load and write all the objects to disk | # Load and write all the objects to disk | ||||
self.load_objects() | self.load_objects() | ||||
# Write the root object as a ref. | # Write the root object as a ref (this step is skipped if it's a snapshot) | ||||
# This must be done before repacking; git-repack ignores orphan objects. | # This must be done before repacking; git-repack ignores orphan objects. | ||||
self.write_refs() | self.write_refs() | ||||
self.repack() | self.repack() | ||||
self.write_archive() | self.write_archive() | ||||
def init_git(self) -> None: | def init_git(self) -> None: | ||||
subprocess.run(["git", "-C", self.gitdir, "init", "--bare"], check=True) | subprocess.run(["git", "-C", self.gitdir, "init", "--bare"], check=True) | ||||
Show All 31 Lines | def git_fsck(self) -> None: | ||||
unexpected_errors = set(filter(bool, errors)) - self._expected_fsck_errors | unexpected_errors = set(filter(bool, errors)) - self._expected_fsck_errors | ||||
if unexpected_errors: | if unexpected_errors: | ||||
raise Exception( | raise Exception( | ||||
"\n".join( | "\n".join( | ||||
["Unexpected errors from git-fsck:"] + sorted(unexpected_errors) | ["Unexpected errors from git-fsck:"] + sorted(unexpected_errors) | ||||
) | ) | ||||
) | ) | ||||
def write_refs(self): | def write_refs(self, snapshot=None): | ||||
refs: Dict[bytes, bytes] # ref name -> target | |||||
obj_type = self.obj_type.split("_")[0] | obj_type = self.obj_type.split("_")[0] | ||||
if obj_type == "directory": | if obj_type == "directory": | ||||
# We need a synthetic revision pointing to the directory | # We need a synthetic revision pointing to the directory | ||||
author = Person.from_fullname( | author = Person.from_fullname( | ||||
b"swh-vault, git-bare cooker <robot@softwareheritage.org>" | b"swh-vault, git-bare cooker <robot@softwareheritage.org>" | ||||
) | ) | ||||
dt = datetime.datetime.now(tz=datetime.timezone.utc) | dt = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
dt = dt.replace(microsecond=0) # not supported by git | dt = dt.replace(microsecond=0) # not supported by git | ||||
date = TimestampWithTimezone.from_datetime(dt) | date = TimestampWithTimezone.from_datetime(dt) | ||||
revision = Revision( | revision = Revision( | ||||
author=author, | author=author, | ||||
committer=author, | committer=author, | ||||
date=date, | date=date, | ||||
committer_date=date, | committer_date=date, | ||||
message=b"Initial commit", | message=b"Initial commit", | ||||
type=RevisionType.GIT, | type=RevisionType.GIT, | ||||
directory=self.obj_id, | directory=self.obj_id, | ||||
synthetic=True, | synthetic=True, | ||||
) | ) | ||||
self.write_revision_node(revision.to_dict()) | self.write_revision_node(revision.to_dict()) | ||||
head = revision.id | refs = {b"refs/heads/master": hash_to_bytehex(revision.id)} | ||||
elif obj_type == "revision": | elif obj_type == "revision": | ||||
head = self.obj_id | refs = {b"refs/heads/master": hash_to_bytehex(self.obj_id)} | ||||
elif obj_type == "snapshot": | |||||
if snapshot is None: | |||||
# refs were already written in a previous step | |||||
return | |||||
refs = { | |||||
branch_name: ( | |||||
b"ref: " + branch.target | |||||
if branch.target_type == TargetType.ALIAS | |||||
else hash_to_bytehex(branch.target) | |||||
) | |||||
for (branch_name, branch) in snapshot.branches.items() | |||||
} | |||||
else: | else: | ||||
assert False, obj_type | assert False, obj_type | ||||
with open(os.path.join(self.gitdir, "refs", "heads", "master"), "wb") as fd: | for (ref_name, ref_target) in refs.items(): | ||||
fd.write(hash_to_bytehex(head)) | path = os.path.join(self.gitdir.encode(), ref_name) | ||||
os.makedirs(os.path.dirname(path), exist_ok=True) | |||||
with open(path, "wb") as fd: | |||||
fd.write(ref_target) | |||||
def write_archive(self): | def write_archive(self): | ||||
with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf: | with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf: | ||||
tf.add(self.gitdir, arcname=f"{self.obj_swhid()}.git", recursive=True) | tf.add(self.gitdir, arcname=f"{self.obj_swhid()}.git", recursive=True) | ||||
def _obj_path(self, obj_id: Sha1Git): | def _obj_path(self, obj_id: Sha1Git): | ||||
return os.path.join(self.gitdir, self._obj_relative_path(obj_id)) | return os.path.join(self.gitdir, self._obj_relative_path(obj_id)) | ||||
Show All 18 Lines | def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool: | ||||
fd.write(data) | fd.write(data) | ||||
return True | return True | ||||
def push_subgraph(self, obj_type, obj_id) -> None: | def push_subgraph(self, obj_type, obj_id) -> None: | ||||
if obj_type == "revision": | if obj_type == "revision": | ||||
self.push_revision_subgraph(obj_id) | self.push_revision_subgraph(obj_id) | ||||
elif obj_type == "directory": | elif obj_type == "directory": | ||||
self._push(self._dir_stack, [obj_id]) | self._push(self._dir_stack, [obj_id]) | ||||
elif obj_type == "snapshot": | |||||
self.push_snapshot_subgraph(obj_id) | |||||
else: | else: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" | f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" | ||||
) | ) | ||||
def load_objects(self) -> None: | def load_objects(self) -> None: | ||||
while self._rev_stack or self._dir_stack or self._cnt_stack: | while self._rev_stack or self._dir_stack or self._cnt_stack: | ||||
revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) | revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) | ||||
Show All 32 Lines | def push_revision_subgraph(self, obj_id: Sha1Git) -> None: | ||||
else: | else: | ||||
loaded_from_graph = True | loaded_from_graph = True | ||||
if not loaded_from_graph: | if not loaded_from_graph: | ||||
# If swh-graph is not available, or the revision is not yet in | # If swh-graph is not available, or the revision is not yet in | ||||
# swh-graph, fall back to self.storage.revision_log. | # swh-graph, fall back to self.storage.revision_log. | ||||
# self.storage.revision_log also gives us the full revisions, | # self.storage.revision_log also gives us the full revisions, | ||||
# so we load them right now instead of just pushing them on the stack. | # so we load them right now instead of just pushing them on the stack. | ||||
walker = DFSRevisionsWalker(self.storage, obj_id) | walker = DFSRevisionsWalker(self.storage, obj_id, state=self._walker_state) | ||||
for revision in walker: | for revision in walker: | ||||
self.write_revision_node(revision) | self.write_revision_node(revision) | ||||
self._push(self._dir_stack, [revision["directory"]]) | self._push(self._dir_stack, [revision["directory"]]) | ||||
# Save the state, so the next call to the walker won't return the same | |||||
# revisions | |||||
self._walker_state = walker.export_state() | |||||
def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None: | |||||
"""Fetches a snapshot and all its children, and writes them to disk""" | |||||
loaded_from_graph = False | |||||
if self.graph: | |||||
pass # TODO | |||||
# TODO: when self.graph is available and supports edge labels, use it | |||||
# directly to get branch names. | |||||
snapshot = snapshot_get_all_branches(self.storage, obj_id) | |||||
assert snapshot, "Unknown snapshot" # should have been caught by check_exists() | |||||
for branch in snapshot.branches.values(): | |||||
if not loaded_from_graph: | |||||
if branch.target_type == TargetType.REVISION: | |||||
self.push_revision_subgraph(branch.target) | |||||
elif branch.target_type == TargetType.ALIAS: | |||||
# Nothing to do, this for loop also iterates on the target branch | |||||
# (if it exists) | |||||
pass | |||||
else: | |||||
raise NotImplementedError(f"{branch.target_type} branches") | |||||
self.write_refs(snapshot=snapshot) | |||||
def load_revisions(self, obj_ids: List[Sha1Git]) -> None: | def load_revisions(self, obj_ids: List[Sha1Git]) -> None: | ||||
"""Given a list of revision ids, loads these revisions and their directories; | """Given a list of revision ids, loads these revisions and their directories; | ||||
but not their parent revisions.""" | but not their parent revisions.""" | ||||
revisions = self.storage.revision_get(obj_ids) | revisions = self.storage.revision_get(obj_ids) | ||||
for revision in revisions: | for revision in revisions: | ||||
self.write_revision_node(revision.to_dict()) | self.write_revision_node(revision.to_dict()) | ||||
self._push(self._dir_stack, (rev.directory for rev in revisions)) | self._push(self._dir_stack, (rev.directory for rev in revisions)) | ||||
▲ Show 20 Lines • Show All 83 Lines • Show Last 20 Lines |