Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/git_bare.py
Show All 19 Lines | |||||
import datetime | import datetime | ||||
import enum | import enum | ||||
import logging | import logging | ||||
import os.path | import os.path | ||||
import re | import re | ||||
import subprocess | import subprocess | ||||
import tarfile | import tarfile | ||||
import tempfile | import tempfile | ||||
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple | from typing import Any, Dict, Iterable, Iterator, List, NoReturn, Optional, Set, Tuple | ||||
import zlib | import zlib | ||||
from swh.core.api.classes import stream_results_optional | from swh.core.api.classes import stream_results_optional | ||||
from swh.model import identifiers | from swh.model import identifiers | ||||
from swh.model.hashutil import hash_to_bytehex, hash_to_hex | from swh.model.hashutil import hash_to_bytehex, hash_to_hex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
DirectoryEntry, | DirectoryEntry, | ||||
ObjectType, | ObjectType, | ||||
Person, | Person, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
Sha1Git, | Sha1Git, | ||||
Snapshot, | |||||
SnapshotBranch, | |||||
TargetType, | TargetType, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.storage.algos.revisions_walker import DFSRevisionsWalker | from swh.storage.algos.revisions_walker import DFSRevisionsWalker | ||||
from swh.storage.algos.snapshot import snapshot_get_all_branches | from swh.storage.algos.snapshot import snapshot_get_all_branches | ||||
from swh.vault.cookers.base import BaseVaultCooker | from swh.vault.cookers.base import BaseVaultCooker | ||||
from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | ||||
# Batch sizes used when fetching objects of each type from storage.
RELEASE_BATCH_SIZE = 10000
REVISION_BATCH_SIZE = 10000
DIRECTORY_BATCH_SIZE = 10000
# NOTE(review): contents use a much smaller batch -- presumably because
# content records (and their data) are heavier to fetch; confirm.
CONTENT_BATCH_SIZE = 100
logger = logging.getLogger(__name__)
class RootObjectType(enum.Enum):
    """Kind of root object a cooked bundle starts from.

    The enum values match the first ``_``-separated component of
    ``bundle_type`` (see ``GitBareCooker.__init__``).
    """

    DIRECTORY = "directory"
    REVISION = "revision"
    SNAPSHOT = "snapshot"
def assert_never(value: NoReturn, msg: str) -> NoReturn:
    """Helper for exhaustiveness checking.

    mypy makes sure this function is never called, through exhaustive checking
    of ``value`` in the parent function.

    See https://mypy.readthedocs.io/en/latest/literal_types.html#exhaustive-checks
    for details.

    Raises:
        AssertionError: always; reaching this at runtime means the calling
            ``if``/``elif`` chain missed a case.
    """
    # Raise explicitly instead of ``assert False`` so the guard is not
    # stripped under ``python -O`` (which would let a NoReturn function
    # fall through and return None).
    raise AssertionError(msg)
class GitBareCooker(BaseVaultCooker):
    """Cooker that assembles a "git bare" repository from a revision,
    directory, or snapshot stored in the archive."""

    # NOTE(review): presumably enables a ``git fsck`` validation pass on the
    # cooked repository -- confirm against BaseVaultCooker.
    use_fsck = True

    # Kind of the root object; derived from ``bundle_type`` in ``__init__``.
    obj_type: RootObjectType
def __init__(self, *args, **kwargs): | def __init__(self, *args, **kwargs): | ||||
super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | ||||
self.obj_type = RootObjectType(self.bundle_type.split("_")[0]) | self.obj_type = RootObjectType(self.bundle_type.split("_")[0]) | ||||
    def cache_type_key(self) -> str:
        """Key under which cooked bundles of this type are cached: the
        bundle type itself."""
        return self.bundle_type
