Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/git_bare.py
Show All 29 Lines | |||||
from swh.core.api.classes import stream_results_optional | from swh.core.api.classes import stream_results_optional | ||||
from swh.model import identifiers | from swh.model import identifiers | ||||
from swh.model.hashutil import hash_to_bytehex, hash_to_hex | from swh.model.hashutil import hash_to_bytehex, hash_to_hex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
DirectoryEntry, | DirectoryEntry, | ||||
ObjectType, | ObjectType, | ||||
Person, | Person, | ||||
Release, | |||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
Sha1Git, | Sha1Git, | ||||
TargetType, | TargetType, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.storage.algos.revisions_walker import DFSRevisionsWalker | from swh.storage.algos.revisions_walker import DFSRevisionsWalker | ||||
from swh.storage.algos.snapshot import snapshot_get_all_branches | from swh.storage.algos.snapshot import snapshot_get_all_branches | ||||
▲ Show 20 Lines • Show All 210 Lines • ▼ Show 20 Lines | def push_subgraph(self, obj_type, obj_id) -> None: | ||||
else: | else: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" | f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" | ||||
) | ) | ||||
def load_objects(self) -> None: | def load_objects(self) -> None: | ||||
while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack: | while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack: | ||||
release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE) | release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE) | ||||
self.push_releases_subgraphs(release_ids) | self.load_releases(release_ids) | ||||
revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) | revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) | ||||
self.load_revisions(revision_ids) | self.load_revisions(revision_ids) | ||||
directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE) | directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE) | ||||
self.load_directories(directory_ids) | self.load_directories(directory_ids) | ||||
content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE) | content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE) | ||||
▲ Show 20 Lines • Show All 89 Lines • ▼ Show 20 Lines | def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None: | ||||
loaded_from_graph = True | loaded_from_graph = True | ||||
# TODO: when self.graph is available and supports edge labels, use it | # TODO: when self.graph is available and supports edge labels, use it | ||||
# directly to get branch names. | # directly to get branch names. | ||||
snapshot = snapshot_get_all_branches(self.storage, obj_id) | snapshot = snapshot_get_all_branches(self.storage, obj_id) | ||||
assert snapshot, "Unknown snapshot" # should have been caught by check_exists() | assert snapshot, "Unknown snapshot" # should have been caught by check_exists() | ||||
for branch in snapshot.branches.values(): | for branch in snapshot.branches.values(): | ||||
if not loaded_from_graph: | if not loaded_from_graph: | ||||
if branch.target_type == TargetType.REVISION: | if branch is None: | ||||
logging.warning("Dangling branch: %r", branch) | |||||
elif branch.target_type == TargetType.REVISION: | |||||
self.push_revision_subgraph(branch.target) | self.push_revision_subgraph(branch.target) | ||||
elif branch.target_type == TargetType.RELEASE: | elif branch.target_type == TargetType.RELEASE: | ||||
self.push_releases_subgraphs([branch.target]) | self.push_releases_subgraphs([branch.target]) | ||||
elif branch.target_type == TargetType.ALIAS: | elif branch.target_type == TargetType.ALIAS: | ||||
# Nothing to do, this for loop also iterates on the target branch | # Nothing to do, this for loop also iterates on the target branch | ||||
# (if it exists) | # (if it exists) | ||||
pass | pass | ||||
else: | else: | ||||
Show All 14 Lines | def load_revisions(self, obj_ids: List[Sha1Git]) -> None: | ||||
self.write_revision_node(revision.to_dict()) | self.write_revision_node(revision.to_dict()) | ||||
self._push(self._dir_stack, (rev.directory for rev in revisions)) | self._push(self._dir_stack, (rev.directory for rev in revisions)) | ||||
def write_revision_node(self, revision: Dict[str, Any]) -> bool: | def write_revision_node(self, revision: Dict[str, Any]) -> bool: | ||||
"""Writes a revision object to disk""" | """Writes a revision object to disk""" | ||||
git_object = identifiers.revision_git_object(revision) | git_object = identifiers.revision_git_object(revision) | ||||
return self.write_object(revision["id"], git_object) | return self.write_object(revision["id"], git_object) | ||||
def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None: | def load_releases(self, obj_ids: List[Sha1Git]) -> List[Release]: | ||||
"""Given a list of release ids, loads these releases and adds their | """Loads release objects, and returns them.""" | ||||
target to the list of objects to visit""" | |||||
ret = self.storage.release_get(obj_ids) | ret = self.storage.release_get(obj_ids) | ||||
releases = list(filter(None, ret)) | releases = list(filter(None, ret)) | ||||
if len(ret) != len(releases): | if len(ret) != len(releases): | ||||
logger.error("Missing release(s), ignoring them.") | logger.error("Missing release(s), ignoring them.") | ||||
revision_ids: List[Sha1Git] = [] | |||||
for release in releases: | for release in releases: | ||||
self.write_release_node(release.to_dict()) | self.write_release_node(release.to_dict()) | ||||
return releases | |||||
def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None: | |||||
"""Given a list of release ids, loads these releases and adds their | |||||
target to the list of objects to visit""" | |||||
for release in self.load_releases(obj_ids): | |||||
if release.target_type == ObjectType.REVISION: | if release.target_type == ObjectType.REVISION: | ||||
assert release.target, "{release.swhid(}) has no target" | assert release.target, "{release.swhid(}) has no target" | ||||
self.push_revision_subgraph(release.target) | self.push_revision_subgraph(release.target) | ||||
elif release.target_type == ObjectType.DIRECTORY: | |||||
assert release.target, "{release.swhid(}) has no target" | |||||
self._push(self._dir_stack, [release.target]) | |||||
else: | else: | ||||
raise NotImplementedError(f"{release.target_type} release targets") | raise NotImplementedError( | ||||
self._push(self._rev_stack, revision_ids) | f"{release.swhid()} targets {release.target_type}" | ||||
) | |||||
def write_release_node(self, release: Dict[str, Any]) -> bool: | def write_release_node(self, release: Dict[str, Any]) -> bool: | ||||
"""Writes a release object to disk""" | """Writes a release object to disk""" | ||||
git_object = identifiers.release_git_object(release) | git_object = identifiers.release_git_object(release) | ||||
return self.write_object(release["id"], git_object) | return self.write_object(release["id"], git_object) | ||||
def load_directories(self, obj_ids: List[Sha1Git]) -> None: | def load_directories(self, obj_ids: List[Sha1Git]) -> None: | ||||
for obj_id in obj_ids: | for obj_id in obj_ids: | ||||
▲ Show 20 Lines • Show All 91 Lines • Show Last 20 Lines |