Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/git_bare.py
Show All 24 Lines | |||||
import tempfile | import tempfile | ||||
from typing import Any, Dict, Iterable, List, Optional, Set | from typing import Any, Dict, Iterable, List, Optional, Set | ||||
import zlib | import zlib | ||||
from swh.core.api.classes import stream_results | from swh.core.api.classes import stream_results | ||||
from swh.model import identifiers | from swh.model import identifiers | ||||
from swh.model.hashutil import hash_to_bytehex, hash_to_hex | from swh.model.hashutil import hash_to_bytehex, hash_to_hex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
ObjectType, | |||||
Person, | Person, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
Sha1Git, | Sha1Git, | ||||
TargetType, | TargetType, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.storage.algos.revisions_walker import DFSRevisionsWalker | from swh.storage.algos.revisions_walker import DFSRevisionsWalker | ||||
from swh.storage.algos.snapshot import snapshot_get_all_branches | from swh.storage.algos.snapshot import snapshot_get_all_branches | ||||
from swh.vault.cookers.base import BaseVaultCooker | from swh.vault.cookers.base import BaseVaultCooker | ||||
from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | ||||
# Batch sizes: the maximum number of object ids GitBareCooker.load_objects()
# pops from the release, revision, directory and content stacks on each pass.
RELEASE_BATCH_SIZE = 10000 | |||||
REVISION_BATCH_SIZE = 10000 | REVISION_BATCH_SIZE = 10000 | ||||
DIRECTORY_BATCH_SIZE = 10000 | DIRECTORY_BATCH_SIZE = 10000 | ||||
CONTENT_BATCH_SIZE = 100 | CONTENT_BATCH_SIZE = 100 | ||||
class GitBareCooker(BaseVaultCooker): | class GitBareCooker(BaseVaultCooker): | ||||
use_fsck = True | use_fsck = True | ||||
Show All 25 Lines | class GitBareCooker(BaseVaultCooker): | ||||
def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]: | def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]: | ||||
obj_ids = stack[-n:] | obj_ids = stack[-n:] | ||||
stack[-n:] = [] | stack[-n:] = [] | ||||
return obj_ids | return obj_ids | ||||
def prepare_bundle(self): | def prepare_bundle(self): | ||||
# Objects we will visit soon: | # Objects we will visit soon: | ||||
self._rel_stack: List[Sha1Git] = [] | |||||
self._rev_stack: List[Sha1Git] = [] | self._rev_stack: List[Sha1Git] = [] | ||||
self._dir_stack: List[Sha1Git] = [] | self._dir_stack: List[Sha1Git] = [] | ||||
self._cnt_stack: List[Sha1Git] = [] | self._cnt_stack: List[Sha1Git] = [] | ||||
# Set of objects already in any of the stacks: | # Set of objects already in any of the stacks: | ||||
self._seen: Set[Sha1Git] = set() | self._seen: Set[Sha1Git] = set() | ||||
self._walker_state: Optional[Any] = None | self._walker_state: Optional[Any] = None | ||||
▲ Show 20 Lines • Show All 143 Lines • ▼ Show 20 Lines | def push_subgraph(self, obj_type, obj_id) -> None: | ||||
elif obj_type == "snapshot": | elif obj_type == "snapshot": | ||||
self.push_snapshot_subgraph(obj_id) | self.push_snapshot_subgraph(obj_id) | ||||
else: | else: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" | f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" | ||||
) | ) | ||||
def load_objects(self) -> None:
    """Drain the release, revision, directory and content stacks.

    Each pass pops one batch from every stack, in that order, and hands it
    to the matching loader. The loop ends once all four stacks are empty.
    """
    while any(
        (self._rel_stack, self._rev_stack, self._dir_stack, self._cnt_stack)
    ):
        # Every batch is popped only after the previous loader has run, so
        # ids queued by one loader can be picked up later in the same pass.
        self.push_releases_subgraphs(
            self._pop(self._rel_stack, RELEASE_BATCH_SIZE)
        )
        self.load_revisions(self._pop(self._rev_stack, REVISION_BATCH_SIZE))
        self.load_directories(self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE))
        self.load_contents(self._pop(self._cnt_stack, CONTENT_BATCH_SIZE))
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None: | ||||
# TODO: when self.graph is available and supports edge labels, use it | # TODO: when self.graph is available and supports edge labels, use it | ||||
# directly to get branch names. | # directly to get branch names. | ||||
snapshot = snapshot_get_all_branches(self.storage, obj_id) | snapshot = snapshot_get_all_branches(self.storage, obj_id) | ||||
assert snapshot, "Unknown snapshot" # should have been caught by check_exists() | assert snapshot, "Unknown snapshot" # should have been caught by check_exists() | ||||
for branch in snapshot.branches.values(): | for branch in snapshot.branches.values(): | ||||
if not loaded_from_graph: | if not loaded_from_graph: | ||||
if branch.target_type == TargetType.REVISION: | if branch.target_type == TargetType.REVISION: | ||||
self.push_revision_subgraph(branch.target) | self.push_revision_subgraph(branch.target) | ||||
elif branch.target_type == TargetType.RELEASE: | |||||
self.push_releases_subgraphs([branch.target]) | |||||
elif branch.target_type == TargetType.ALIAS: | elif branch.target_type == TargetType.ALIAS: | ||||
# Nothing to do, this for loop also iterates on the target branch | # Nothing to do, this for loop also iterates on the target branch | ||||
# (if it exists) | # (if it exists) | ||||
pass | pass | ||||
else: | else: | ||||
raise NotImplementedError(f"{branch.target_type} branches") | raise NotImplementedError(f"{branch.target_type} branches") | ||||
self.write_refs(snapshot=snapshot) | self.write_refs(snapshot=snapshot) | ||||
def load_revisions(self, obj_ids: List[Sha1Git]) -> None:
    """Given a list of revision ids, loads these revisions and their directories;
    but not their parent revisions.

    Each loaded revision is written to disk through ``write_revision_node``
    and its ``directory`` is pushed onto ``self._dir_stack``. Ids for which
    the storage returns no revision are logged and skipped.
    """
    if not obj_ids:
        # Nothing to load: skip the storage query entirely.
        return

    import logging

    # Materialize the result because it is traversed twice below.
    # NOTE(review): swh.storage documents ``revision_get`` as returning None
    # for unknown ids -- confirm against the storage version in use.
    fetched = list(self.storage.revision_get(obj_ids))
    revisions = [revision for revision in fetched if revision is not None]
    if len(revisions) != len(fetched):
        logging.getLogger(__name__).error("Missing revision(s), ignoring them.")

    for revision in revisions:
        self.write_revision_node(revision.to_dict())
    self._push(self._dir_stack, (rev.directory for rev in revisions))
