Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/git_bare.py
Show All 12 Lines | |||||
2. Calls ``git repack`` to pack all these objects into git packfiles. | 2. Calls ``git repack`` to pack all these objects into git packfiles. | ||||
3. Creates a tarball of the resulting repository | 3. Creates a tarball of the resulting repository | ||||
It keeps a set of all written (or about-to-be-written) object hashes in memory | It keeps a set of all written (or about-to-be-written) object hashes in memory | ||||
to avoid downloading and writing the same objects twice. | to avoid downloading and writing the same objects twice. | ||||
""" | """ | ||||
import datetime | import datetime | ||||
import enum | |||||
import logging | import logging | ||||
import os.path | import os.path | ||||
import re | import re | ||||
import subprocess | import subprocess | ||||
import tarfile | import tarfile | ||||
import tempfile | import tempfile | ||||
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple | from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple | ||||
import zlib | import zlib | ||||
Show All 22 Lines | |||||
# Batch sizes used when fetching objects from storage, per object type.
# Contents use a much smaller batch — presumably because each entry carries
# its full data, unlike revision/directory metadata (TODO confirm).
REVISION_BATCH_SIZE = 10000
DIRECTORY_BATCH_SIZE = 10000
CONTENT_BATCH_SIZE = 100

logger = logging.getLogger(__name__)
class RootObjectType(enum.Enum):
    """Type of the root object a git-bare bundle is cooked from.

    Values match the first ``_``-separated component of the cooker's
    ``bundle_type`` string.
    """

    DIRECTORY = "directory"
    REVISION = "revision"
    SNAPSHOT = "snapshot"
class GitBareCooker(BaseVaultCooker): | class GitBareCooker(BaseVaultCooker): | ||||
use_fsck = True | use_fsck = True | ||||
def __init__(self, *args, **kwargs): | |||||
super().__init__(*args, **kwargs) | |||||
self.obj_type = RootObjectType(self.bundle_type.split("_")[0]) | |||||
def cache_type_key(self) -> str: | def cache_type_key(self) -> str: | ||||
return self.bundle_type | return self.bundle_type | ||||
def check_exists(self): | def check_exists(self): | ||||
obj_type = self.bundle_type.split("_")[0] | if self.obj_type == RootObjectType.REVISION: | ||||
if obj_type == "revision": | |||||
return not list(self.storage.revision_missing([self.obj_id])) | return not list(self.storage.revision_missing([self.obj_id])) | ||||
elif obj_type == "directory": | elif self.obj_type == RootObjectType.DIRECTORY: | ||||
return not list(self.storage.directory_missing([self.obj_id])) | return not list(self.storage.directory_missing([self.obj_id])) | ||||
if obj_type == "snapshot": | elif self.obj_type == RootObjectType.SNAPSHOT: | ||||
return not list(self.storage.snapshot_missing([self.obj_id])) | return not list(self.storage.snapshot_missing([self.obj_id])) | ||||
else: | else: | ||||
raise NotImplementedError(f"GitBareCooker for {obj_type}") | assert False, f"Unexpected root object type: {self.obj_type}" | ||||
def obj_swhid(self) -> identifiers.CoreSWHID: | def obj_swhid(self) -> identifiers.CoreSWHID: | ||||
obj_type = self.bundle_type.split("_")[0] | |||||
return identifiers.CoreSWHID( | return identifiers.CoreSWHID( | ||||
object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id, | object_type=identifiers.ObjectType[self.obj_type.name], | ||||
object_id=self.obj_id, | |||||
) | ) | ||||
def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None: | def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None: | ||||
assert not isinstance(obj_ids, bytes) | assert not isinstance(obj_ids, bytes) | ||||
revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen] | revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen] | ||||
self._seen.update(revision_ids) | self._seen.update(revision_ids) | ||||
stack.extend(revision_ids) | stack.extend(revision_ids) | ||||
Show All 19 Lines | def prepare_bundle(self): | ||||
with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: | with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: | ||||
# Initialize a Git directory | # Initialize a Git directory | ||||
self.workdir = workdir | self.workdir = workdir | ||||
self.gitdir = os.path.join(workdir, "clone.git") | self.gitdir = os.path.join(workdir, "clone.git") | ||||
os.mkdir(self.gitdir) | os.mkdir(self.gitdir) | ||||
self.init_git() | self.init_git() | ||||
# Add the root object to the stack of objects to visit | # Add the root object to the stack of objects to visit | ||||
self.push_subgraph(self.bundle_type.split("_")[0], self.obj_id) | self.push_subgraph(self.obj_type, self.obj_id) | ||||
# Load and write all the objects to disk | # Load and write all the objects to disk | ||||
self.load_objects() | self.load_objects() | ||||
# Write the root object as a ref (this step is skipped if it's a snapshot) | # Write the root object as a ref (this step is skipped if it's a snapshot) | ||||
# This must be done before repacking; git-repack ignores orphan objects. | # This must be done before repacking; git-repack ignores orphan objects. | ||||
self.write_refs() | self.write_refs() | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | def git_fsck(self) -> None: | ||||
raise Exception( | raise Exception( | ||||
"\n".join( | "\n".join( | ||||
["Unexpected errors from git-fsck:"] + sorted(unexpected_errors) | ["Unexpected errors from git-fsck:"] + sorted(unexpected_errors) | ||||
) | ) | ||||
) | ) | ||||
    def write_refs(self, snapshot=None):
        """Write the git refs for the root object under the clone's gitdir.

        - directory root: writes a synthetic revision pointing at the
          directory, then a ``refs/heads/master`` ref to that revision;
        - revision root: writes ``refs/heads/master`` to the revision;
        - snapshot root: writes one ref per non-dangling branch of
          ``snapshot``; when ``snapshot`` is None, refs were already
          written in a previous step and nothing is done.
        """
        refs: Dict[bytes, bytes]  # ref name -> target
        if self.obj_type == RootObjectType.DIRECTORY:
            # We need a synthetic revision pointing to the directory
            author = Person.from_fullname(
                b"swh-vault, git-bare cooker <robot@softwareheritage.org>"
            )
            dt = datetime.datetime.now(tz=datetime.timezone.utc)
            dt = dt.replace(microsecond=0)  # not supported by git
            date = TimestampWithTimezone.from_datetime(dt)
            revision = Revision(
                author=author,
                committer=author,
                date=date,
                committer_date=date,
                message=b"Initial commit",
                type=RevisionType.GIT,
                directory=self.obj_id,
                synthetic=True,
            )
            # The revision object must exist on disk before its ref is
            # written; git-repack ignores orphan objects.
            self.write_revision_node(revision.to_dict())
            refs = {b"refs/heads/master": hash_to_bytehex(revision.id)}
        elif self.obj_type == RootObjectType.REVISION:
            refs = {b"refs/heads/master": hash_to_bytehex(self.obj_id)}
        elif self.obj_type == RootObjectType.SNAPSHOT:
            if snapshot is None:
                # refs were already written in a previous step
                return
            branches = []
            for (branch_name, branch) in snapshot.branches.items():
                if branch is None:
                    # Dangling branches cannot be written as refs; log and skip.
                    logging.error(
                        "%s has dangling branch: %r", snapshot.swhid(), branch_name
                    )
                else:
                    branches.append((branch_name, branch))
            # Alias branches become symbolic refs ("ref: <target>"); all
            # others point at the hex hash of their target object.
            refs = {
                branch_name: (
                    b"ref: " + branch.target
                    if branch.target_type == TargetType.ALIAS
                    else hash_to_bytehex(branch.target)
                )
                for (branch_name, branch) in branches
            }
        else:
            assert False, f"Unexpected root object type: {self.obj_type}"
        # Write each ref as a loose file under the gitdir (ref names are
        # bytes, hence the encoded path join).
        for (ref_name, ref_target) in refs.items():
            path = os.path.join(self.gitdir.encode(), ref_name)
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, "wb") as fd:
                fd.write(ref_target)
