diff --git a/swh/vault/cli.py b/swh/vault/cli.py index f1f649b..8cc4e35 100644 --- a/swh/vault/cli.py +++ b/swh/vault/cli.py @@ -1,167 +1,176 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging from typing import TYPE_CHECKING, Optional import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group if TYPE_CHECKING: import io from swh.model.identifiers import CoreSWHID class SwhidParamType(click.ParamType): name = "swhid" def convert(self, value, param, ctx): from swh.model.exceptions import ValidationError from swh.model.identifiers import CoreSWHID try: return CoreSWHID.from_string(value) except ValidationError: self.fail(f"expected core SWHID, got {value!r}", param, ctx) @swh_cli_group.group(name="vault", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.pass_context def vault(ctx): """Software Heritage Vault tools.""" @vault.command() @click.option( "--config-file", "-C", default=None, metavar="CONFIGFILE", type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.argument("swhid", type=SwhidParamType()) @click.argument("outfile", type=click.File("wb")) @click.option( "--cooker-type", type=click.Choice(["flat", "gitfast", "git_bare"]), help="Selects which cooker to use, when there is more than one available " "for the given object type.", ) @click.pass_context def cook( ctx, config_file: str, swhid: CoreSWHID, outfile: io.RawIOBase, cooker_type: Optional[str], ): """ Runs a vault cooker for a single object (identified by a SWHID), and outputs it to the given file. """ from swh.core import config from swh.graph.client import RemoteGraphClient + from swh.objstorage.factory import get_objstorage from swh.storage import get_storage from .cookers import COOKER_TYPES, get_cooker_cls from .in_memory_backend import InMemoryVaultBackend conf = config.read(config_file) supported_object_types = {name.split("_")[0] for name in COOKER_TYPES} if swhid.object_type.name.lower() not in supported_object_types: raise click.ClickException( f"No cooker available for {swhid.object_type.name} objects." ) cooker_name = swhid.object_type.name.lower() if cooker_type: cooker_name = f"{cooker_name}_{cooker_type}" if cooker_name not in COOKER_TYPES: raise click.ClickException( f"{swhid.object_type.name.lower()} objects do not have " f"a {cooker_type} cooker." ) else: if cooker_name not in COOKER_TYPES: raise click.ClickException( f"{swhid.object_type.name.lower()} objects need " f"an explicit --cooker-type." ) backend = InMemoryVaultBackend() storage = get_storage(**conf["storage"]) + objstorage = get_objstorage(**conf["objstorage"]) if "objstorage" in conf else None graph = RemoteGraphClient(**conf["graph"]) if "graph" in conf else None cooker_cls = get_cooker_cls(cooker_name) - cooker = cooker_cls(cooker_name, swhid.object_id, backend, storage, graph) + cooker = cooker_cls( + obj_type=cooker_name, + obj_id=swhid.object_id, + backend=backend, + storage=storage, + graph=graph, + objstorage=objstorage, + ) cooker.cook() bundle = backend.fetch(cooker_name, swhid.object_id) assert bundle, "Cooker did not write a bundle to the backend." outfile.write(bundle) @vault.command(name="rpc-serve") @click.option( "--config-file", "-C", default=None, metavar="CONFIGFILE", type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.option( "--host", default="0.0.0.0", metavar="IP", show_default=True, help="Host ip address to bind the server on", ) @click.option( "--port", default=5005, type=click.INT, metavar="PORT", help="Binding port of the server", ) @click.option( "--debug/--no-debug", default=True, help="Indicates if the server should run in debug mode", ) @click.pass_context def serve(ctx, config_file, host, port, debug): """Software Heritage Vault RPC server.""" import aiohttp from swh.vault.api.server import make_app_from_configfile ctx.ensure_object(dict) try: app = make_app_from_configfile(config_file, debug=debug) except EnvironmentError as e: click.echo(e.msg, err=True) ctx.exit(1) aiohttp.web.run_app(app, host=host, port=int(port)) def main(): logging.basicConfig() return serve(auto_envvar_prefix="SWH_VAULT") if __name__ == "__main__": main() diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py index e36341e..f88f49f 100644 --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -1,147 +1,149 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io import logging from typing import Optional from psycopg2.extensions import QueryCanceledError from swh.model import hashutil MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB DEFAULT_CONFIG_PATH = "vault/cooker" DEFAULT_CONFIG = { "max_bundle_size": ("int", MAX_BUNDLE_SIZE), } class PolicyError(Exception): """Raised when the bundle violates the cooking policy.""" pass class BundleTooLargeError(PolicyError): """Raised when the bundle is too large to be cooked.""" pass class BytesIOBundleSizeLimit(io.BytesIO): def __init__(self, *args, size_limit=None, **kwargs): super().__init__(*args, **kwargs) self.size_limit = size_limit def write(self, chunk): if ( self.size_limit is not None and self.getbuffer().nbytes + len(chunk) > self.size_limit ): raise BundleTooLargeError( "The requested bundle exceeds the maximum allowed " "size of {} bytes.".format(self.size_limit) ) return super().write(chunk) class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle """ CACHE_TYPE_KEY = None # type: Optional[str] def __init__( self, obj_type, obj_id, backend, storage, graph=None, + objstorage=None, max_bundle_size=MAX_BUNDLE_SIZE, ): """Initialize the cooker. The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: obj_type: type of the object to be cooked into a bundle (directory, revision_flat or revision_gitfast; see swh.vault.cooker.COOKER_TYPES). obj_id: id of the object to be cooked into a bundle. backend: the vault backend (swh.vault.backend.VaultBackend). """ self.obj_type = obj_type self.obj_id = hashutil.hash_to_bytes(obj_id) self.backend = backend self.storage = storage + self.objstorage = objstorage self.graph = graph self.max_bundle_size = max_bundle_size @abc.abstractmethod def check_exists(self): """Checks that the requested object exists and can be cooked. Override this in the cooker implementation. """ raise NotImplementedError @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplementedError def cache_type_key(self) -> str: assert self.CACHE_TYPE_KEY return self.CACHE_TYPE_KEY def write(self, chunk): self.fileobj.write(chunk) def cook(self): """Cook the requested object into a bundle """ self.backend.set_status(self.obj_type, self.obj_id, "pending") self.backend.set_progress(self.obj_type, self.obj_id, "Processing...") self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) try: try: self.prepare_bundle() except QueryCanceledError: raise PolicyError( "Timeout reached while assembling the requested bundle" ) bundle = self.fileobj.getvalue() # TODO: use proper content streaming instead of put_bundle() self.backend.put_bundle(self.cache_type_key(), self.obj_id, bundle) except PolicyError as e: self.backend.set_status(self.obj_type, self.obj_id, "failed") self.backend.set_progress(self.obj_type, self.obj_id, str(e)) except Exception: self.backend.set_status(self.obj_type, self.obj_id, "failed") self.backend.set_progress( self.obj_type, self.obj_id, "Internal Server Error. This incident will be reported.", ) logging.exception("Bundle cooking failed.") else: self.backend.set_status(self.obj_type, self.obj_id, "done") self.backend.set_progress(self.obj_type, self.obj_id, None) finally: self.backend.send_notif(self.obj_type, self.obj_id) diff --git a/swh/vault/cookers/git_bare.py b/swh/vault/cookers/git_bare.py index 5c378bc..82f48d8 100644 --- a/swh/vault/cookers/git_bare.py +++ b/swh/vault/cookers/git_bare.py @@ -1,283 +1,289 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ This cooker creates tarballs containing a bare .git directory, that can be unpacked and cloned like any git repository. It works in three steps: 1. Write objects one by one in :file:`.git/objects/` 2. Calls ``git repack`` to pack all these objects into git packfiles. 3. Creates a tarball of the resulting repository It keeps a set of all written (or about-to-be-written) object hashes in memory to avoid downloading and writing the same objects twice. """ import datetime import os.path import subprocess import tarfile import tempfile from typing import Any, Dict, Iterable, List, Set import zlib from swh.core.api.classes import stream_results from swh.graph.client import GraphArgumentException from swh.model import identifiers from swh.model.hashutil import hash_to_bytehex, hash_to_hex from swh.model.model import ( Person, Revision, RevisionType, Sha1Git, TimestampWithTimezone, ) from swh.storage.algos.revisions_walker import DFSRevisionsWalker from swh.vault.cookers.base import BaseVaultCooker REVISION_BATCH_SIZE = 10000 DIRECTORY_BATCH_SIZE = 10000 CONTENT_BATCH_SIZE = 100 class GitBareCooker(BaseVaultCooker): use_fsck = False def cache_type_key(self) -> str: return self.obj_type def check_exists(self): obj_type = self.obj_type.split("_")[0] if obj_type == "revision": return not list(self.storage.revision_missing([self.obj_id])) elif obj_type == "directory": return not list(self.storage.directory_missing([self.obj_id])) else: raise NotImplementedError(f"GitBareCooker for {obj_type}") def obj_swhid(self) -> identifiers.CoreSWHID: obj_type = self.obj_type.split("_")[0] return identifiers.CoreSWHID( object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id, ) def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None: assert not isinstance(obj_ids, bytes) revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen] self._seen.update(revision_ids) stack.extend(revision_ids) def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]: obj_ids = stack[-n:] stack[-n:] = [] return obj_ids def prepare_bundle(self): # Objects we will visit soon: self._rev_stack: List[Sha1Git] = [] self._dir_stack: List[Sha1Git] = [] self._cnt_stack: List[Sha1Git] = [] # Set of objects already in any of the stacks: self._seen: Set[Sha1Git] = set() with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: # Initialize a Git directory self.workdir = workdir self.gitdir = os.path.join(workdir, "clone.git") os.mkdir(self.gitdir) self.init_git() # Add the root object to the stack of objects to visit self.push_subgraph(self.obj_type.split("_")[0], self.obj_id) # Load and write all the objects to disk self.load_objects() # Write the root object as a ref. # This must be done before repacking; git-repack ignores orphan objects. self.write_refs() self.repack() self.write_archive() def init_git(self) -> None: subprocess.run(["git", "-C", self.gitdir, "init", "--bare"], check=True) # Create all possible dirs ahead of time, so we don't have to check for # existence every time. for byte in range(256): os.mkdir(os.path.join(self.gitdir, "objects", f"{byte:02x}")) def repack(self) -> None: if self.use_fsck: subprocess.run(["git", "-C", self.gitdir, "fsck"], check=True) # Add objects we wrote in a pack subprocess.run(["git", "-C", self.gitdir, "repack"], check=True) # Remove their non-packed originals subprocess.run(["git", "-C", self.gitdir, "prune-packed"], check=True) def write_refs(self): obj_type = self.obj_type.split("_")[0] if obj_type == "directory": # We need a synthetic revision pointing to the directory author = Person.from_fullname( b"swh-vault, git-bare cooker " ) dt = datetime.datetime.now(tz=datetime.timezone.utc) dt = dt.replace(microsecond=0) # not supported by git date = TimestampWithTimezone.from_datetime(dt) revision = Revision( author=author, committer=author, date=date, committer_date=date, message=b"Initial commit", type=RevisionType.GIT, directory=self.obj_id, synthetic=True, ) self.write_revision_node(revision.to_dict()) head = revision.id elif obj_type == "revision": head = self.obj_id else: assert False, obj_type with open(os.path.join(self.gitdir, "refs", "heads", "master"), "wb") as fd: fd.write(hash_to_bytehex(head)) def write_archive(self): with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf: tf.add(self.gitdir, arcname=f"{self.obj_swhid()}.git", recursive=True) def _obj_path(self, obj_id: Sha1Git): obj_id_hex = hash_to_hex(obj_id) directory = obj_id_hex[0:2] filename = obj_id_hex[2:] return os.path.join(self.gitdir, "objects", directory, filename) def object_exists(self, obj_id: Sha1Git) -> bool: return os.path.exists(self._obj_path(obj_id)) def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool: """Writes a git object on disk. Returns whether it was already written.""" # Git requires objects to be zlib-compressed; but repacking decompresses and # removes them, so we don't need to compress them too much. data = zlib.compress(obj, level=1) with open(self._obj_path(obj_id), "wb") as fd: fd.write(data) return True def push_subgraph(self, obj_type, obj_id) -> None: if obj_type == "revision": self.push_revision_subgraph(obj_id) elif obj_type == "directory": self._push(self._dir_stack, [obj_id]) else: raise NotImplementedError( f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" ) def load_objects(self) -> None: while self._rev_stack or self._dir_stack or self._cnt_stack: revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) self.load_revisions(revision_ids) directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE) self.load_directories(directory_ids) content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE) self.load_contents(content_ids) def push_revision_subgraph(self, obj_id: Sha1Git) -> None: """Fetches a revision and all its children, and writes them to disk""" loaded_from_graph = False if self.graph: # First, try to cook using swh-graph, as it is more efficient than # swh-storage for querying the history obj_swhid = identifiers.CoreSWHID( object_type=identifiers.ObjectType.REVISION, object_id=obj_id, ) try: revision_ids = ( swhid.object_id for swhid in map( identifiers.CoreSWHID.from_string, self.graph.visit_nodes(str(obj_swhid), edges="rev:rev"), ) ) self._push(self._rev_stack, revision_ids) except GraphArgumentException: # Revision not found in the graph pass else: loaded_from_graph = True if not loaded_from_graph: # If swh-graph is not available, or the revision is not yet in # swh-graph, fall back to self.storage.revision_log. # self.storage.revision_log also gives us the full revisions, # so we load them right now instead of just pushing them on the stack. walker = DFSRevisionsWalker(self.storage, obj_id) for revision in walker: self.write_revision_node(revision) self._push(self._dir_stack, [revision["directory"]]) def load_revisions(self, obj_ids: List[Sha1Git]) -> None: """Given a list of revision ids, loads these revisions and their directories; but not their parent revisions.""" revisions = self.storage.revision_get(obj_ids) for revision in revisions: self.write_revision_node(revision.to_dict()) self._push(self._dir_stack, (rev.directory for rev in revisions)) def write_revision_node(self, revision: Dict[str, Any]) -> bool: """Writes a revision object to disk""" git_object = identifiers.revision_git_object(revision) return self.write_object(revision["id"], git_object) def load_directories(self, obj_ids: List[Sha1Git]) -> None: for obj_id in obj_ids: self.load_directory(obj_id) def load_directory(self, obj_id: Sha1Git) -> None: # Load the directory entries = [ entry.to_dict() for entry in stream_results(self.storage.directory_get_entries, obj_id) ] directory = {"id": obj_id, "entries": entries} git_object = identifiers.directory_git_object(directory) self.write_object(obj_id, git_object) # Add children to the stack entry_loaders: Dict[str, List[Sha1Git]] = { "file": self._cnt_stack, "dir": self._dir_stack, "rev": self._rev_stack, } for entry in directory["entries"]: stack = entry_loaders[entry["type"]] self._push(stack, [entry["target"]]) def load_contents(self, obj_ids: List[Sha1Git]) -> None: # TODO: add support of filtered objects, somehow? # It's tricky, because, by definition, we can't write a git object with # the expected hash, so git-fsck *will* choke on it. contents = self.storage.content_get(obj_ids, "sha1_git") - for (obj_id, content) in zip(obj_ids, contents): - assert obj_id == content.sha1_git # just to be sure - content = self.storage.content_get_data(content.sha1) - self.write_object( - obj_id, f"blob {len(content)}\0".encode("ascii") + content - ) + + if self.objstorage is None: + for content in contents: + data = self.storage.content_get_data(content.sha1) + self.write_content(content.sha1_git, data) + else: + content_data = self.objstorage.get_batch(c.sha1 for c in contents) + for (content, data) in zip(contents, content_data): + self.write_content(content.sha1_git, data) + + def write_content(self, obj_id: Sha1Git, content: bytes) -> None: + self.write_object(obj_id, f"blob {len(content)}\0".encode("ascii") + content) diff --git a/swh/vault/tests/test_cli.py b/swh/vault/tests/test_cli.py index 4c35ff6..7a375a2 100644 --- a/swh/vault/tests/test_cli.py +++ b/swh/vault/tests/test_cli.py @@ -1,103 +1,104 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile from unittest.mock import MagicMock import click import click.testing import pytest from swh.vault.cli import vault as vault_cli_group from swh.vault.cookers.base import BaseVaultCooker from swh.vault.in_memory_backend import InMemoryVaultBackend def test_cook_unsupported_swhid(): runner = click.testing.CliRunner() result = runner.invoke(vault_cli_group, ["cook", "swh:1:dir:f00b4r", "-"]) assert isinstance(result.exception, SystemExit) assert "expected core SWHID" in result.stdout result = runner.invoke(vault_cli_group, ["cook", "swh:1:ori:" + "0" * 40, "-"]) assert isinstance(result.exception, SystemExit) assert "expected core SWHID" in result.stdout result = runner.invoke(vault_cli_group, ["cook", "swh:1:cnt:" + "0" * 40, "-"]) assert isinstance(result.exception, SystemExit) assert "No cooker available for CONTENT" in result.stdout def test_cook_unknown_cooker(): runner = click.testing.CliRunner() result = runner.invoke( vault_cli_group, ["cook", "swh:1:dir:" + "0" * 40, "-", "--cooker-type", "gitfast"], ) assert isinstance(result.exception, SystemExit) assert "do not have a gitfast cooker" in result.stdout result = runner.invoke(vault_cli_group, ["cook", "swh:1:rev:" + "0" * 40, "-"]) assert isinstance(result.exception, SystemExit) assert "explicit --cooker-type" in result.stdout @pytest.mark.parametrize( "obj_type,cooker_name_suffix,swhid_type", [("directory", "", "dir"), ("revision", "gitfast", "rev"),], ) def test_cook_directory(obj_type, cooker_name_suffix, swhid_type, mocker): storage = object() mocker.patch("swh.storage.get_storage", return_value=storage) backend = MagicMock(spec=InMemoryVaultBackend) backend.fetch.return_value = b"bundle content" mocker.patch( "swh.vault.in_memory_backend.InMemoryVaultBackend", return_value=backend ) cooker = MagicMock(spec=BaseVaultCooker) cooker_cls = MagicMock(return_value=cooker) mocker.patch("swh.vault.cookers.get_cooker_cls", return_value=cooker_cls) runner = click.testing.CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write('{"storage": {}}') config_fd.seek(0) if cooker_name_suffix: result = runner.invoke( vault_cli_group, [ "cook", f"swh:1:{swhid_type}:{'0'*40}", "-", "-C", config_fd.name, "--cooker-type", cooker_name_suffix, ], ) else: result = runner.invoke( vault_cli_group, ["cook", f"swh:1:{swhid_type}:{'0'*40}", "-", "-C", config_fd.name], ) if result.exception is not None: raise result.exception cooker_cls.assert_called_once_with( - f"{obj_type}_{cooker_name_suffix}" if cooker_name_suffix else obj_type, - b"\x00" * 20, - backend, - storage, - None, + obj_type=f"{obj_type}_{cooker_name_suffix}" if cooker_name_suffix else obj_type, + obj_id=b"\x00" * 20, + backend=backend, + storage=storage, + graph=None, + objstorage=None, ) cooker.cook.assert_called_once_with() assert result.stdout_bytes == b"bundle content" diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py index 6f1f5a1..3a41699 100644 --- a/swh/vault/tests/test_cookers.py +++ b/swh/vault/tests/test_cookers.py @@ -1,694 +1,753 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime import glob import gzip import io import os import pathlib import shutil import subprocess import tarfile import tempfile import unittest import unittest.mock import dulwich.fastexport import dulwich.index import dulwich.objects import dulwich.porcelain import dulwich.repo import pytest from swh.loader.git.from_disk import GitLoaderFromDisk from swh.model import from_disk, hashutil from swh.model.model import ( Directory, DirectoryEntry, Person, Revision, RevisionType, TimestampWithTimezone, ) from swh.vault.cookers import DirectoryCooker, GitBareCooker, RevisionGitfastCooker from swh.vault.tests.vault_testing import hash_content from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE class TestRepo: """A tiny context manager for a test git repository, with some utility functions to perform basic git stuff. """ def __enter__(self): self.tmp_dir = tempfile.TemporaryDirectory(prefix="tmp-vault-repo-") self.repo_dir = self.tmp_dir.__enter__() self.repo = dulwich.repo.Repo.init(self.repo_dir) self.author_name = b"Test Author" self.author_email = b"test@softwareheritage.org" self.author = b"%s <%s>" % (self.author_name, self.author_email) self.base_date = 258244200 self.counter = 0 return pathlib.Path(self.repo_dir) def __exit__(self, exc, value, tb): self.tmp_dir.__exit__(exc, value, tb) def checkout(self, rev_sha): rev = self.repo[rev_sha] dulwich.index.build_index_from_tree( self.repo_dir, self.repo.index_path(), self.repo.object_store, rev.tree ) def git_shell(self, *cmd, stdout=subprocess.DEVNULL, **kwargs): name = self.author_name email = self.author_email date = "%d +0000" % (self.base_date + self.counter) env = { # Set git commit format "GIT_AUTHOR_NAME": name, "GIT_AUTHOR_EMAIL": email, "GIT_AUTHOR_DATE": date, "GIT_COMMITTER_NAME": name, "GIT_COMMITTER_EMAIL": email, "GIT_COMMITTER_DATE": date, # Ignore all the system-wide and user configurations "GIT_CONFIG_NOSYSTEM": "1", "HOME": str(self.tmp_dir), "XDG_CONFIG_HOME": str(self.tmp_dir), } kwargs.setdefault("env", {}).update(env) subprocess.check_call( ("git", "-C", self.repo_dir) + cmd, stdout=stdout, **kwargs ) def commit(self, message="Commit test\n", ref=b"HEAD"): """Commit the current working tree in a new commit with message on the branch 'ref'. At the end of the commit, the reference should stay the same and the index should be clean. """ paths = [ os.path.relpath(path, self.repo_dir) for path in glob.glob(self.repo_dir + "/**/*", recursive=True) ] self.repo.stage(paths) message = message.encode() + b"\n" ret = self.repo.do_commit( message=message, committer=self.author, commit_timestamp=self.base_date + self.counter, commit_timezone=0, ref=ref, ) self.counter += 1 # committing on another branch leaves # dangling files in index if ref != b"HEAD": # XXX this should work (but does not) # dulwich.porcelain.reset(self.repo, 'hard') self.git_shell("reset", "--hard", "HEAD") return ret def merge(self, parent_sha_list, message="Merge branches."): self.git_shell( "merge", "--allow-unrelated-histories", "-m", message, *[p.decode() for p in parent_sha_list], ) self.counter += 1 return self.repo.refs[b"HEAD"] def print_debug_graph(self, reflog=False): args = ["log", "--all", "--graph", "--decorate"] if reflog: args.append("--reflog") self.git_shell(*args, stdout=None) @pytest.fixture def git_loader(swh_storage,): """Instantiate a Git Loader using the storage instance as storage. """ def _create_loader(directory): return GitLoaderFromDisk( swh_storage, "fake_origin", directory=directory, visit_date=datetime.datetime.now(datetime.timezone.utc), ) return _create_loader @contextlib.contextmanager def cook_extract_directory_dircooker(storage, obj_id, fsck=True): """Context manager that cooks a directory and extract it.""" backend = unittest.mock.MagicMock() backend.storage = storage cooker = DirectoryCooker("directory", obj_id, backend=backend, storage=storage) cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: tar.extractall(td) yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id) cooker.storage = None @contextlib.contextmanager def cook_extract_directory_gitfast(storage, obj_id, fsck=True): """Context manager that cooks a revision containing a directory and extract it, using RevisionGitfastCooker""" test_repo = TestRepo() with test_repo as p: date = TimestampWithTimezone.from_datetime( datetime.datetime.now(datetime.timezone.utc) ) revision = Revision( directory=obj_id, message=b"dummy message", author=Person.from_fullname(b"someone"), committer=Person.from_fullname(b"someone"), date=date, committer_date=date, type=RevisionType.GIT, synthetic=False, ) storage.revision_add([revision]) with cook_stream_revision_gitfast(storage, revision.id) as stream, test_repo as p: processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) processor.import_stream(stream) test_repo.checkout(b"HEAD") shutil.rmtree(p / ".git") yield p @contextlib.contextmanager -def cook_extract_directory_git_bare(storage, obj_id, fsck=True): +def cook_extract_directory_git_bare( + storage, obj_id, fsck=True, direct_objstorage=False +): """Context manager that cooks a revision and extract it, using GitBareCooker""" backend = unittest.mock.MagicMock() backend.storage = storage # Cook the object - cooker = GitBareCooker("directory", obj_id, backend=backend, storage=storage) + cooker = GitBareCooker( + "directory", + obj_id, + backend=backend, + storage=storage, + objstorage=storage.objstorage if direct_objstorage else None, + ) cooker.use_fsck = fsck # Some tests try edge-cases that git-fsck rejects cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) # Extract it with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: tar.extractall(td) # Clone it with Dulwich test_repo = TestRepo() with test_repo as p: test_repo.git_shell( "pull", os.path.join(td, f"swh:1:dir:{obj_id.hex()}.git") ) shutil.rmtree(p / ".git") yield p @pytest.fixture( scope="module", params=[ cook_extract_directory_dircooker, cook_extract_directory_gitfast, cook_extract_directory_git_bare, ], ) def cook_extract_directory(request): """A fixture that is instantiated as either cook_extract_directory_dircooker or cook_extract_directory_git_bare.""" return request.param @contextlib.contextmanager def cook_stream_revision_gitfast(storage, obj_id): """Context manager that cooks a revision and stream its fastexport.""" backend = unittest.mock.MagicMock() backend.storage = storage cooker = RevisionGitfastCooker( "revision_gitfast", obj_id, backend=backend, storage=storage ) cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj) yield fastexport_stream cooker.storage = None @contextlib.contextmanager def cook_extract_revision_gitfast(storage, obj_id, fsck=True): """Context manager that cooks a revision and extract it, using RevisionGitfastCooker""" test_repo = TestRepo() with cook_stream_revision_gitfast(storage, obj_id) as stream, test_repo as p: processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) processor.import_stream(stream) yield test_repo, p @contextlib.contextmanager def cook_extract_revision_git_bare(storage, obj_id, fsck=True): """Context manager that cooks a revision and extract it, using GitBareCooker""" backend = unittest.mock.MagicMock() backend.storage = storage # Cook the object cooker = GitBareCooker("revision", obj_id, backend=backend, storage=storage) cooker.use_fsck = fsck # Some tests try edge-cases that git-fsck rejects cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) # Extract it with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: tar.extractall(td) # Clone it with Dulwich test_repo = TestRepo() with test_repo as p: test_repo.git_shell( "pull", os.path.join(td, f"swh:1:rev:{obj_id.hex()}.git") ) yield test_repo, p @pytest.fixture( scope="module", params=[cook_extract_revision_gitfast, cook_extract_revision_git_bare], ) def cook_extract_revision(request): """A fixture that is instantiated as either cook_extract_revision_gitfast or cook_extract_revision_git_bare.""" return request.param TEST_CONTENT = ( " test content\n" "and unicode \N{BLACK HEART SUIT}\n" " and trailing spaces " ) TEST_EXECUTABLE = b"\x42\x40\x00\x00\x05" class TestDirectoryCooker: def test_directory_simple(self, git_loader, cook_extract_directory): repo = TestRepo() with repo as rp: (rp / "file").write_text(TEST_CONTENT) (rp / "executable").write_bytes(TEST_EXECUTABLE) (rp / "executable").chmod(0o755) (rp / "link").symlink_to("file") (rp / "dir1/dir2").mkdir(parents=True) (rp / "dir1/dir2/file").write_text(TEST_CONTENT) c = repo.commit() loader = git_loader(str(rp)) loader.load() obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) with cook_extract_directory(loader.storage, obj_id) as p: assert (p / "file").stat().st_mode == 0o100644 assert (p / "file").read_text() == TEST_CONTENT assert (p / "executable").stat().st_mode == 0o100755 assert (p / "executable").read_bytes() == TEST_EXECUTABLE - assert (p / "link").is_symlink + assert (p / "link").is_symlink() assert os.readlink(str(p / "link")) == "file" assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT directory = from_disk.Directory.from_disk(path=bytes(p)) assert obj_id_hex == hashutil.hash_to_hex(directory.hash) def test_directory_filtered_objects(self, git_loader, cook_extract_directory): if cook_extract_directory is cook_extract_directory_git_bare: pytest.xfail("GitBareCooker does not support filtered objects (yet?)") repo = TestRepo() with repo as rp: file_1, id_1 = hash_content(b"test1") file_2, id_2 = hash_content(b"test2") file_3, id_3 = hash_content(b"test3") (rp / "file").write_bytes(file_1) (rp / "hidden_file").write_bytes(file_2) (rp / "absent_file").write_bytes(file_3) c = repo.commit() loader = git_loader(str(rp)) loader.load() obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) # FIXME: storage.content_update() should be changed to allow things # like that with loader.storage.get_db().transaction() as cur: cur.execute( """update content set status = 'visible' where sha1 = %s""", (id_1,), ) cur.execute( """update content set status = 'hidden' where sha1 = %s""", (id_2,), ) cur.execute( """update content set status = 'absent' where sha1 = %s""", (id_3,), ) with cook_extract_directory(loader.storage, obj_id) as p: assert (p / "file").read_bytes() == b"test1" assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE def test_directory_bogus_perms(self, git_loader, cook_extract_directory): # Some early git repositories have 664/775 permissions... let's check # if all the weird modes are properly normalized in the directory # cooker. repo = TestRepo() with repo as rp: (rp / "file").write_text(TEST_CONTENT) (rp / "file").chmod(0o664) (rp / "executable").write_bytes(TEST_EXECUTABLE) (rp / "executable").chmod(0o775) (rp / "wat").write_text(TEST_CONTENT) (rp / "wat").chmod(0o604) # Disable mode cleanup with unittest.mock.patch("dulwich.index.cleanup_mode", lambda mode: mode): c = repo.commit() # Make sure Dulwich didn't normalize the permissions itself. # (if it did, then the test can't check the cooker normalized them) tree_id = repo.repo[c].tree assert {entry.mode for entry in repo.repo[tree_id].items()} == { 0o100775, 0o100664, 0o100604, } # Disable mode checks with unittest.mock.patch("dulwich.objects.Tree.check", lambda self: None): loader = git_loader(str(rp)) loader.load() # Make sure swh-loader didn't normalize them either dir_entries = loader.storage.directory_ls(hashutil.bytehex_to_hash(tree_id)) assert {entry["perms"] for entry in dir_entries} == { 0o100664, 0o100775, 0o100604, } obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) with cook_extract_directory(loader.storage, obj_id) as p: assert (p / "file").stat().st_mode == 0o100644 assert (p / "executable").stat().st_mode == 0o100755 assert (p / "wat").stat().st_mode == 0o100644 + @pytest.mark.parametrize("direct_objstorage", [True, False]) + def test_directory_objstorage( + self, swh_storage, git_loader, mocker, direct_objstorage + ): + """Like test_directory_simple, but using swh_objstorage directly, without + going through swh_storage.content_get_data()""" + repo = TestRepo() + with repo as rp: + (rp / "file").write_text(TEST_CONTENT) + (rp / "executable").write_bytes(TEST_EXECUTABLE) + (rp / "executable").chmod(0o755) + (rp / "link").symlink_to("file") + (rp / "dir1/dir2").mkdir(parents=True) + (rp / "dir1/dir2/file").write_text(TEST_CONTENT) + c = repo.commit() + loader = git_loader(str(rp)) + loader.load() + + obj_id_hex = repo.repo[c].tree.decode() + obj_id = hashutil.hash_to_bytes(obj_id_hex) + + # Set-up spies + storage_content_get_data = mocker.patch.object( + swh_storage, "content_get_data", wraps=swh_storage.content_get_data + ) + objstorage_content_batch = mocker.patch.object( + swh_storage.objstorage, "get_batch", wraps=swh_storage.objstorage.get_batch + ) + + with cook_extract_directory_git_bare( + loader.storage, obj_id, direct_objstorage=direct_objstorage + ) as p: + assert (p / "file").stat().st_mode == 0o100644 + assert (p / "file").read_text() == TEST_CONTENT + assert (p / "executable").stat().st_mode == 0o100755 + assert (p / "executable").read_bytes() == TEST_EXECUTABLE + assert (p / "link").is_symlink() + assert os.readlink(str(p / "link")) == "file" + assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 + assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT + + directory = from_disk.Directory.from_disk(path=bytes(p)) + assert obj_id_hex == hashutil.hash_to_hex(directory.hash) + + if direct_objstorage: + storage_content_get_data.assert_not_called() + objstorage_content_batch.assert_called() + else: + storage_content_get_data.assert_called() + objstorage_content_batch.assert_not_called() + def test_directory_revision_data(self, swh_storage): target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" dir = Directory( entries=( DirectoryEntry( name=b"submodule", type="rev", target=hashutil.hash_to_bytes(target_rev), perms=0o100644, ), ), ) swh_storage.directory_add([dir]) with cook_extract_directory_dircooker(swh_storage, dir.id, fsck=False) as p: assert (p / "submodule").is_symlink() assert os.readlink(str(p / "submodule")) == target_rev class TestRevisionCooker: def test_revision_simple(self, git_loader, cook_extract_revision): # # 1--2--3--4--5--6--7 # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) repo.commit("add file1") (rp / "file2").write_text(TEST_CONTENT) repo.commit("add file2") (rp / "dir1/dir2").mkdir(parents=True) (rp / "dir1/dir2/file").write_text(TEST_CONTENT) repo.commit("add dir1/dir2/file") (rp / "bin1").write_bytes(TEST_EXECUTABLE) (rp / "bin1").chmod(0o755) repo.commit("add bin1") (rp / "link1").symlink_to("file1") repo.commit("link link1 to file1") (rp / "file2").unlink() repo.commit("remove file2") (rp / "bin1").rename(rp / "bin") repo.commit("rename bin1 to bin") loader = git_loader(str(rp)) loader.load() obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) with cook_extract_revision(loader.storage, obj_id) as (ert, p): ert.checkout(b"HEAD") assert (p / "file1").stat().st_mode == 0o100644 assert (p / "file1").read_text() == TEST_CONTENT - assert (p / "link1").is_symlink + assert (p / "link1").is_symlink() assert os.readlink(str(p / "link1")) == "file1" assert (p / "bin").stat().st_mode == 0o100755 assert (p / "bin").read_bytes() == TEST_EXECUTABLE assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_two_roots(self, git_loader, cook_extract_revision): # # 1----3---4 # / # 2---- # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) c1 = repo.commit("Add file1") del repo.repo.refs[b"refs/heads/master"] # git update-ref -d HEAD (rp / "file2").write_text(TEST_CONTENT) repo.commit("Add file2") repo.merge([c1]) (rp / "file3").write_text(TEST_CONTENT) repo.commit("add file3") obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(str(rp)) loader.load() with cook_extract_revision(loader.storage, obj_id) as (ert, p): assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_two_double_fork_merge(self, git_loader, cook_extract_revision): # # 2---4---6 # / / / # 1---3---5 # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) c1 = repo.commit("Add file1") repo.repo.refs[b"refs/heads/c1"] = c1 (rp / "file2").write_text(TEST_CONTENT) repo.commit("Add file2") (rp / "file3").write_text(TEST_CONTENT) c3 = repo.commit("Add file3", ref=b"refs/heads/c1") repo.repo.refs[b"refs/heads/c3"] = c3 repo.merge([c3]) (rp / "file5").write_text(TEST_CONTENT) c5 = repo.commit("Add file3", ref=b"refs/heads/c3") repo.merge([c5]) obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(str(rp)) loader.load() with cook_extract_revision(loader.storage, obj_id) as (ert, p): assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_triple_merge(self, git_loader, cook_extract_revision): # # .---.---5 # / / / # 2 3 4 # / / / # 1---.---. # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) c1 = repo.commit("Commit 1") repo.repo.refs[b"refs/heads/b1"] = c1 repo.repo.refs[b"refs/heads/b2"] = c1 repo.commit("Commit 2") c3 = repo.commit("Commit 3", ref=b"refs/heads/b1") c4 = repo.commit("Commit 4", ref=b"refs/heads/b2") repo.merge([c3, c4]) obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(str(rp)) loader.load() with cook_extract_revision(loader.storage, obj_id) as (ert, p): assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_filtered_objects(self, git_loader, cook_extract_revision): if cook_extract_revision is cook_extract_revision_git_bare: pytest.xfail("GitBareCooker does not support filtered objects (yet?)") repo = TestRepo() with repo as rp: file_1, id_1 = hash_content(b"test1") file_2, id_2 = hash_content(b"test2") file_3, id_3 = hash_content(b"test3") (rp / "file").write_bytes(file_1) (rp / "hidden_file").write_bytes(file_2) (rp / "absent_file").write_bytes(file_3) repo.commit() obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(str(rp)) loader.load() # FIXME: storage.content_update() should be changed to allow things # like that with loader.storage.get_db().transaction() as cur: cur.execute( """update content set status = 'visible' where sha1 = %s""", (id_1,), ) cur.execute( """update content set status = 'hidden' where sha1 = %s""", (id_2,), ) cur.execute( """update content set status = 'absent' where sha1 = %s""", (id_3,), ) with cook_extract_revision(loader.storage, obj_id) as (ert, p): ert.checkout(b"HEAD") assert (p / "file").read_bytes() == b"test1" assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE def test_revision_null_fields(self, git_loader, cook_extract_revision): # Our schema doesn't enforce a lot of non-null revision fields. We need # to check these cases don't break the cooker. repo = TestRepo() with repo as rp: (rp / "file").write_text(TEST_CONTENT) c = repo.commit("initial commit") loader = git_loader(str(rp)) loader.load() repo.repo.refs[b"HEAD"].decode() dir_id_hex = repo.repo[c].tree.decode() dir_id = hashutil.hash_to_bytes(dir_id_hex) test_revision = Revision( message=b"", author=Person(name=None, email=None, fullname=b""), date=None, committer=Person(name=None, email=None, fullname=b""), committer_date=None, parents=(), type=RevisionType.GIT, directory=dir_id, metadata={}, synthetic=True, ) storage = loader.storage storage.revision_add([test_revision]) with cook_extract_revision(storage, test_revision.id, fsck=False) as (ert, p): ert.checkout(b"HEAD") assert (p / "file").stat().st_mode == 0o100644 def test_revision_revision_data(self, swh_storage): target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" dir = Directory( entries=( DirectoryEntry( name=b"submodule", type="rev", target=hashutil.hash_to_bytes(target_rev), perms=0o100644, ), ), ) swh_storage.directory_add([dir]) rev = Revision( message=b"", author=Person(name=None, email=None, fullname=b""), date=None, committer=Person(name=None, email=None, fullname=b""), committer_date=None, parents=(), type=RevisionType.GIT, directory=dir.id, metadata={}, synthetic=True, ) swh_storage.revision_add([rev]) with cook_stream_revision_gitfast(swh_storage, rev.id) as stream: pattern = "M 160000 {} submodule".format(target_rev).encode() assert pattern in stream.read()