git-bare cooker: log and skip missing objects instead of failing on them.

Revisions/releases returned as None by storage are filtered out and logged,
a missing directory (stream_results_optional returning None) is logged and
skipped, and visible contents whose data is None are logged and skipped.
Also adds type annotations to BaseVaultCooker.__init__.

diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
-swh.core[db,http] >= 0.14.0
+swh.core[db,http] >= 0.14.4
 swh.model >= 0.3
 swh.objstorage >= 0.0.17
 swh.scheduler >= 0.7.0
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -11,6 +11,8 @@
 from psycopg2.extensions import QueryCanceledError
 
 from swh.model import hashutil
+from swh.model.model import Sha1Git
+from swh.storage.interface import StorageInterface
 
 MAX_BUNDLE_SIZE = 2 ** 29  # 512 MiB
 DEFAULT_CONFIG_PATH = "vault/cooker"
@@ -62,13 +64,13 @@
 
     def __init__(
         self,
-        obj_type,
-        obj_id,
+        obj_type: str,
+        obj_id: Sha1Git,
         backend,
-        storage,
+        storage: StorageInterface,
         graph=None,
         objstorage=None,
-        max_bundle_size=MAX_BUNDLE_SIZE,
+        max_bundle_size: int = MAX_BUNDLE_SIZE,
     ):
         """Initialize the cooker.
 
diff --git a/swh/vault/cookers/git_bare.py b/swh/vault/cookers/git_bare.py
--- a/swh/vault/cookers/git_bare.py
+++ b/swh/vault/cookers/git_bare.py
@@ -18,18 +18,20 @@
 """
 
 import datetime
+import logging
 import os.path
 import re
 import subprocess
 import tarfile
 import tempfile
-from typing import Any, Dict, Iterable, List, Optional, Set
+from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple
 import zlib
 
-from swh.core.api.classes import stream_results
+from swh.core.api.classes import stream_results_optional
 from swh.model import identifiers
 from swh.model.hashutil import hash_to_bytehex, hash_to_hex
 from swh.model.model import (
+    Content,
     ObjectType,
     Person,
     Revision,
@@ -49,6 +51,9 @@
 CONTENT_BATCH_SIZE = 100
 
 
+logger = logging.getLogger(__name__)
+
+
 class GitBareCooker(BaseVaultCooker):
     use_fsck = True
 
@@ -361,9 +366,14 @@
     def load_revisions(self, obj_ids: List[Sha1Git]) -> None:
         """Given a list of revision ids, loads these revisions and their directories;
         but not their parent revisions."""
-        revisions = self.storage.revision_get(obj_ids)
+        ret: List[Optional[Revision]] = self.storage.revision_get(obj_ids)
+
+        revisions = list(filter(None, ret))
+        if len(ret) != len(revisions):
+            logger.error("Missing revision(s), ignoring them.")
+
         for revision in revisions:
             self.write_revision_node(revision.to_dict())
         self._push(self._dir_stack, (rev.directory for rev in revisions))
 
     def write_revision_node(self, revision: Dict[str, Any]) -> bool:
@@ -374,11 +384,17 @@
     def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None:
         """Given a list of release ids, loads these releases and adds their
         target to the list of objects to visit"""
-        releases = self.storage.release_get(obj_ids)
+        ret = self.storage.release_get(obj_ids)
+
+        releases = list(filter(None, ret))
+        if len(ret) != len(releases):
+            logger.error("Missing release(s), ignoring them.")
+
         revision_ids: List[Sha1Git] = []
         for release in releases:
             self.write_release_node(release.to_dict())
             if release.target_type == ObjectType.REVISION:
+                assert release.target, f"{release.swhid()} has no target"
                 self.push_revision_subgraph(release.target)
             else:
                 raise NotImplementedError(f"{release.target_type} release targets")
@@ -395,10 +411,13 @@
 
     def load_directory(self, obj_id: Sha1Git) -> None:
         # Load the directory
-        entries = [
-            entry.to_dict()
-            for entry in stream_results(self.storage.directory_get_entries, obj_id)
-        ]
+        entries_it = stream_results_optional(self.storage.directory_get_entries, obj_id)
+
+        if entries_it is None:
+            logger.error("Missing swh:1:dir:%s, ignoring.", hash_to_hex(obj_id))
+            return
+
+        entries = [entry.to_dict() for entry in entries_it]
         directory = {"id": obj_id, "entries": entries}
         git_object = identifiers.directory_git_object(directory)
         self.write_object(obj_id, git_object)
@@ -436,14 +455,25 @@
                     f"for content {hash_to_hex(content.sha1_git)}"
                 )
 
+        contents_and_data: Iterator[Tuple[Content, Optional[bytes]]]
         if self.objstorage is None:
-            for content in visible_contents:
-                data = self.storage.content_get_data(content.sha1)
-                self.write_content(content.sha1_git, data)
+            contents_and_data = (
+                (content, self.storage.content_get_data(content.sha1))
+                for content in visible_contents
+            )
         else:
-            content_data = self.objstorage.get_batch(c.sha1 for c in visible_contents)
-            for (content, data) in zip(contents, content_data):
-                self.write_content(content.sha1_git, data)
+            contents_and_data = zip(
+                visible_contents,
+                self.objstorage.get_batch(c.sha1 for c in visible_contents),
+            )
+
+        for (content, datum) in contents_and_data:
+            if datum is None:
+                logger.error(
+                    "%s is visible, but is missing data. Skipping.", content.swhid()
+                )
+                continue
+            self.write_content(content.sha1_git, datum)
 
     def write_content(self, obj_id: Sha1Git, content: bytes) -> None:
         header = identifiers.git_object_header("blob", len(content))