def check_exists(self): | def check_exists(self) -> bool: | ||||
if self.obj_type == RootObjectType.REVISION: | if self.obj_type is RootObjectType.REVISION: | ||||
return not list(self.storage.revision_missing([self.obj_id])) | return not list(self.storage.revision_missing([self.obj_id])) | ||||
elif self.obj_type == RootObjectType.DIRECTORY: | elif self.obj_type is RootObjectType.DIRECTORY: | ||||
return not list(self.storage.directory_missing([self.obj_id])) | return not list(self.storage.directory_missing([self.obj_id])) | ||||
elif self.obj_type == RootObjectType.SNAPSHOT: | elif self.obj_type is RootObjectType.SNAPSHOT: | ||||
return not list(self.storage.snapshot_missing([self.obj_id])) | return not list(self.storage.snapshot_missing([self.obj_id])) | ||||
else: | else: | ||||
assert False, f"Unexpected root object type: {self.obj_type}" | assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}") | ||||
def obj_swhid(self) -> identifiers.CoreSWHID: | def obj_swhid(self) -> identifiers.CoreSWHID: | ||||
return identifiers.CoreSWHID( | return identifiers.CoreSWHID( | ||||
object_type=identifiers.ObjectType[self.obj_type.name], | object_type=identifiers.ObjectType[self.obj_type.name], | ||||
object_id=self.obj_id, | object_id=self.obj_id, | ||||
) | ) | ||||
def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None: | def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None: | ||||
▲ Show 20 Lines • Show All 128 Lines • ▼ Show 20 Lines | def write_refs(self, snapshot=None): | ||||
branch_name: ( | branch_name: ( | ||||
b"ref: " + branch.target | b"ref: " + branch.target | ||||
if branch.target_type == TargetType.ALIAS | if branch.target_type == TargetType.ALIAS | ||||
else hash_to_bytehex(branch.target) | else hash_to_bytehex(branch.target) | ||||
) | ) | ||||
for (branch_name, branch) in branches | for (branch_name, branch) in branches | ||||
} | } | ||||
else: | else: | ||||
assert False, f"Unexpected root object type: {self.obj_type}" | assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}") | ||||
for (ref_name, ref_target) in refs.items(): | for (ref_name, ref_target) in refs.items(): | ||||
path = os.path.join(self.gitdir.encode(), ref_name) | path = os.path.join(self.gitdir.encode(), ref_name) | ||||
os.makedirs(os.path.dirname(path), exist_ok=True) | os.makedirs(os.path.dirname(path), exist_ok=True) | ||||
with open(path, "wb") as fd: | with open(path, "wb") as fd: | ||||
fd.write(ref_target) | fd.write(ref_target) | ||||
def write_archive(self): | def write_archive(self): | ||||
Show All 20 Lines | def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool: | ||||
# removes them, so we don't need to compress them too much. | # removes them, so we don't need to compress them too much. | ||||
data = zlib.compress(obj, level=1) | data = zlib.compress(obj, level=1) | ||||
with open(self._obj_path(obj_id), "wb") as fd: | with open(self._obj_path(obj_id), "wb") as fd: | ||||
fd.write(data) | fd.write(data) | ||||
return True | return True | ||||
    def push_subgraph(self, obj_type: RootObjectType, obj_id) -> None:
        """Queue the root object and (transitively) everything it references
        for loading, dispatching on the kind of the root object.

        NOTE(review): the ``obj_type`` parameter is ignored; dispatch uses
        ``self.obj_type`` instead. Callers presumably pass the same value --
        confirm, then either use the parameter here or drop it.
        """
        if self.obj_type is RootObjectType.REVISION:
            self.push_revision_subgraph(obj_id)
        elif self.obj_type is RootObjectType.DIRECTORY:
            # A bare directory has no history; just queue the directory itself.
            self._push(self._dir_stack, [obj_id])
        elif self.obj_type is RootObjectType.SNAPSHOT:
            self.push_snapshot_subgraph(obj_id)
        else:
            assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}")