def write_revision_node(self, revision: Dict[str, Any]) -> bool:
    """Serialize ``revision`` as a git object and write it to disk.

    The object is stored under ``revision["id"]``; the return value is
    whatever ``write_object`` returns.
    """
    serialized = identifiers.revision_git_object(revision)
    revision_id = revision["id"]
    return self.write_object(revision_id, serialized)
def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None:
    """Given a list of release ids, loads these releases and adds their
    target to the list of objects to visit.

    Each loaded release is written to disk through ``write_release_node``
    and its target revision is passed to ``push_revision_subgraph``. Ids for
    which the storage returns no release are logged and skipped.

    Raises:
        NotImplementedError: if a release targets anything other than a
            revision.
    """
    if not obj_ids:
        # Nothing to load: skip the storage query entirely.
        return

    import logging

    # NOTE(review): swh.storage documents ``release_get`` as returning None
    # for unknown ids -- confirm against the storage version in use.
    fetched = list(self.storage.release_get(obj_ids))
    releases = [release for release in fetched if release is not None]
    if len(releases) != len(fetched):
        logging.getLogger(__name__).error("Missing release(s), ignoring them.")

    for release in releases:
        # The release object is written before its target type is checked.
        self.write_release_node(release.to_dict())
        if release.target_type != ObjectType.REVISION:
            raise NotImplementedError(f"{release.target_type} release targets")
        self.push_revision_subgraph(release.target)
def write_release_node(self, release: Dict[str, Any]) -> bool:
    """Serialize ``release`` as a git object and write it to disk.

    The object is stored under ``release["id"]``; the return value is
    whatever ``write_object`` returns.
    """
    serialized = identifiers.release_git_object(release)
    release_id = release["id"]
    return self.write_object(release_id, serialized)
def load_directories(self, obj_ids: List[Sha1Git]) -> None:
    """Load every directory listed in ``obj_ids``, one at a time and in
    order, by delegating to ``load_directory``."""
    for directory_id in obj_ids:
        self.load_directory(directory_id)
def load_directory(self, obj_id: Sha1Git) -> None: | def load_directory(self, obj_id: Sha1Git) -> None: | ||||
# Load the directory | # Load the directory | ||||
entries = [ | entries = [ | ||||
entry.to_dict() | entry.to_dict() | ||||
▲ Show 20 Lines • Show All 69 Lines • Show Last 20 Lines |