Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/git_bare.py
Show All 13 Lines | |||||
3. Creates a tarball of the resulting repository | 3. Creates a tarball of the resulting repository | ||||
It keeps a set of all written (or about-to-be-written) object hashes in memory | It keeps a set of all written (or about-to-be-written) object hashes in memory | ||||
to avoid downloading and writing the same objects twice. | to avoid downloading and writing the same objects twice. | ||||
""" | """ | ||||
import datetime | import datetime | ||||
import os.path | import os.path | ||||
import re | |||||
import subprocess | import subprocess | ||||
import tarfile | import tarfile | ||||
import tempfile | import tempfile | ||||
from typing import Any, Dict, Iterable, List, Set | from typing import Any, Dict, Iterable, List, Set | ||||
import zlib | import zlib | ||||
from swh.core.api.classes import stream_results | from swh.core.api.classes import stream_results | ||||
from swh.graph.client import GraphArgumentException | from swh.graph.client import GraphArgumentException | ||||
from swh.model import identifiers | from swh.model import identifiers | ||||
from swh.model.hashutil import hash_to_bytehex, hash_to_hex | from swh.model.hashutil import hash_to_bytehex, hash_to_hex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Person, | Person, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
Sha1Git, | Sha1Git, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.storage.algos.revisions_walker import DFSRevisionsWalker | from swh.storage.algos.revisions_walker import DFSRevisionsWalker | ||||
from swh.vault.cookers.base import BaseVaultCooker | from swh.vault.cookers.base import BaseVaultCooker | ||||
from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | |||||
# Per-type batch sizes used when loading objects in bulk.
# NOTE(review): values look empirically tuned — confirm before changing.
REVISION_BATCH_SIZE = 10000
DIRECTORY_BATCH_SIZE = 10000
CONTENT_BATCH_SIZE = 100
class GitBareCooker(BaseVaultCooker): | class GitBareCooker(BaseVaultCooker): | ||||
use_fsck = False | use_fsck = True | ||||
def cache_type_key(self) -> str: | def cache_type_key(self) -> str: | ||||
return self.obj_type | return self.obj_type | ||||
def check_exists(self): | def check_exists(self): | ||||
obj_type = self.obj_type.split("_")[0] | obj_type = self.obj_type.split("_")[0] | ||||
if obj_type == "revision": | if obj_type == "revision": | ||||
return not list(self.storage.revision_missing([self.obj_id])) | return not list(self.storage.revision_missing([self.obj_id])) | ||||
Show All 19 Lines | def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]: | ||||
stack[-n:] = [] | stack[-n:] = [] | ||||
return obj_ids | return obj_ids | ||||
def prepare_bundle(self): | def prepare_bundle(self): | ||||
# Objects we will visit soon: | # Objects we will visit soon: | ||||
self._rev_stack: List[Sha1Git] = [] | self._rev_stack: List[Sha1Git] = [] | ||||
self._dir_stack: List[Sha1Git] = [] | self._dir_stack: List[Sha1Git] = [] | ||||
self._cnt_stack: List[Sha1Git] = [] | self._cnt_stack: List[Sha1Git] = [] | ||||
# Set of objects already in any of the stacks: | # Set of objects already in any of the stacks: | ||||
self._seen: Set[Sha1Git] = set() | self._seen: Set[Sha1Git] = set() | ||||
# Set of errors we expect git-fsck to raise at the end: | |||||
self._expected_fsck_errors = set() | |||||
with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: | with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: | ||||
# Initialize a Git directory | # Initialize a Git directory | ||||
self.workdir = workdir | self.workdir = workdir | ||||
self.gitdir = os.path.join(workdir, "clone.git") | self.gitdir = os.path.join(workdir, "clone.git") | ||||
os.mkdir(self.gitdir) | os.mkdir(self.gitdir) | ||||
self.init_git() | self.init_git() | ||||
# Add the root object to the stack of objects to visit | # Add the root object to the stack of objects to visit | ||||
Show All 14 Lines | def init_git(self) -> None: | ||||
# Create all possible dirs ahead of time, so we don't have to check for | # Create all possible dirs ahead of time, so we don't have to check for | ||||
# existence every time. | # existence every time. | ||||
for byte in range(256): | for byte in range(256): | ||||
os.mkdir(os.path.join(self.gitdir, "objects", f"{byte:02x}")) | os.mkdir(os.path.join(self.gitdir, "objects", f"{byte:02x}")) | ||||
def repack(self) -> None: | def repack(self) -> None: | ||||
if self.use_fsck: | if self.use_fsck: | ||||
subprocess.run(["git", "-C", self.gitdir, "fsck"], check=True) | self.git_fsck() | ||||
# Add objects we wrote in a pack | # Add objects we wrote in a pack | ||||
subprocess.run(["git", "-C", self.gitdir, "repack"], check=True) | subprocess.run(["git", "-C", self.gitdir, "repack"], check=True) | ||||
# Remove their non-packed originals | # Remove their non-packed originals | ||||
subprocess.run(["git", "-C", self.gitdir, "prune-packed"], check=True) | subprocess.run(["git", "-C", self.gitdir, "prune-packed"], check=True) | ||||
def git_fsck(self) -> None: | |||||
proc = subprocess.run( | |||||
["git", "-C", self.gitdir, "fsck"], | |||||
stdout=subprocess.PIPE, | |||||
stderr=subprocess.STDOUT, | |||||
env={"LANG": "C.utf8"}, | |||||
) | |||||
if not self._expected_fsck_errors: | |||||
# All went well, there should not be any error | |||||
proc.check_returncode() | |||||
return | |||||
# Split on newlines not followed by a space | |||||
errors = re.split("\n(?! )", proc.stdout.decode()) | |||||
unexpected_errors = set(filter(bool, errors)) - self._expected_fsck_errors | |||||
if unexpected_errors: | |||||
raise Exception( | |||||
"\n".join( | |||||
["Unexpected errors from git-fsck:"] + sorted(unexpected_errors) | |||||
) | |||||
) | |||||
def write_refs(self): | def write_refs(self): | ||||
obj_type = self.obj_type.split("_")[0] | obj_type = self.obj_type.split("_")[0] | ||||
if obj_type == "directory": | if obj_type == "directory": | ||||
# We need a synthetic revision pointing to the directory | # We need a synthetic revision pointing to the directory | ||||
author = Person.from_fullname( | author = Person.from_fullname( | ||||
b"swh-vault, git-bare cooker <robot@softwareheritage.org>" | b"swh-vault, git-bare cooker <robot@softwareheritage.org>" | ||||
) | ) | ||||
dt = datetime.datetime.now(tz=datetime.timezone.utc) | dt = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
Show All 19 Lines | def write_refs(self): | ||||
with open(os.path.join(self.gitdir, "refs", "heads", "master"), "wb") as fd: | with open(os.path.join(self.gitdir, "refs", "heads", "master"), "wb") as fd: | ||||
fd.write(hash_to_bytehex(head)) | fd.write(hash_to_bytehex(head)) | ||||
def write_archive(self): | def write_archive(self): | ||||
with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf: | with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf: | ||||
tf.add(self.gitdir, arcname=f"{self.obj_swhid()}.git", recursive=True) | tf.add(self.gitdir, arcname=f"{self.obj_swhid()}.git", recursive=True) | ||||
def _obj_path(self, obj_id: Sha1Git): | def _obj_path(self, obj_id: Sha1Git): | ||||
return os.path.join(self.gitdir, self._obj_relative_path(obj_id)) | |||||
def _obj_relative_path(self, obj_id: Sha1Git): | |||||
obj_id_hex = hash_to_hex(obj_id) | obj_id_hex = hash_to_hex(obj_id) | ||||
directory = obj_id_hex[0:2] | directory = obj_id_hex[0:2] | ||||
filename = obj_id_hex[2:] | filename = obj_id_hex[2:] | ||||
return os.path.join(self.gitdir, "objects", directory, filename) | return os.path.join("objects", directory, filename) | ||||
def object_exists(self, obj_id: Sha1Git) -> bool: | def object_exists(self, obj_id: Sha1Git) -> bool: | ||||
return os.path.exists(self._obj_path(obj_id)) | return os.path.exists(self._obj_path(obj_id)) | ||||
def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool: | def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool: | ||||
"""Writes a git object on disk. | """Writes a git object on disk. | ||||
Returns whether it was already written.""" | Returns whether it was already written.""" | ||||
▲ Show 20 Lines • Show All 99 Lines • ▼ Show 20 Lines | def load_directory(self, obj_id: Sha1Git) -> None: | ||||
self._push(stack, [entry["target"]]) | self._push(stack, [entry["target"]]) | ||||
def load_contents(self, obj_ids: List[Sha1Git]) -> None: | def load_contents(self, obj_ids: List[Sha1Git]) -> None: | ||||
# TODO: add support of filtered objects, somehow? | # TODO: add support of filtered objects, somehow? | ||||
# It's tricky, because, by definition, we can't write a git object with | # It's tricky, because, by definition, we can't write a git object with | ||||
# the expected hash, so git-fsck *will* choke on it. | # the expected hash, so git-fsck *will* choke on it. | ||||
contents = self.storage.content_get(obj_ids, "sha1_git") | contents = self.storage.content_get(obj_ids, "sha1_git") | ||||
visible_contents = [] | |||||
for (obj_id, content) in zip(obj_ids, contents): | |||||
if content is None: | |||||
# FIXME: this may also happen for missing content | |||||
self.write_content(obj_id, SKIPPED_MESSAGE) | |||||
self._expect_mismatched_object_error(obj_id) | |||||
elif content.status == "visible": | |||||
visible_contents.append(content) | |||||
elif content.status == "hidden": | |||||
self.write_content(obj_id, HIDDEN_MESSAGE) | |||||
self._expect_mismatched_object_error(obj_id) | |||||
else: | |||||
assert False, ( | |||||
f"unexpected status {content.status!r} " | |||||
f"for content {hash_to_hex(content.sha1_git)}" | |||||
) | |||||
if self.objstorage is None: | if self.objstorage is None: | ||||
for content in contents: | for content in visible_contents: | ||||
data = self.storage.content_get_data(content.sha1) | data = self.storage.content_get_data(content.sha1) | ||||
self.write_content(content.sha1_git, data) | self.write_content(content.sha1_git, data) | ||||
else: | else: | ||||
content_data = self.objstorage.get_batch(c.sha1 for c in contents) | content_data = self.objstorage.get_batch(c.sha1 for c in visible_contents) | ||||
for (content, data) in zip(contents, content_data): | for (content, data) in zip(contents, content_data): | ||||
self.write_content(content.sha1_git, data) | self.write_content(content.sha1_git, data) | ||||
def write_content(self, obj_id: Sha1Git, content: bytes) -> None: | def write_content(self, obj_id: Sha1Git, content: bytes) -> None: | ||||
header = identifiers.git_object_header("blob", len(content)) | header = identifiers.git_object_header("blob", len(content)) | ||||
self.write_object(obj_id, header + content) | self.write_object(obj_id, header + content) | ||||
def _expect_mismatched_object_error(self, obj_id): | |||||
obj_id_hex = hash_to_hex(obj_id) | |||||
obj_path = self._obj_relative_path(obj_id) | |||||
self._expected_fsck_errors.add( | |||||
f"error: sha1 mismatch for ./{obj_path} (expected {obj_id_hex})" | |||||
) | |||||
self._expected_fsck_errors.add( | |||||
f"error: {obj_id_hex}: object corrupt or missing: ./{obj_path}" | |||||
) | |||||
self._expected_fsck_errors.add(f"missing blob {obj_id_hex}") |