def load_objects(self) -> None: | def load_objects(self) -> None: | ||||
while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack: | while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack: | ||||
release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE) | release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE) | ||||
if release_ids: | if release_ids: | ||||
self.load_releases(release_ids) | self.load_releases(release_ids) | ||||
revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) | revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) | ||||
▲ Show 20 Lines • Show All 65 Lines • ▼ Show 20 Lines | def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None: | ||||
from swh.graph.client import GraphArgumentException | from swh.graph.client import GraphArgumentException | ||||
# First, try to cook using swh-graph, as it is more efficient than | # First, try to cook using swh-graph, as it is more efficient than | ||||
# swh-storage for querying the history | # swh-storage for querying the history | ||||
obj_swhid = identifiers.CoreSWHID( | obj_swhid = identifiers.CoreSWHID( | ||||
object_type=identifiers.ObjectType.SNAPSHOT, object_id=obj_id, | object_type=identifiers.ObjectType.SNAPSHOT, object_id=obj_id, | ||||
) | ) | ||||
try: | try: | ||||
swhids = map( | swhids: Iterable[identifiers.CoreSWHID] = map( | ||||
identifiers.CoreSWHID.from_string, | identifiers.CoreSWHID.from_string, | ||||
self.graph.visit_nodes(str(obj_swhid), edges="snp:*,rel:*,rev:rev"), | self.graph.visit_nodes(str(obj_swhid), edges="snp:*,rel:*,rev:rev"), | ||||
) | ) | ||||
for swhid in swhids: | for swhid in swhids: | ||||
if swhid.object_type == identifiers.ObjectType.REVISION: | if swhid.object_type is identifiers.ObjectType.REVISION: | ||||
revision_ids.append(swhid.object_id) | revision_ids.append(swhid.object_id) | ||||
elif swhid.object_type == identifiers.ObjectType.RELEASE: | elif swhid.object_type is identifiers.ObjectType.RELEASE: | ||||
release_ids.append(swhid.object_id) | release_ids.append(swhid.object_id) | ||||
elif swhid.object_type == identifiers.ObjectType.DIRECTORY: | elif swhid.object_type is identifiers.ObjectType.DIRECTORY: | ||||
directory_ids.append(swhid.object_id) | directory_ids.append(swhid.object_id) | ||||
elif swhid.object_type == identifiers.ObjectType.CONTENT: | elif swhid.object_type is identifiers.ObjectType.CONTENT: | ||||
content_ids.append(swhid.object_id) | content_ids.append(swhid.object_id) | ||||
elif swhid.object_type == identifiers.ObjectType.SNAPSHOT: | elif swhid.object_type is identifiers.ObjectType.SNAPSHOT: | ||||
assert ( | assert ( | ||||
swhid.object_id == obj_id | swhid.object_id == obj_id | ||||
), f"Snapshot {obj_id.hex()} references a different snapshot" | ), f"Snapshot {obj_id.hex()} references a different snapshot" | ||||
else: | else: | ||||
assert False, f"Unexpected SWHID object type: {swhid}" | assert_never( | ||||
swhid.object_type, f"Unexpected SWHID object type: {swhid}" | |||||
) | |||||
except GraphArgumentException as e: | except GraphArgumentException as e: | ||||
logger.info( | logger.info( | ||||
"Snapshot %s not found in swh-graph, falling back to fetching " | "Snapshot %s not found in swh-graph, falling back to fetching " | ||||
"history for each branch. %s", | "history for each branch. %s", | ||||
hash_to_hex(obj_id), | hash_to_hex(obj_id), | ||||
e.args[0], | e.args[0], | ||||
) | ) | ||||
else: | else: | ||||
self._push(self._rev_stack, revision_ids) | self._push(self._rev_stack, revision_ids) | ||||
self._push(self._rel_stack, release_ids) | self._push(self._rel_stack, release_ids) | ||||
self._push(self._dir_stack, directory_ids) | self._push(self._dir_stack, directory_ids) | ||||
self._push(self._cnt_stack, content_ids) | self._push(self._cnt_stack, content_ids) | ||||
loaded_from_graph = True | loaded_from_graph = True | ||||
# TODO: when self.graph is available and supports edge labels, use it | # TODO: when self.graph is available and supports edge labels, use it | ||||
# directly to get branch names. | # directly to get branch names. | ||||
snapshot = snapshot_get_all_branches(self.storage, obj_id) | snapshot: Optional[Snapshot] = snapshot_get_all_branches(self.storage, obj_id) | ||||
assert snapshot, "Unknown snapshot" # should have been caught by check_exists() | assert snapshot, "Unknown snapshot" # should have been caught by check_exists() | ||||
for branch in snapshot.branches.values(): | for branch in snapshot.branches.values(): | ||||
if not loaded_from_graph: | if not loaded_from_graph: | ||||
if branch is None: | if branch is None: | ||||
logging.warning("Dangling branch: %r", branch) | logging.warning("Dangling branch: %r", branch) | ||||
elif branch.target_type == TargetType.REVISION: | continue | ||||
assert isinstance(branch, SnapshotBranch) # for mypy | |||||
if branch.target_type is TargetType.REVISION: | |||||
self.push_revision_subgraph(branch.target) | self.push_revision_subgraph(branch.target) | ||||
elif branch.target_type == TargetType.RELEASE: | elif branch.target_type is TargetType.RELEASE: | ||||
self.push_releases_subgraphs([branch.target]) | self.push_releases_subgraphs([branch.target]) | ||||
elif branch.target_type == TargetType.ALIAS: | elif branch.target_type is TargetType.ALIAS: | ||||
# Nothing to do, this for loop also iterates on the target branch | # Nothing to do, this for loop also iterates on the target branch | ||||
# (if it exists) | # (if it exists) | ||||
pass | pass | ||||
elif branch.target_type == TargetType.DIRECTORY: | elif branch.target_type is TargetType.DIRECTORY: | ||||
self._push(self._dir_stack, [branch.target]) | self._push(self._dir_stack, [branch.target]) | ||||
elif branch.target_type == TargetType.CONTENT: | elif branch.target_type is TargetType.CONTENT: | ||||
self._push(self._cnt_stack, [branch.target]) | self._push(self._cnt_stack, [branch.target]) | ||||
elif branch.target_type is TargetType.SNAPSHOT: | |||||
if swhid.object_id != obj_id: | |||||
raise NotImplementedError( | |||||
f"{swhid} has a snapshot as a branch." | |||||
) | |||||
else: | else: | ||||
raise NotImplementedError(f"{branch.target_type} branches") | assert_never( | ||||
branch.target_type, f"Unexpected target type: {self.obj_type}" | |||||
) | |||||
self.write_refs(snapshot=snapshot) | self.write_refs(snapshot=snapshot) | ||||
def load_revisions(self, obj_ids: List[Sha1Git]) -> None: | def load_revisions(self, obj_ids: List[Sha1Git]) -> None: | ||||
"""Given a list of revision ids, loads these revisions and their directories; | """Given a list of revision ids, loads these revisions and their directories; | ||||
but not their parent revisions.""" | but not their parent revisions.""" | ||||
ret: List[Optional[Revision]] = self.storage.revision_get(obj_ids) | ret: List[Optional[Revision]] = self.storage.revision_get(obj_ids) | ||||
Show All 22 Lines | def load_releases(self, obj_ids: List[Sha1Git]) -> List[Release]: | ||||
self.write_release_node(release.to_dict()) | self.write_release_node(release.to_dict()) | ||||
return releases | return releases | ||||
def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None: | def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None: | ||||
"""Given a list of release ids, loads these releases and adds their | """Given a list of release ids, loads these releases and adds their | ||||
target to the list of objects to visit""" | target to the list of objects to visit""" | ||||
for release in self.load_releases(obj_ids): | for release in self.load_releases(obj_ids): | ||||
if release.target_type == ObjectType.REVISION: | if release.target_type is ObjectType.REVISION: | ||||
assert release.target, "{release.swhid(}) has no target" | assert release.target, "{release.swhid(}) has no target" | ||||
self.push_revision_subgraph(release.target) | self.push_revision_subgraph(release.target) | ||||
elif release.target_type == ObjectType.DIRECTORY: | elif release.target_type is ObjectType.DIRECTORY: | ||||
assert release.target, "{release.swhid(}) has no target" | assert release.target, "{release.swhid(}) has no target" | ||||
self._push(self._dir_stack, [release.target]) | self._push(self._dir_stack, [release.target]) | ||||
else: | elif release.target_type is ObjectType.CONTENT: | ||||
raise NotImplementedError( | |||||
f"{release.swhid()} targets a content: {release.target!r}" | |||||
) | |||||
elif release.target_type is ObjectType.RELEASE: | |||||
raise NotImplementedError( | raise NotImplementedError( | ||||
f"{release.swhid()} targets {release.target_type}" | f"{release.swhid()} targets another release: {release.target!r}" | ||||
) | |||||
elif release.target_type is ObjectType.SNAPSHOT: | |||||
raise NotImplementedError( | |||||
f"{release.swhid()} targets a snapshot: {release.target!r}" | |||||
) | |||||
else: | |||||
assert_never( | |||||
release.target_type, | |||||
f"Unexpected release target type: {release.target_type}", | |||||
) | ) | ||||
def write_release_node(self, release: Dict[str, Any]) -> bool: | def write_release_node(self, release: Dict[str, Any]) -> bool: | ||||
"""Writes a release object to disk""" | """Writes a release object to disk""" | ||||
git_object = identifiers.release_git_object(release) | git_object = identifiers.release_git_object(release) | ||||
return self.write_object(release["id"], git_object) | return self.write_object(release["id"], git_object) | ||||
def load_directories(self, obj_ids: List[Sha1Git]) -> None: | def load_directories(self, obj_ids: List[Sha1Git]) -> None: | ||||
Show All 37 Lines | def load_contents(self, obj_ids: List[Sha1Git]) -> None: | ||||
# FIXME: this may also happen for missing content | # FIXME: this may also happen for missing content | ||||
self.write_content(obj_id, SKIPPED_MESSAGE) | self.write_content(obj_id, SKIPPED_MESSAGE) | ||||
self._expect_mismatched_object_error(obj_id) | self._expect_mismatched_object_error(obj_id) | ||||
elif content.status == "visible": | elif content.status == "visible": | ||||
visible_contents.append(content) | visible_contents.append(content) | ||||
elif content.status == "hidden": | elif content.status == "hidden": | ||||
self.write_content(obj_id, HIDDEN_MESSAGE) | self.write_content(obj_id, HIDDEN_MESSAGE) | ||||
self._expect_mismatched_object_error(obj_id) | self._expect_mismatched_object_error(obj_id) | ||||
elif content.status == "absent": | |||||
assert False, f"content_get returned absent content {content.swhid()}" | |||||
else: | else: | ||||
assert False, ( | # TODO: When content.status will have type Literal, replace this with | ||||
f"unexpected status {content.status!r} " | # assert_never | ||||
f"for content {hash_to_hex(content.sha1_git)}" | assert False, f"{content.swhid} has status: {content.status!r}" | ||||
) | |||||
contents_and_data: Iterator[Tuple[Content, Optional[bytes]]] | contents_and_data: Iterator[Tuple[Content, Optional[bytes]]] | ||||
if self.objstorage is None: | if self.objstorage is None: | ||||
contents_and_data = ( | contents_and_data = ( | ||||
(content, self.storage.content_get_data(content.sha1)) | (content, self.storage.content_get_data(content.sha1)) | ||||
for content in visible_contents | for content in visible_contents | ||||
) | ) | ||||
else: | else: | ||||
Show All 34 Lines |