Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/git_bare.py
| Show All 24 Lines | |||||
| import tempfile | import tempfile | ||||
| from typing import Any, Dict, Iterable, List, Optional, Set | from typing import Any, Dict, Iterable, List, Optional, Set | ||||
| import zlib | import zlib | ||||
| from swh.core.api.classes import stream_results | from swh.core.api.classes import stream_results | ||||
| from swh.model import identifiers | from swh.model import identifiers | ||||
| from swh.model.hashutil import hash_to_bytehex, hash_to_hex | from swh.model.hashutil import hash_to_bytehex, hash_to_hex | ||||
| from swh.model.model import ( | from swh.model.model import ( | ||||
| ObjectType, | |||||
| Person, | Person, | ||||
| Revision, | Revision, | ||||
| RevisionType, | RevisionType, | ||||
| Sha1Git, | Sha1Git, | ||||
| TargetType, | TargetType, | ||||
| TimestampWithTimezone, | TimestampWithTimezone, | ||||
| ) | ) | ||||
| from swh.storage.algos.revisions_walker import DFSRevisionsWalker | from swh.storage.algos.revisions_walker import DFSRevisionsWalker | ||||
| from swh.storage.algos.snapshot import snapshot_get_all_branches | from swh.storage.algos.snapshot import snapshot_get_all_branches | ||||
| from swh.vault.cookers.base import BaseVaultCooker | from swh.vault.cookers.base import BaseVaultCooker | ||||
| from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | ||||
| RELEASE_BATCH_SIZE = 10000 | |||||
| REVISION_BATCH_SIZE = 10000 | REVISION_BATCH_SIZE = 10000 | ||||
| DIRECTORY_BATCH_SIZE = 10000 | DIRECTORY_BATCH_SIZE = 10000 | ||||
| CONTENT_BATCH_SIZE = 100 | CONTENT_BATCH_SIZE = 100 | ||||
| class GitBareCooker(BaseVaultCooker): | class GitBareCooker(BaseVaultCooker): | ||||
| use_fsck = True | use_fsck = True | ||||
| Show All 25 Lines | class GitBareCooker(BaseVaultCooker): | ||||
| def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]: | def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]: | ||||
| obj_ids = stack[-n:] | obj_ids = stack[-n:] | ||||
| stack[-n:] = [] | stack[-n:] = [] | ||||
| return obj_ids | return obj_ids | ||||
| def prepare_bundle(self): | def prepare_bundle(self): | ||||
| # Objects we will visit soon: | # Objects we will visit soon: | ||||
| self._rel_stack: List[Sha1Git] = [] | |||||
| self._rev_stack: List[Sha1Git] = [] | self._rev_stack: List[Sha1Git] = [] | ||||
| self._dir_stack: List[Sha1Git] = [] | self._dir_stack: List[Sha1Git] = [] | ||||
| self._cnt_stack: List[Sha1Git] = [] | self._cnt_stack: List[Sha1Git] = [] | ||||
| # Set of objects already in any of the stacks: | # Set of objects already in any of the stacks: | ||||
| self._seen: Set[Sha1Git] = set() | self._seen: Set[Sha1Git] = set() | ||||
| self._walker_state: Optional[Any] = None | self._walker_state: Optional[Any] = None | ||||
| ▲ Show 20 Lines • Show All 143 Lines • ▼ Show 20 Lines | def push_subgraph(self, obj_type, obj_id) -> None: | ||||
| elif obj_type == "snapshot": | elif obj_type == "snapshot": | ||||
| self.push_snapshot_subgraph(obj_id) | self.push_snapshot_subgraph(obj_id) | ||||
| else: | else: | ||||
| raise NotImplementedError( | raise NotImplementedError( | ||||
| f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" | f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" | ||||
| ) | ) | ||||
| def load_objects(self) -> None: | def load_objects(self) -> None: | ||||
| while self._rev_stack or self._dir_stack or self._cnt_stack: | while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack: | ||||
| release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE) | |||||
| self.push_releases_subgraphs(release_ids) | |||||
| revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) | revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) | ||||
| self.load_revisions(revision_ids) | self.load_revisions(revision_ids) | ||||
| directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE) | directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE) | ||||
| self.load_directories(directory_ids) | self.load_directories(directory_ids) | ||||
| content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE) | content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE) | ||||
| self.load_contents(content_ids) | self.load_contents(content_ids) | ||||
| ▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None: | ||||
| # TODO: when self.graph is available and supports edge labels, use it | # TODO: when self.graph is available and supports edge labels, use it | ||||
| # directly to get branch names. | # directly to get branch names. | ||||
| snapshot = snapshot_get_all_branches(self.storage, obj_id) | snapshot = snapshot_get_all_branches(self.storage, obj_id) | ||||
| assert snapshot, "Unknown snapshot" # should have been caught by check_exists() | assert snapshot, "Unknown snapshot" # should have been caught by check_exists() | ||||
| for branch in snapshot.branches.values(): | for branch in snapshot.branches.values(): | ||||
| if not loaded_from_graph: | if not loaded_from_graph: | ||||
| if branch.target_type == TargetType.REVISION: | if branch.target_type == TargetType.REVISION: | ||||
| self.push_revision_subgraph(branch.target) | self.push_revision_subgraph(branch.target) | ||||
| elif branch.target_type == TargetType.RELEASE: | |||||
| self.push_releases_subgraphs([branch.target]) | |||||
| elif branch.target_type == TargetType.ALIAS: | elif branch.target_type == TargetType.ALIAS: | ||||
| # Nothing to do, this for loop also iterates on the target branch | # Nothing to do, this for loop also iterates on the target branch | ||||
| # (if it exists) | # (if it exists) | ||||
| pass | pass | ||||
| else: | else: | ||||
| raise NotImplementedError(f"{branch.target_type} branches") | raise NotImplementedError(f"{branch.target_type} branches") | ||||
| self.write_refs(snapshot=snapshot) | self.write_refs(snapshot=snapshot) | ||||
| def load_revisions(self, obj_ids: List[Sha1Git]) -> None: | def load_revisions(self, obj_ids: List[Sha1Git]) -> None: | ||||
| """Given a list of revision ids, loads these revisions and their directories; | """Given a list of revision ids, loads these revisions and their directories; | ||||
| but not their parent revisions.""" | but not their parent revisions.""" | ||||
| revisions = self.storage.revision_get(obj_ids) | revisions = self.storage.revision_get(obj_ids) | ||||
| for revision in revisions: | for revision in revisions: | ||||
| self.write_revision_node(revision.to_dict()) | self.write_revision_node(revision.to_dict()) | ||||
| self._push(self._dir_stack, (rev.directory for rev in revisions)) | self._push(self._dir_stack, (rev.directory for rev in revisions)) | ||||
| def write_revision_node(self, revision: Dict[str, Any]) -> bool: | def write_revision_node(self, revision: Dict[str, Any]) -> bool: | ||||
| """Writes a revision object to disk""" | """Writes a revision object to disk""" | ||||
| git_object = identifiers.revision_git_object(revision) | git_object = identifiers.revision_git_object(revision) | ||||
| return self.write_object(revision["id"], git_object) | return self.write_object(revision["id"], git_object) | ||||
| def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None: | |||||
| """Given a list of release ids, loads these releases and adds their | |||||
| target to the list of objects to visit""" | |||||
| releases = self.storage.release_get(obj_ids) | |||||
| revision_ids: List[Sha1Git] = [] | |||||
| for release in releases: | |||||
| self.write_release_node(release.to_dict()) | |||||
| if release.target_type == ObjectType.REVISION: | |||||
| self.push_revision_subgraph(release.target) | |||||
| else: | |||||
| raise NotImplementedError(f"{release.target_type} release targets") | |||||
| self._push(self._rev_stack, revision_ids) | |||||
| def write_release_node(self, release: Dict[str, Any]) -> bool: | |||||
| """Writes a release object to disk""" | |||||
| git_object = identifiers.release_git_object(release) | |||||
| return self.write_object(release["id"], git_object) | |||||
| def load_directories(self, obj_ids: List[Sha1Git]) -> None: | def load_directories(self, obj_ids: List[Sha1Git]) -> None: | ||||
| for obj_id in obj_ids: | for obj_id in obj_ids: | ||||
| self.load_directory(obj_id) | self.load_directory(obj_id) | ||||
| def load_directory(self, obj_id: Sha1Git) -> None: | def load_directory(self, obj_id: Sha1Git) -> None: | ||||
| # Load the directory | # Load the directory | ||||
| entries = [ | entries = [ | ||||
| entry.to_dict() | entry.to_dict() | ||||
| ▲ Show 20 Lines • Show All 69 Lines • Show Last 20 Lines | |||||