diff --git a/requirements-swh.txt b/requirements-swh.txt
index 969da2e..4534fd9 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
-swh.core[db,http] >= 0.14.0
+swh.core[db,http] >= 0.14.4
 swh.model >= 0.3
 swh.objstorage >= 0.0.17
 swh.scheduler >= 0.7.0
 swh.storage >= 0.29.0
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
index f88f49f..5473ff6 100644
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -1,149 +1,151 @@
 # Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import io
 import logging
 from typing import Optional

 from psycopg2.extensions import QueryCanceledError

 from swh.model import hashutil
+from swh.model.model import Sha1Git
+from swh.storage.interface import StorageInterface

 MAX_BUNDLE_SIZE = 2 ** 29  # 512 MiB
 DEFAULT_CONFIG_PATH = "vault/cooker"
 DEFAULT_CONFIG = {
     "max_bundle_size": ("int", MAX_BUNDLE_SIZE),
 }


 class PolicyError(Exception):
     """Raised when the bundle violates the cooking policy."""

     pass


 class BundleTooLargeError(PolicyError):
     """Raised when the bundle is too large to be cooked."""

     pass


 class BytesIOBundleSizeLimit(io.BytesIO):
     def __init__(self, *args, size_limit=None, **kwargs):
         super().__init__(*args, **kwargs)
         self.size_limit = size_limit

     def write(self, chunk):
         if (
             self.size_limit is not None
             and self.getbuffer().nbytes + len(chunk) > self.size_limit
         ):
             raise BundleTooLargeError(
                 "The requested bundle exceeds the maximum allowed "
                 "size of {} bytes.".format(self.size_limit)
             )
         return super().write(chunk)


 class BaseVaultCooker(metaclass=abc.ABCMeta):
     """Abstract base class for the vault's bundle creators

     This class describes a common API for the cookers.

     To define a new cooker, inherit from this class and override:

     - CACHE_TYPE_KEY: key to use for the bundle to reference in cache
     - def cook(): cook the object into a bundle
     """

     CACHE_TYPE_KEY = None  # type: Optional[str]

     def __init__(
         self,
-        obj_type,
-        obj_id,
+        obj_type: str,
+        obj_id: Sha1Git,
         backend,
-        storage,
+        storage: StorageInterface,
         graph=None,
         objstorage=None,
-        max_bundle_size=MAX_BUNDLE_SIZE,
+        max_bundle_size: int = MAX_BUNDLE_SIZE,
     ):
         """Initialize the cooker.

         The type of the object represented by the id depends on the
         concrete class. Very likely, each type of bundle will have its
         own cooker class.

         Args:
             obj_type: type of the object to be cooked into a bundle
                 (directory, revision_flat or revision_gitfast; see
                 swh.vault.cooker.COOKER_TYPES).
             obj_id: id of the object to be cooked into a bundle.
             backend: the vault backend (swh.vault.backend.VaultBackend).
         """
         self.obj_type = obj_type
         self.obj_id = hashutil.hash_to_bytes(obj_id)
         self.backend = backend
         self.storage = storage
         self.objstorage = objstorage
         self.graph = graph
         self.max_bundle_size = max_bundle_size

     @abc.abstractmethod
     def check_exists(self):
         """Checks that the requested object exists and can be cooked.

         Override this in the cooker implementation.
         """
         raise NotImplementedError

     @abc.abstractmethod
     def prepare_bundle(self):
         """Implementation of the cooker. Yields chunks of the bundle bytes.

         Override this with the cooker implementation.
         """
         raise NotImplementedError

     def cache_type_key(self) -> str:
         assert self.CACHE_TYPE_KEY
         return self.CACHE_TYPE_KEY

     def write(self, chunk):
         self.fileobj.write(chunk)

     def cook(self):
         """Cook the requested object into a bundle"""
         self.backend.set_status(self.obj_type, self.obj_id, "pending")
         self.backend.set_progress(self.obj_type, self.obj_id, "Processing...")

         self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size)
         try:
             try:
                 self.prepare_bundle()
             except QueryCanceledError:
                 raise PolicyError(
                     "Timeout reached while assembling the requested bundle"
                 )
             bundle = self.fileobj.getvalue()
             # TODO: use proper content streaming instead of put_bundle()
             self.backend.put_bundle(self.cache_type_key(), self.obj_id, bundle)
         except PolicyError as e:
             self.backend.set_status(self.obj_type, self.obj_id, "failed")
             self.backend.set_progress(self.obj_type, self.obj_id, str(e))
         except Exception:
             self.backend.set_status(self.obj_type, self.obj_id, "failed")
             self.backend.set_progress(
                 self.obj_type,
                 self.obj_id,
                 "Internal Server Error. This incident will be reported.",
             )
             logging.exception("Bundle cooking failed.")
         else:
             self.backend.set_status(self.obj_type, self.obj_id, "done")
             self.backend.set_progress(self.obj_type, self.obj_id, None)
         finally:
             self.backend.send_notif(self.obj_type, self.obj_id)
""" import datetime +import logging import os.path import re import subprocess import tarfile import tempfile -from typing import Any, Dict, Iterable, List, Optional, Set +from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple import zlib -from swh.core.api.classes import stream_results +from swh.core.api.classes import stream_results_optional from swh.model import identifiers from swh.model.hashutil import hash_to_bytehex, hash_to_hex from swh.model.model import ( + Content, + DirectoryEntry, ObjectType, Person, Revision, RevisionType, Sha1Git, TargetType, TimestampWithTimezone, ) from swh.storage.algos.revisions_walker import DFSRevisionsWalker from swh.storage.algos.snapshot import snapshot_get_all_branches from swh.vault.cookers.base import BaseVaultCooker from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE RELEASE_BATCH_SIZE = 10000 REVISION_BATCH_SIZE = 10000 DIRECTORY_BATCH_SIZE = 10000 CONTENT_BATCH_SIZE = 100 +logger = logging.getLogger(__name__) + + class GitBareCooker(BaseVaultCooker): use_fsck = True def cache_type_key(self) -> str: return self.obj_type def check_exists(self): obj_type = self.obj_type.split("_")[0] if obj_type == "revision": return not list(self.storage.revision_missing([self.obj_id])) elif obj_type == "directory": return not list(self.storage.directory_missing([self.obj_id])) if obj_type == "snapshot": return not list(self.storage.snapshot_missing([self.obj_id])) else: raise NotImplementedError(f"GitBareCooker for {obj_type}") def obj_swhid(self) -> identifiers.CoreSWHID: obj_type = self.obj_type.split("_")[0] return identifiers.CoreSWHID( object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id, ) def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None: assert not isinstance(obj_ids, bytes) revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen] self._seen.update(revision_ids) stack.extend(revision_ids) def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]: obj_ids = stack[-n:] stack[-n:] = [] return obj_ids def prepare_bundle(self): # Objects we will visit soon: self._rel_stack: List[Sha1Git] = [] self._rev_stack: List[Sha1Git] = [] self._dir_stack: List[Sha1Git] = [] self._cnt_stack: List[Sha1Git] = [] # Set of objects already in any of the stacks: self._seen: Set[Sha1Git] = set() self._walker_state: Optional[Any] = None # Set of errors we expect git-fsck to raise at the end: self._expected_fsck_errors = set() with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: # Initialize a Git directory self.workdir = workdir self.gitdir = os.path.join(workdir, "clone.git") os.mkdir(self.gitdir) self.init_git() # Add the root object to the stack of objects to visit self.push_subgraph(self.obj_type.split("_")[0], self.obj_id) # Load and write all the objects to disk self.load_objects() # Write the root object as a ref (this step is skipped if it's a snapshot) # This must be done before repacking; git-repack ignores orphan objects. self.write_refs() self.repack() self.write_archive() def init_git(self) -> None: subprocess.run(["git", "-C", self.gitdir, "init", "--bare"], check=True) # Create all possible dirs ahead of time, so we don't have to check for # existence every time. 
     def init_git(self) -> None:
         subprocess.run(["git", "-C", self.gitdir, "init", "--bare"], check=True)

         # Create all possible dirs ahead of time, so we don't have to check for
         # existence every time.
         for byte in range(256):
             os.mkdir(os.path.join(self.gitdir, "objects", f"{byte:02x}"))

     def repack(self) -> None:
         if self.use_fsck:
             self.git_fsck()

         # Add objects we wrote in a pack
         subprocess.run(["git", "-C", self.gitdir, "repack"], check=True)

         # Remove their non-packed originals
         subprocess.run(["git", "-C", self.gitdir, "prune-packed"], check=True)

     def git_fsck(self) -> None:
         proc = subprocess.run(
             ["git", "-C", self.gitdir, "fsck"],
             stdout=subprocess.PIPE,
             stderr=subprocess.STDOUT,
             env={"LANG": "C.utf8"},
         )

         # Split on newlines not followed by a space
         errors = re.split("\n(?! )", proc.stdout.decode())

         errors = [
             error for error in errors if error and not error.startswith("warning ")
         ]

         unexpected_errors = set(errors) - self._expected_fsck_errors
         if unexpected_errors:
             raise Exception(
                 "\n".join(
                     ["Unexpected errors from git-fsck:"] + sorted(unexpected_errors)
                 )
             )

     def write_refs(self, snapshot=None):
         refs: Dict[bytes, bytes]  # ref name -> target
         obj_type = self.obj_type.split("_")[0]
         if obj_type == "directory":
             # We need a synthetic revision pointing to the directory
             author = Person.from_fullname(
                 b"swh-vault, git-bare cooker <robot@softwareheritage.org>"
             )
             dt = datetime.datetime.now(tz=datetime.timezone.utc)
             dt = dt.replace(microsecond=0)  # not supported by git
             date = TimestampWithTimezone.from_datetime(dt)
             revision = Revision(
                 author=author,
                 committer=author,
                 date=date,
                 committer_date=date,
                 message=b"Initial commit",
                 type=RevisionType.GIT,
                 directory=self.obj_id,
                 synthetic=True,
             )
             self.write_revision_node(revision.to_dict())
             refs = {b"refs/heads/master": hash_to_bytehex(revision.id)}
         elif obj_type == "revision":
             refs = {b"refs/heads/master": hash_to_bytehex(self.obj_id)}
         elif obj_type == "snapshot":
             if snapshot is None:
                 # refs were already written in a previous step
                 return
             refs = {
                 branch_name: (
                     b"ref: " + branch.target
                     if branch.target_type == TargetType.ALIAS
                     else hash_to_bytehex(branch.target)
                 )
                 for (branch_name, branch) in snapshot.branches.items()
             }
         else:
             assert False, obj_type

         for (ref_name, ref_target) in refs.items():
             path = os.path.join(self.gitdir.encode(), ref_name)
             os.makedirs(os.path.dirname(path), exist_ok=True)
             with open(path, "wb") as fd:
                 fd.write(ref_target)

     def write_archive(self):
         with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf:
             tf.add(self.gitdir, arcname=f"{self.obj_swhid()}.git", recursive=True)

     def _obj_path(self, obj_id: Sha1Git):
         return os.path.join(self.gitdir, self._obj_relative_path(obj_id))

     def _obj_relative_path(self, obj_id: Sha1Git):
         obj_id_hex = hash_to_hex(obj_id)
         directory = obj_id_hex[0:2]
         filename = obj_id_hex[2:]
         return os.path.join("objects", directory, filename)

     def object_exists(self, obj_id: Sha1Git) -> bool:
         return os.path.exists(self._obj_path(obj_id))

     def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool:
         """Writes a git object on disk.

         Returns whether it was already written."""
         # Git requires objects to be zlib-compressed; but repacking decompresses and
         # removes them, so we don't need to compress them too much.
         data = zlib.compress(obj, level=1)

         with open(self._obj_path(obj_id), "wb") as fd:
             fd.write(data)
         return True
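write_object() relies on git's loose-object format: a "<type> <size>\0" header followed by the payload, zlib-compressed, stored under a directory named after the first hash byte. A standalone sketch, using only the standard library (the hash below is the well-known id of a "hello\n" blob):

    import hashlib
    import zlib

    payload = b"hello\n"
    obj = b"blob %d\x00" % len(payload) + payload  # header + payload
    obj_id_hex = hashlib.sha1(obj).hexdigest()     # the id git computes
    assert obj_id_hex == "ce013625030ba8dba906f756967f9e9ca394464a"

    # Same layout as _obj_relative_path(): objects/<2 hex chars>/<38 hex chars>
    path = f"objects/{obj_id_hex[:2]}/{obj_id_hex[2:]}"
    compressed = zlib.compress(obj, level=1)  # level 1: repack recompresses anyway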
     def push_subgraph(self, obj_type, obj_id) -> None:
         if obj_type == "revision":
             self.push_revision_subgraph(obj_id)
         elif obj_type == "directory":
             self._push(self._dir_stack, [obj_id])
         elif obj_type == "snapshot":
             self.push_snapshot_subgraph(obj_id)
         else:
             raise NotImplementedError(
                 f"GitBareCooker.push_subgraph({obj_type!r}, ...)"
             )

     def load_objects(self) -> None:
         while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack:
             release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE)
             self.push_releases_subgraphs(release_ids)

             revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE)
             self.load_revisions(revision_ids)

             directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE)
             self.load_directories(directory_ids)

             content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE)
             self.load_contents(content_ids)

     def push_revision_subgraph(self, obj_id: Sha1Git) -> None:
         """Fetches a revision and all its children, and writes them to disk"""
         loaded_from_graph = False

         if self.graph:
             from swh.graph.client import GraphArgumentException

             # First, try to cook using swh-graph, as it is more efficient than
             # swh-storage for querying the history
             obj_swhid = identifiers.CoreSWHID(
                 object_type=identifiers.ObjectType.REVISION,
                 object_id=obj_id,
             )
             try:
                 revision_ids = (
                     swhid.object_id
                     for swhid in map(
                         identifiers.CoreSWHID.from_string,
                         self.graph.visit_nodes(str(obj_swhid), edges="rev:rev"),
                     )
                 )
                 self._push(self._rev_stack, revision_ids)
             except GraphArgumentException:
                 # Revision not found in the graph
                 pass
             else:
                 loaded_from_graph = True

         if not loaded_from_graph:
             # If swh-graph is not available, or the revision is not yet in
             # swh-graph, fall back to self.storage.revision_log.
             # self.storage.revision_log also gives us the full revisions,
             # so we load them right now instead of just pushing them on the stack.
             walker = DFSRevisionsWalker(self.storage, obj_id, state=self._walker_state)
             for revision in walker:
                 self.write_revision_node(revision)
                 self._push(self._dir_stack, [revision["directory"]])
             # Save the state, so the next call to the walker won't return the same
             # revisions
             self._walker_state = walker.export_state()
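swh-graph's visit_nodes() yields SWHIDs as strings; the code above parses them back into raw ids before stacking them. A small sketch of that round-trip (placeholder id):

    from swh.model.identifiers import CoreSWHID, ObjectType

    swhid_str = "swh:1:rev:" + "00" * 20  # the kind of string visit_nodes yields
    swhid = CoreSWHID.from_string(swhid_str)
    assert swhid.object_type == ObjectType.REVISION
    assert swhid.object_id == b"\x00" * 20  # raw sha1_git, ready for _push()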
     def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None:
         """Fetches a snapshot and all its children, and writes them to disk"""
         loaded_from_graph = False

         if self.graph:
             revision_ids = []
             release_ids = []

             from swh.graph.client import GraphArgumentException

             # First, try to cook using swh-graph, as it is more efficient than
             # swh-storage for querying the history
             obj_swhid = identifiers.CoreSWHID(
                 object_type=identifiers.ObjectType.SNAPSHOT,
                 object_id=obj_id,
             )
             try:
                 swhids = map(
                     identifiers.CoreSWHID.from_string,
                     self.graph.visit_nodes(str(obj_swhid), edges="snp:*,rel:*,rev:rev"),
                 )
                 for swhid in swhids:
                     if swhid.object_type == identifiers.ObjectType.REVISION:
                         revision_ids.append(swhid.object_id)
                     elif swhid.object_type == identifiers.ObjectType.RELEASE:
                         release_ids.append(swhid.object_id)
                     elif swhid.object_type == identifiers.ObjectType.SNAPSHOT:
                         assert (
                             swhid.object_id == obj_id
                         ), f"Snapshot {obj_id.hex()} references a different snapshot"
                     else:
                         raise NotImplementedError(
                             f"{swhid.object_type} objects in snapshot subgraphs."
                         )
             except GraphArgumentException:
                 # Snapshot not found in the graph
                 pass
             else:
                 self._push(self._rev_stack, revision_ids)
                 self._push(self._rel_stack, release_ids)
                 loaded_from_graph = True

         # TODO: when self.graph is available and supports edge labels, use it
         # directly to get branch names.
         snapshot = snapshot_get_all_branches(self.storage, obj_id)
         assert snapshot, "Unknown snapshot"  # should have been caught by check_exists()
         for branch in snapshot.branches.values():
             if not loaded_from_graph:
                 if branch.target_type == TargetType.REVISION:
                     self.push_revision_subgraph(branch.target)
                 elif branch.target_type == TargetType.RELEASE:
                     self.push_releases_subgraphs([branch.target])
                 elif branch.target_type == TargetType.ALIAS:
                     # Nothing to do, this for loop also iterates on the target branch
                     # (if it exists)
                     pass
                 else:
                     raise NotImplementedError(f"{branch.target_type} branches")

         self.write_refs(snapshot=snapshot)

     def load_revisions(self, obj_ids: List[Sha1Git]) -> None:
         """Given a list of revision ids, loads these revisions and their
         directories; but not their parent revisions."""
-        revisions = self.storage.revision_get(obj_ids)
+        ret: List[Optional[Revision]] = self.storage.revision_get(obj_ids)
+
+        revisions: List[Revision] = list(filter(None, ret))
+        if len(ret) != len(revisions):
+            logger.error("Missing revision(s), ignoring them.")
+
         for revision in revisions:
             self.write_revision_node(revision.to_dict())
         self._push(self._dir_stack, (rev.directory for rev in revisions))

     def write_revision_node(self, revision: Dict[str, Any]) -> bool:
         """Writes a revision object to disk"""
         git_object = identifiers.revision_git_object(revision)
         return self.write_object(revision["id"], git_object)

     def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None:
         """Given a list of release ids, loads these releases and adds their
         target to the list of objects to visit"""
-        releases = self.storage.release_get(obj_ids)
+        ret = self.storage.release_get(obj_ids)
+
+        releases = list(filter(None, ret))
+        if len(ret) != len(releases):
+            logger.error("Missing release(s), ignoring them.")
+
         revision_ids: List[Sha1Git] = []
         for release in releases:
             self.write_release_node(release.to_dict())
             if release.target_type == ObjectType.REVISION:
+                assert release.target, f"{release.swhid()} has no target"
                 self.push_revision_subgraph(release.target)
             else:
                 raise NotImplementedError(f"{release.target_type} release targets")
         self._push(self._rev_stack, revision_ids)

     def write_release_node(self, release: Dict[str, Any]) -> bool:
         """Writes a release object to disk"""
         git_object = identifiers.release_git_object(release)
         return self.write_object(release["id"], git_object)

     def load_directories(self, obj_ids: List[Sha1Git]) -> None:
         for obj_id in obj_ids:
             self.load_directory(obj_id)

     def load_directory(self, obj_id: Sha1Git) -> None:
         # Load the directory
-        entries = [
-            entry.to_dict()
-            for entry in stream_results(self.storage.directory_get_entries, obj_id)
-        ]
+        entries_it: Optional[Iterable[DirectoryEntry]] = stream_results_optional(
+            self.storage.directory_get_entries, obj_id
+        )
+
+        if entries_it is None:
+            logger.error("Missing swh:1:dir:%s, ignoring.", hash_to_hex(obj_id))
+            return
+
+        entries = [entry.to_dict() for entry in entries_it]
         directory = {"id": obj_id, "entries": entries}
         git_object = identifiers.directory_git_object(directory)
         self.write_object(obj_id, git_object)

         # Add children to the stack
         entry_loaders: Dict[str, List[Sha1Git]] = {
             "file": self._cnt_stack,
             "dir": self._dir_stack,
             "rev": self._rev_stack,
         }
         for entry in directory["entries"]:
             stack = entry_loaders[entry["type"]]
             self._push(stack, [entry["target"]])
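For reference, the manifest shape that load_directory() feeds to identifiers.directory_git_object() looks roughly like this (values are placeholders):

    directory = {
        "id": b"\x00" * 20,  # sha1_git of the directory itself
        "entries": [
            {
                "name": b"README",
                "type": "file",          # "file", "dir" or "rev", as in entry_loaders
                "target": b"\x01" * 20,  # sha1_git of the entry's target
                "perms": 0o100644,       # git mode bits
            },
        ],
    }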
directory["entries"]: stack = entry_loaders[entry["type"]] self._push(stack, [entry["target"]]) def load_contents(self, obj_ids: List[Sha1Git]) -> None: # TODO: add support of filtered objects, somehow? # It's tricky, because, by definition, we can't write a git object with # the expected hash, so git-fsck *will* choke on it. contents = self.storage.content_get(obj_ids, "sha1_git") visible_contents = [] for (obj_id, content) in zip(obj_ids, contents): if content is None: # FIXME: this may also happen for missing content self.write_content(obj_id, SKIPPED_MESSAGE) self._expect_mismatched_object_error(obj_id) elif content.status == "visible": visible_contents.append(content) elif content.status == "hidden": self.write_content(obj_id, HIDDEN_MESSAGE) self._expect_mismatched_object_error(obj_id) else: assert False, ( f"unexpected status {content.status!r} " f"for content {hash_to_hex(content.sha1_git)}" ) + contents_and_data: Iterator[Tuple[Content, Optional[bytes]]] if self.objstorage is None: - for content in visible_contents: - data = self.storage.content_get_data(content.sha1) - self.write_content(content.sha1_git, data) + contents_and_data = ( + (content, self.storage.content_get_data(content.sha1)) + for content in visible_contents + ) else: - content_data = self.objstorage.get_batch(c.sha1 for c in visible_contents) - for (content, data) in zip(contents, content_data): - self.write_content(content.sha1_git, data) + contents_and_data = zip( + visible_contents, + self.objstorage.get_batch(c.sha1 for c in visible_contents), + ) + + for (content, datum) in contents_and_data: + if datum is None: + logger.error( + "{content.swhid()} is visible, but is missing data. Skipping." + ) + continue + self.write_content(content.sha1_git, datum) def write_content(self, obj_id: Sha1Git, content: bytes) -> None: header = identifiers.git_object_header("blob", len(content)) self.write_object(obj_id, header + content) def _expect_mismatched_object_error(self, obj_id): obj_id_hex = hash_to_hex(obj_id) obj_path = self._obj_relative_path(obj_id) # For Git < 2.21: self._expected_fsck_errors.add( f"error: sha1 mismatch for ./{obj_path} (expected {obj_id_hex})" ) # For Git >= 2.21: self._expected_fsck_errors.add( f"error: hash mismatch for ./{obj_path} (expected {obj_id_hex})" ) self._expected_fsck_errors.add( f"error: {obj_id_hex}: object corrupt or missing: ./{obj_path}" ) self._expected_fsck_errors.add(f"missing blob {obj_id_hex}")