Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/git_bare.py
Show All 12 Lines | |||||||||||||||||
2. Calls ``git repack`` to pack all these objects into git packfiles. | 2. Calls ``git repack`` to pack all these objects into git packfiles. | ||||||||||||||||
3. Creates a tarball of the resulting repository | 3. Creates a tarball of the resulting repository | ||||||||||||||||
It keeps a set of all written (or about-to-be-written) object hashes in memory | It keeps a set of all written (or about-to-be-written) object hashes in memory | ||||||||||||||||
to avoid downloading and writing the same objects twice. | to avoid downloading and writing the same objects twice. | ||||||||||||||||
""" | """ | ||||||||||||||||
import datetime | import datetime | ||||||||||||||||
import logging | |||||||||||||||||
import os.path | import os.path | ||||||||||||||||
import re | import re | ||||||||||||||||
import subprocess | import subprocess | ||||||||||||||||
import tarfile | import tarfile | ||||||||||||||||
import tempfile | import tempfile | ||||||||||||||||
from typing import Any, Dict, Iterable, List, Optional, Set | from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple | ||||||||||||||||
import zlib | import zlib | ||||||||||||||||
from swh.core.api.classes import stream_results | from swh.core.api.classes import stream_results_optional | ||||||||||||||||
ardumont: it's not only typing changes here, is it? | |||||||||||||||||
from swh.model import identifiers | from swh.model import identifiers | ||||||||||||||||
from swh.model.hashutil import hash_to_bytehex, hash_to_hex | from swh.model.hashutil import hash_to_bytehex, hash_to_hex | ||||||||||||||||
from swh.model.model import ( | from swh.model.model import ( | ||||||||||||||||
Content, | |||||||||||||||||
DirectoryEntry, | |||||||||||||||||
ObjectType, | ObjectType, | ||||||||||||||||
Person, | Person, | ||||||||||||||||
Revision, | Revision, | ||||||||||||||||
RevisionType, | RevisionType, | ||||||||||||||||
Sha1Git, | Sha1Git, | ||||||||||||||||
TargetType, | TargetType, | ||||||||||||||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||||||||||||||
) | ) | ||||||||||||||||
from swh.storage.algos.revisions_walker import DFSRevisionsWalker | from swh.storage.algos.revisions_walker import DFSRevisionsWalker | ||||||||||||||||
from swh.storage.algos.snapshot import snapshot_get_all_branches | from swh.storage.algos.snapshot import snapshot_get_all_branches | ||||||||||||||||
from swh.vault.cookers.base import BaseVaultCooker | from swh.vault.cookers.base import BaseVaultCooker | ||||||||||||||||
from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | ||||||||||||||||
RELEASE_BATCH_SIZE = 10000 | RELEASE_BATCH_SIZE = 10000 | ||||||||||||||||
REVISION_BATCH_SIZE = 10000 | REVISION_BATCH_SIZE = 10000 | ||||||||||||||||
DIRECTORY_BATCH_SIZE = 10000 | DIRECTORY_BATCH_SIZE = 10000 | ||||||||||||||||
CONTENT_BATCH_SIZE = 100 | CONTENT_BATCH_SIZE = 100 | ||||||||||||||||
logger = logging.getLogger(__name__) | |||||||||||||||||
class GitBareCooker(BaseVaultCooker): | class GitBareCooker(BaseVaultCooker): | ||||||||||||||||
use_fsck = True | use_fsck = True | ||||||||||||||||
def cache_type_key(self) -> str: | def cache_type_key(self) -> str: | ||||||||||||||||
return self.obj_type | return self.obj_type | ||||||||||||||||
def check_exists(self): | def check_exists(self): | ||||||||||||||||
obj_type = self.obj_type.split("_")[0] | obj_type = self.obj_type.split("_")[0] | ||||||||||||||||
▲ Show 20 Lines • Show All 296 Lines • ▼ Show 20 Lines | def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None: | ||||||||||||||||
else: | else: | ||||||||||||||||
raise NotImplementedError(f"{branch.target_type} branches") | raise NotImplementedError(f"{branch.target_type} branches") | ||||||||||||||||
self.write_refs(snapshot=snapshot) | self.write_refs(snapshot=snapshot) | ||||||||||||||||
def load_revisions(self, obj_ids: List[Sha1Git]) -> None: | def load_revisions(self, obj_ids: List[Sha1Git]) -> None: | ||||||||||||||||
"""Given a list of revision ids, loads these revisions and their directories; | """Given a list of revision ids, loads these revisions and their directories; | ||||||||||||||||
but not their parent revisions.""" | but not their parent revisions.""" | ||||||||||||||||
revisions = self.storage.revision_get(obj_ids) | ret: List[Optional[Revision]] = self.storage.revision_get(obj_ids) | ||||||||||||||||
revisions: List[Revision] = list(filter(None, ret)) | |||||||||||||||||
if len(ret) != len(revisions): | |||||||||||||||||
Not Done Inline Actions
ardumont: logger.warning instead? You don't stop the process, so I'm not sure error qualifies. | |||||||||||||||||
logger.error("Missing revision(s), ignoring them.") | |||||||||||||||||
for revision in revisions: | for revision in revisions: | ||||||||||||||||
self.write_revision_node(revision.to_dict()) | self.write_revision_node(revision.to_dict()) | ||||||||||||||||
Done Inline Actions
ardumont: I found the following clearer. | |||||||||||||||||
self._push(self._dir_stack, (rev.directory for rev in revisions)) | self._push(self._dir_stack, (rev.directory for rev in revisions)) | ||||||||||||||||
def write_revision_node(self, revision: Dict[str, Any]) -> bool: | def write_revision_node(self, revision: Dict[str, Any]) -> bool: | ||||||||||||||||
"""Writes a revision object to disk""" | """Writes a revision object to disk""" | ||||||||||||||||
git_object = identifiers.revision_git_object(revision) | git_object = identifiers.revision_git_object(revision) | ||||||||||||||||
return self.write_object(revision["id"], git_object) | return self.write_object(revision["id"], git_object) | ||||||||||||||||
def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None: | def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None: | ||||||||||||||||
"""Given a list of release ids, loads these releases and adds their | """Given a list of release ids, loads these releases and adds their | ||||||||||||||||
target to the list of objects to visit""" | target to the list of objects to visit""" | ||||||||||||||||
releases = self.storage.release_get(obj_ids) | ret = self.storage.release_get(obj_ids) | ||||||||||||||||
releases = list(filter(None, ret)) | |||||||||||||||||
if len(ret) != len(releases): | |||||||||||||||||
logger.error("Missing release(s), ignoring them.") | |||||||||||||||||
revision_ids: List[Sha1Git] = [] | revision_ids: List[Sha1Git] = [] | ||||||||||||||||
for release in releases: | for release in releases: | ||||||||||||||||
self.write_release_node(release.to_dict()) | self.write_release_node(release.to_dict()) | ||||||||||||||||
if release.target_type == ObjectType.REVISION: | if release.target_type == ObjectType.REVISION: | ||||||||||||||||
assert release.target, "{release.swhid(}) has no target" | |||||||||||||||||
self.push_revision_subgraph(release.target) | self.push_revision_subgraph(release.target) | ||||||||||||||||
else: | else: | ||||||||||||||||
raise NotImplementedError(f"{release.target_type} release targets") | raise NotImplementedError(f"{release.target_type} release targets") | ||||||||||||||||
self._push(self._rev_stack, revision_ids) | self._push(self._rev_stack, revision_ids) | ||||||||||||||||
def write_release_node(self, release: Dict[str, Any]) -> bool: | def write_release_node(self, release: Dict[str, Any]) -> bool: | ||||||||||||||||
"""Writes a release object to disk""" | """Writes a release object to disk""" | ||||||||||||||||
git_object = identifiers.release_git_object(release) | git_object = identifiers.release_git_object(release) | ||||||||||||||||
return self.write_object(release["id"], git_object) | return self.write_object(release["id"], git_object) | ||||||||||||||||
def load_directories(self, obj_ids: List[Sha1Git]) -> None: | def load_directories(self, obj_ids: List[Sha1Git]) -> None: | ||||||||||||||||
for obj_id in obj_ids: | for obj_id in obj_ids: | ||||||||||||||||
self.load_directory(obj_id) | self.load_directory(obj_id) | ||||||||||||||||
def load_directory(self, obj_id: Sha1Git) -> None: | def load_directory(self, obj_id: Sha1Git) -> None: | ||||||||||||||||
# Load the directory | # Load the directory | ||||||||||||||||
entries = [ | entries_it: Optional[Iterable[DirectoryEntry]] = stream_results_optional( | ||||||||||||||||
entry.to_dict() | self.storage.directory_get_entries, obj_id | ||||||||||||||||
for entry in stream_results(self.storage.directory_get_entries, obj_id) | ) | ||||||||||||||||
] | |||||||||||||||||
if entries_it is None: | |||||||||||||||||
logger.error("Missing swh:1:dir:%s, ignoring.", hash_to_hex(obj_id)) | |||||||||||||||||
return | |||||||||||||||||
entries = [entry.to_dict() for entry in entries_it] | |||||||||||||||||
Done Inline Actions ardumont: Making the type of entries_it explicit might help. | |||||||||||||||||
directory = {"id": obj_id, "entries": entries} | directory = {"id": obj_id, "entries": entries} | ||||||||||||||||
git_object = identifiers.directory_git_object(directory) | git_object = identifiers.directory_git_object(directory) | ||||||||||||||||
self.write_object(obj_id, git_object) | self.write_object(obj_id, git_object) | ||||||||||||||||
# Add children to the stack | # Add children to the stack | ||||||||||||||||
entry_loaders: Dict[str, List[Sha1Git]] = { | entry_loaders: Dict[str, List[Sha1Git]] = { | ||||||||||||||||
"file": self._cnt_stack, | "file": self._cnt_stack, | ||||||||||||||||
"dir": self._dir_stack, | "dir": self._dir_stack, | ||||||||||||||||
Show All 21 Lines | def load_contents(self, obj_ids: List[Sha1Git]) -> None: | ||||||||||||||||
self.write_content(obj_id, HIDDEN_MESSAGE) | self.write_content(obj_id, HIDDEN_MESSAGE) | ||||||||||||||||
self._expect_mismatched_object_error(obj_id) | self._expect_mismatched_object_error(obj_id) | ||||||||||||||||
else: | else: | ||||||||||||||||
assert False, ( | assert False, ( | ||||||||||||||||
f"unexpected status {content.status!r} " | f"unexpected status {content.status!r} " | ||||||||||||||||
f"for content {hash_to_hex(content.sha1_git)}" | f"for content {hash_to_hex(content.sha1_git)}" | ||||||||||||||||
) | ) | ||||||||||||||||
contents_and_data: Iterator[Tuple[Content, Optional[bytes]]] | |||||||||||||||||
if self.objstorage is None: | if self.objstorage is None: | ||||||||||||||||
for content in visible_contents: | contents_and_data = ( | ||||||||||||||||
data = self.storage.content_get_data(content.sha1) | (content, self.storage.content_get_data(content.sha1)) | ||||||||||||||||
self.write_content(content.sha1_git, data) | for content in visible_contents | ||||||||||||||||
) | |||||||||||||||||
else: | else: | ||||||||||||||||
content_data = self.objstorage.get_batch(c.sha1 for c in visible_contents) | contents_and_data = zip( | ||||||||||||||||
for (content, data) in zip(contents, content_data): | visible_contents, | ||||||||||||||||
self.write_content(content.sha1_git, data) | self.objstorage.get_batch(c.sha1 for c in visible_contents), | ||||||||||||||||
) | |||||||||||||||||
for (content, datum) in contents_and_data: | |||||||||||||||||
if datum is None: | |||||||||||||||||
logger.error( | |||||||||||||||||
"{content.swhid()} is visible, but is missing data. Skipping." | |||||||||||||||||
) | |||||||||||||||||
continue | |||||||||||||||||
self.write_content(content.sha1_git, datum) | |||||||||||||||||
def write_content(self, obj_id: Sha1Git, content: bytes) -> None: | def write_content(self, obj_id: Sha1Git, content: bytes) -> None: | ||||||||||||||||
header = identifiers.git_object_header("blob", len(content)) | header = identifiers.git_object_header("blob", len(content)) | ||||||||||||||||
self.write_object(obj_id, header + content) | self.write_object(obj_id, header + content) | ||||||||||||||||
def _expect_mismatched_object_error(self, obj_id): | def _expect_mismatched_object_error(self, obj_id): | ||||||||||||||||
obj_id_hex = hash_to_hex(obj_id) | obj_id_hex = hash_to_hex(obj_id) | ||||||||||||||||
obj_path = self._obj_relative_path(obj_id) | obj_path = self._obj_relative_path(obj_id) | ||||||||||||||||
Show All 14 Lines |
it's not only typing changes here, is it?