def write_archive(self): | def write_archive(self): | ||||
Show All 19 Lines | def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool: | ||||
# Git requires objects to be zlib-compressed; but repacking decompresses and | # Git requires objects to be zlib-compressed; but repacking decompresses and | ||||
# removes them, so we don't need to compress them too much. | # removes them, so we don't need to compress them too much. | ||||
data = zlib.compress(obj, level=1) | data = zlib.compress(obj, level=1) | ||||
with open(self._obj_path(obj_id), "wb") as fd: | with open(self._obj_path(obj_id), "wb") as fd: | ||||
fd.write(data) | fd.write(data) | ||||
return True | return True | ||||
def push_subgraph(self, obj_type, obj_id) -> None: | def push_subgraph(self, obj_type: RootObjectType, obj_id) -> None: | ||||
if obj_type == "revision": | if self.obj_type == RootObjectType.REVISION: | ||||
self.push_revision_subgraph(obj_id) | self.push_revision_subgraph(obj_id) | ||||
elif obj_type == "directory": | elif self.obj_type == RootObjectType.DIRECTORY: | ||||
self._push(self._dir_stack, [obj_id]) | self._push(self._dir_stack, [obj_id]) | ||||
elif obj_type == "snapshot": | elif self.obj_type == RootObjectType.SNAPSHOT: | ||||
self.push_snapshot_subgraph(obj_id) | self.push_snapshot_subgraph(obj_id) | ||||
else: | else: | ||||
raise NotImplementedError( | assert False, f"Unexpected root object type: {self.obj_type}" | ||||
f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" | |||||
) | |||||
def load_objects(self) -> None: | def load_objects(self) -> None: | ||||
while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack: | while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack: | ||||
release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE) | release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE) | ||||
if release_ids: | if release_ids: | ||||
self.load_releases(release_ids) | self.load_releases(release_ids) | ||||
revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) | revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) | ||||
▲ Show 20 Lines • Show All 271 Lines • Show Last 20 Lines |