diff --git a/swh/vault/cookers/git_bare.py b/swh/vault/cookers/git_bare.py index ff88d36..147d589 100644 --- a/swh/vault/cookers/git_bare.py +++ b/swh/vault/cookers/git_bare.py @@ -1,680 +1,687 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ This cooker creates tarballs containing a bare .git directory, that can be unpacked and cloned like any git repository. It works in three steps: 1. Write objects one by one in :file:`.git/objects/` 2. Calls ``git repack`` to pack all these objects into git packfiles. 3. Creates a tarball of the resulting repository It keeps a set of all written (or about-to-be-written) object hashes in memory to avoid downloading and writing the same objects twice. The first step is the most complex. When swh-graph is available, this roughly does the following: 1. Find all the revisions and releases in the induced subgraph, adds them to todo-lists 2. Grab a batch from (release/revision/directory/content) todo-lists, and load them. Add directory and content objects they reference to the todo-list 3. If any todo-list is not empty, goto 1 When swh-graph is not available, steps 1 and 2 are merged, because revisions need to be loaded in order to compute the subgraph. """ import datetime import enum import glob import logging import multiprocessing.dummy import os.path import re import subprocess import tarfile import tempfile from typing import Any, Dict, Iterable, Iterator, List, NoReturn, Optional, Set, Tuple import zlib from swh.core.api.classes import stream_results_optional from swh.model import git_objects from swh.model.hashutil import hash_to_bytehex, hash_to_hex from swh.model.model import ( Person, Release, Revision, RevisionType, Sha1Git, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.model.model import Content, Directory, DirectoryEntry from swh.model.model import ObjectType as ModelObjectType from swh.model.swhids import CoreSWHID, ObjectType from swh.storage.algos.revisions_walker import DFSRevisionsWalker from swh.storage.algos.snapshot import snapshot_get_all_branches from swh.vault.cookers.base import BaseVaultCooker from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE RELEASE_BATCH_SIZE = 10000 REVISION_BATCH_SIZE = 10000 DIRECTORY_BATCH_SIZE = 10000 CONTENT_BATCH_SIZE = 100 logger = logging.getLogger(__name__) class RootObjectType(enum.Enum): DIRECTORY = "directory" REVISION = "revision" SNAPSHOT = "snapshot" def assert_never(value: NoReturn, msg) -> NoReturn: """mypy makes sure this function is never called, through exhaustive checking of ``value`` in the parent function. See https://mypy.readthedocs.io/en/latest/literal_types.html#exhaustive-checks for details. """ assert False, msg class GitBareCooker(BaseVaultCooker): BUNDLE_TYPE = "git_bare" SUPPORTED_OBJECT_TYPES = {ObjectType[obj_type.name] for obj_type in RootObjectType} use_fsck = True obj_type: RootObjectType def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.obj_type = RootObjectType[self.swhid.object_type.name] def check_exists(self) -> bool: """Returns whether the root object is present in the archive.""" if self.obj_type is RootObjectType.REVISION: return not list(self.storage.revision_missing([self.obj_id])) elif self.obj_type is RootObjectType.DIRECTORY: return not list(self.storage.directory_missing([self.obj_id])) elif self.obj_type is RootObjectType.SNAPSHOT: return not list(self.storage.snapshot_missing([self.obj_id])) else: assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}") def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None: """Adds all the given ``obj_ids`` to the given ``stack``, unless they are already in ``self._seen``, and adds them to ``self._seen``.""" assert not isinstance(obj_ids, bytes) revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen] self._seen.update(revision_ids) stack.extend(revision_ids) def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]: """Removes ``n`` object from the ``stack`` and returns them.""" obj_ids = stack[-n:] stack[-n:] = [] return obj_ids def prepare_bundle(self): """Main entry point. Initializes the state, creates the bundle, and sends it to the backend.""" # Objects we will visit soon (aka. "todo-lists"): self._rel_stack: List[Sha1Git] = [] self._rev_stack: List[Sha1Git] = [] self._dir_stack: List[Sha1Git] = [] self._cnt_stack: List[Sha1Git] = [] # Set of objects already in any of the stacks: self._seen: Set[Sha1Git] = set() self._walker_state: Optional[Any] = None # Set of errors we expect git-fsck to raise at the end: self._expected_fsck_errors = set() with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: # Initialize a Git directory self.workdir = workdir self.gitdir = os.path.join(workdir, "clone.git") os.mkdir(self.gitdir) self.init_git() self.nb_loaded = 0 # Add the root object to the stack of objects to visit self.push_subgraph(self.obj_type, self.obj_id) # Load and write all the objects to disk self.load_objects() self.backend.set_progress( self.BUNDLE_TYPE, self.swhid, "Writing references..." ) # Write the root object as a ref (this step is skipped if it's a snapshot) # This must be done before repacking; git-repack ignores orphan objects. self.write_refs() self.backend.set_progress( self.BUNDLE_TYPE, self.swhid, "Checking content integrity" ) if self.use_fsck: self.git_fsck() self.backend.set_progress( self.BUNDLE_TYPE, self.swhid, "Creating final bundle" ) self.repack() self.write_archive() self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, "Uploading bundle") def init_git(self) -> None: """Creates an empty :file:`.git` directory.""" subprocess.run(["git", "-C", self.gitdir, "init", "--bare"], check=True) self.create_object_dirs() # Remove example hooks; they take ~40KB and we don't use them for filename in glob.glob(os.path.join(self.gitdir, "hooks", "*.sample")): os.unlink(filename) def create_object_dirs(self) -> None: """Creates all possible subdirectories of :file:`.git/objects/`""" # Create all possible dirs ahead of time, so we don't have to check for # existence every time. for byte in range(256): try: os.mkdir(os.path.join(self.gitdir, "objects", f"{byte:02x}")) except FileExistsError: pass def repack(self) -> None: """Moves all objects from :file:`.git/objects/` to a packfile.""" try: subprocess.run(["git", "-C", self.gitdir, "repack", "-d"], check=True) except subprocess.CalledProcessError: logging.exception("git-repack failed with:") # Remove their non-packed originals subprocess.run(["git", "-C", self.gitdir, "prune-packed"], check=True) def git_fsck(self) -> None: """Runs git-fsck and ignores expected errors (eg. because of missing objects).""" proc = subprocess.run( ["git", "-C", self.gitdir, "fsck"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"LANG": "C.utf8"}, ) # Split on newlines not followed by a space errors = re.split("\n(?! )", proc.stdout.decode()) errors = [ error for error in errors if error and not error.startswith("warning ") ] unexpected_errors = set(errors) - self._expected_fsck_errors if unexpected_errors: logging.error( "Unexpected errors from git-fsck after cooking %s: %s", self.swhid, "\n".join(sorted(unexpected_errors)), ) def write_refs(self, snapshot=None): """Writes all files in :file:`.git/refs/`. For non-snapshot objects, this is only ``master``.""" refs: Dict[bytes, bytes] # ref name -> target if self.obj_type == RootObjectType.DIRECTORY: # We need a synthetic revision pointing to the directory author = Person.from_fullname( b"swh-vault, git-bare cooker " ) dt = datetime.datetime.now(tz=datetime.timezone.utc) dt = dt.replace(microsecond=0) # not supported by git date = TimestampWithTimezone.from_datetime(dt) revision = Revision( author=author, committer=author, date=date, committer_date=date, message=b"Initial commit", type=RevisionType.GIT, directory=self.obj_id, synthetic=True, ) self.write_revision_node(revision) refs = {b"refs/heads/master": hash_to_bytehex(revision.id)} elif self.obj_type == RootObjectType.REVISION: refs = {b"refs/heads/master": hash_to_bytehex(self.obj_id)} elif self.obj_type == RootObjectType.SNAPSHOT: if snapshot is None: # refs were already written in a previous step return branches = [] for (branch_name, branch) in snapshot.branches.items(): if branch is None: logging.error( "%s has dangling branch: %r", snapshot.swhid(), branch_name ) else: branches.append((branch_name, branch)) refs = { branch_name: ( b"ref: " + branch.target if branch.target_type == TargetType.ALIAS else hash_to_bytehex(branch.target) ) for (branch_name, branch) in branches } else: assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}") for (ref_name, ref_target) in refs.items(): path = os.path.join(self.gitdir.encode(), ref_name) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as fd: fd.write(ref_target) def write_archive(self): """Creates the final .tar file.""" with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf: tf.add(self.gitdir, arcname=f"{self.swhid}.git", recursive=True) def _obj_path(self, obj_id: Sha1Git): """Returns the absolute path of file (in :file:`.git/objects/`) that will contain the git object identified by the ``obj_id``.""" return os.path.join(self.gitdir, self._obj_relative_path(obj_id)) def _obj_relative_path(self, obj_id: Sha1Git): """Same as :meth:`_obj_path`, but relative.""" obj_id_hex = hash_to_hex(obj_id) directory = obj_id_hex[0:2] filename = obj_id_hex[2:] return os.path.join("objects", directory, filename) def object_exists(self, obj_id: Sha1Git) -> bool: """Returns whether the object identified by the given ``obj_id`` was already written to a file in :file:`.git/object/`. This function ignores objects contained in a git pack.""" return os.path.exists(self._obj_path(obj_id)) def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool: """Writes a git object on disk. Returns whether it was already written.""" # Git requires objects to be zlib-compressed; but repacking decompresses and # removes them, so we don't need to compress them too much. data = zlib.compress(obj, level=1) with open(self._obj_path(obj_id), "wb") as fd: fd.write(data) return True def push_subgraph(self, obj_type: RootObjectType, obj_id) -> None: """Adds graph induced by the given ``obj_id`` without recursing through directories, to the todo-lists. If swh-graph is not available, this immediately loads revisions, as they need to be fetched in order to compute the subgraph, and fetching them immediately avoids duplicate fetches.""" if self.obj_type is RootObjectType.REVISION: self.push_revision_subgraph(obj_id) elif self.obj_type is RootObjectType.DIRECTORY: self._push(self._dir_stack, [obj_id]) elif self.obj_type is RootObjectType.SNAPSHOT: self.push_snapshot_subgraph(obj_id) else: assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}") def load_objects(self) -> None: """Repeatedly loads objects in the todo-lists, until all lists are empty.""" while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack: nb_remaining = ( len(self._rel_stack) + len(self._rev_stack) + len(self._dir_stack) + len(self._cnt_stack) ) # We assume assume nb_remaining is a lower bound. # When the snapshot was loaded with swh-graph, this should be the exact # value, though. self.backend.set_progress( self.BUNDLE_TYPE, self.swhid, f"Processing... {self.nb_loaded} objects processed\n" f"Over {nb_remaining} remaining", ) release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE) if release_ids: self.load_releases(release_ids) self.nb_loaded += len(release_ids) revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) if revision_ids: self.load_revisions(revision_ids) self.nb_loaded += len(revision_ids) directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE) if directory_ids: self.load_directories(directory_ids) self.nb_loaded += len(directory_ids) content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE) if content_ids: self.load_contents(content_ids) self.nb_loaded += len(content_ids) def push_revision_subgraph(self, obj_id: Sha1Git) -> None: """Fetches the graph of revisions induced by the given ``obj_id`` and adds them to ``self._rev_stack``. If swh-graph is not available, this requires fetching the revisions themselves, so they are directly loaded instead.""" loaded_from_graph = False if self.graph: from swh.graph.client import GraphArgumentException # First, try to cook using swh-graph, as it is more efficient than # swh-storage for querying the history obj_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id,) try: revision_ids = ( swhid.object_id for swhid in map( CoreSWHID.from_string, self.graph.visit_nodes(str(obj_swhid), edges="rev:rev"), ) ) self._push(self._rev_stack, revision_ids) except GraphArgumentException as e: logger.info( "Revision %s not found in swh-graph, falling back to fetching " "history using swh-storage. %s", hash_to_hex(obj_id), e.args[0], ) else: loaded_from_graph = True if not loaded_from_graph: # If swh-graph is not available, or the revision is not yet in # swh-graph, fall back to self.storage.revision_log. # self.storage.revision_log also gives us the full revisions, # so we load them right now instead of just pushing them on the stack. walker = DFSRevisionsWalker(self.storage, obj_id, state=self._walker_state) for revision in walker: self.write_revision_node(Revision.from_dict(revision)) self.nb_loaded += 1 self._push(self._dir_stack, [revision["directory"]]) # Save the state, so the next call to the walker won't return the same # revisions self._walker_state = walker.export_state() def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None: """Fetches a snapshot and all its children, excluding directories and contents, and pushes them to the todo-lists. Also loads revisions if swh-graph is not available, see :meth:`push_revision_subgraph`.""" loaded_from_graph = False if self.graph: revision_ids = [] release_ids = [] directory_ids = [] content_ids = [] from swh.graph.client import GraphArgumentException # First, try to cook using swh-graph, as it is more efficient than # swh-storage for querying the history obj_swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=obj_id,) try: swhids: Iterable[CoreSWHID] = map( CoreSWHID.from_string, self.graph.visit_nodes(str(obj_swhid), edges="snp:*,rel:*,rev:rev"), ) for swhid in swhids: if swhid.object_type is ObjectType.REVISION: revision_ids.append(swhid.object_id) elif swhid.object_type is ObjectType.RELEASE: release_ids.append(swhid.object_id) elif swhid.object_type is ObjectType.DIRECTORY: directory_ids.append(swhid.object_id) elif swhid.object_type is ObjectType.CONTENT: content_ids.append(swhid.object_id) elif swhid.object_type is ObjectType.SNAPSHOT: assert ( swhid.object_id == obj_id ), f"Snapshot {obj_id.hex()} references a different snapshot" else: assert_never( swhid.object_type, f"Unexpected SWHID object type: {swhid}" ) except GraphArgumentException as e: logger.info( "Snapshot %s not found in swh-graph, falling back to fetching " "history for each branch. %s", hash_to_hex(obj_id), e.args[0], ) else: self._push(self._rev_stack, revision_ids) self._push(self._rel_stack, release_ids) self._push(self._dir_stack, directory_ids) self._push(self._cnt_stack, content_ids) loaded_from_graph = True # TODO: when self.graph is available and supports edge labels, use it # directly to get branch names. snapshot: Optional[Snapshot] = snapshot_get_all_branches(self.storage, obj_id) assert snapshot, "Unknown snapshot" # should have been caught by check_exists() for branch in snapshot.branches.values(): if not loaded_from_graph: if branch is None: logging.warning("Dangling branch: %r", branch) continue assert isinstance(branch, SnapshotBranch) # for mypy if branch.target_type is TargetType.REVISION: self.push_revision_subgraph(branch.target) elif branch.target_type is TargetType.RELEASE: self.push_releases_subgraphs([branch.target]) elif branch.target_type is TargetType.ALIAS: # Nothing to do, this for loop also iterates on the target branch # (if it exists) pass elif branch.target_type is TargetType.DIRECTORY: self._push(self._dir_stack, [branch.target]) elif branch.target_type is TargetType.CONTENT: self._push(self._cnt_stack, [branch.target]) elif branch.target_type is TargetType.SNAPSHOT: if swhid.object_id != obj_id: raise NotImplementedError( f"{swhid} has a snapshot as a branch." ) else: assert_never( branch.target_type, f"Unexpected target type: {self.obj_type}" ) self.write_refs(snapshot=snapshot) def load_revisions(self, obj_ids: List[Sha1Git]) -> None: """Given a list of revision ids, loads these revisions and their directories; but not their parent revisions (ie. this is not recursive).""" ret: List[Optional[Revision]] = self.storage.revision_get(obj_ids) revisions: List[Revision] = list(filter(None, ret)) if len(ret) != len(revisions): logger.error("Missing revision(s), ignoring them.") for revision in revisions: self.write_revision_node(revision) self._push(self._dir_stack, (rev.directory for rev in revisions)) def write_revision_node(self, revision: Revision) -> bool: """Writes a revision object to disk""" - git_object = git_objects.revision_git_object(revision) + git_object = revision.raw_manifest or git_objects.revision_git_object(revision) return self.write_object(revision.id, git_object) def load_releases(self, obj_ids: List[Sha1Git]) -> List[Release]: """Loads release objects, and returns them.""" ret = self.storage.release_get(obj_ids) releases = list(filter(None, ret)) if len(ret) != len(releases): logger.error("Missing release(s), ignoring them.") for release in releases: self.write_release_node(release) return releases def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None: """Given a list of release ids, loads these releases and adds their target to the list of objects to visit""" for release in self.load_releases(obj_ids): self.nb_loaded += 1 assert release.target, "{release.swhid(}) has no target" if release.target_type is ModelObjectType.REVISION: self.push_revision_subgraph(release.target) elif release.target_type is ModelObjectType.DIRECTORY: self._push(self._dir_stack, [release.target]) elif release.target_type is ModelObjectType.CONTENT: self._push(self._cnt_stack, [release.target]) elif release.target_type is ModelObjectType.RELEASE: self.push_releases_subgraphs([release.target]) elif release.target_type is ModelObjectType.SNAPSHOT: raise NotImplementedError( f"{release.swhid()} targets a snapshot: {release.target!r}" ) else: assert_never( release.target_type, f"Unexpected release target type: {release.target_type}", ) def write_release_node(self, release: Release) -> bool: """Writes a release object to disk""" - git_object = git_objects.release_git_object(release) + git_object = release.raw_manifest or git_objects.release_git_object(release) return self.write_object(release.id, git_object) def load_directories(self, obj_ids: List[Sha1Git]) -> None: if not obj_ids: return + raw_manifests = self.storage.directory_get_raw_manifest(obj_ids) + with multiprocessing.dummy.Pool(min(self.thread_pool_size, len(obj_ids))) as p: - for _ in p.imap_unordered(self.load_directory, obj_ids): + for _ in p.imap_unordered( + lambda obj_id: self.load_directory(obj_id, raw_manifests.get(obj_id)), + obj_ids, + ): pass - def load_directory(self, obj_id: Sha1Git) -> None: + def load_directory(self, obj_id: Sha1Git, raw_manifest: Optional[bytes]) -> None: # Load the directory entries_it: Optional[Iterable[DirectoryEntry]] = stream_results_optional( self.storage.directory_get_entries, obj_id ) if entries_it is None: logger.error("Missing swh:1:dir:%s, ignoring.", hash_to_hex(obj_id)) return - directory = Directory(id=obj_id, entries=tuple(entries_it)) - git_object = git_objects.directory_git_object(directory) + directory = Directory( + id=obj_id, entries=tuple(entries_it), raw_manifest=raw_manifest + ) + git_object = raw_manifest or git_objects.directory_git_object(directory) self.write_object(obj_id, git_object) # Add children to the stack entry_loaders: Dict[str, Optional[List[Sha1Git]]] = { "file": self._cnt_stack, "dir": self._dir_stack, "rev": None, # Do not include submodule targets (rejected by git-fsck) } for entry in directory.entries: stack = entry_loaders[entry.type] if stack is not None: self._push(stack, [entry.target]) def load_contents(self, obj_ids: List[Sha1Git]) -> None: # TODO: add support of filtered objects, somehow? # It's tricky, because, by definition, we can't write a git object with # the expected hash, so git-fsck *will* choke on it. contents = self.storage.content_get(obj_ids, "sha1_git") visible_contents = [] for (obj_id, content) in zip(obj_ids, contents): if content is None: # FIXME: this may also happen for missing content self.write_content(obj_id, SKIPPED_MESSAGE) self._expect_mismatched_object_error(obj_id) elif content.status == "visible": visible_contents.append(content) elif content.status == "hidden": self.write_content(obj_id, HIDDEN_MESSAGE) self._expect_mismatched_object_error(obj_id) elif content.status == "absent": assert False, f"content_get returned absent content {content.swhid()}" else: # TODO: When content.status will have type Literal, replace this with # assert_never assert False, f"{content.swhid} has status: {content.status!r}" contents_and_data: Iterator[Tuple[Content, Optional[bytes]]] if self.objstorage is None: contents_and_data = ( (content, self.storage.content_get_data(content.sha1)) for content in visible_contents ) else: contents_and_data = zip( visible_contents, self.objstorage.get_batch(c.sha1 for c in visible_contents), ) for (content, datum) in contents_and_data: if datum is None: logger.error( "%s is visible, but is missing data. Skipping.", content.swhid() ) continue self.write_content(content.sha1_git, datum) def write_content(self, obj_id: Sha1Git, content: bytes) -> None: header = git_objects.git_object_header("blob", len(content)) self.write_object(obj_id, header + content) def _expect_mismatched_object_error(self, obj_id): obj_id_hex = hash_to_hex(obj_id) obj_path = self._obj_relative_path(obj_id) # For Git < 2.21: self._expected_fsck_errors.add( f"error: sha1 mismatch for ./{obj_path} (expected {obj_id_hex})" ) # For Git >= 2.21: self._expected_fsck_errors.add( f"error: hash mismatch for ./{obj_path} (expected {obj_id_hex})" ) self._expected_fsck_errors.add( f"error: {obj_id_hex}: object corrupt or missing: ./{obj_path}" ) self._expected_fsck_errors.add(f"missing blob {obj_id_hex}") diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py index 2121962..e810deb 100644 --- a/swh/vault/tests/test_cookers.py +++ b/swh/vault/tests/test_cookers.py @@ -1,1078 +1,1189 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime import glob import gzip import io import os import pathlib import shutil import subprocess import tarfile import tempfile import unittest import unittest.mock import dulwich.fastexport import dulwich.index import dulwich.objects import dulwich.porcelain import dulwich.repo import pytest from swh.loader.git.from_disk import GitLoaderFromDisk from swh.model import from_disk, hashutil from swh.model.model import ( - Directory, - DirectoryEntry, Person, + Release, Revision, RevisionType, + Snapshot, + SnapshotBranch, + TargetType, + Timestamp, TimestampWithTimezone, ) +from swh.model.model import Content, Directory, DirectoryEntry +from swh.model.model import ObjectType as ModelObjectType from swh.model.swhids import CoreSWHID, ObjectType from swh.vault.cookers import DirectoryCooker, GitBareCooker, RevisionGitfastCooker from swh.vault.tests.vault_testing import hash_content from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE class TestRepo: """A tiny context manager for a test git repository, with some utility functions to perform basic git stuff. """ def __init__(self, repo_dir=None): self.repo_dir = repo_dir def __enter__(self): if self.repo_dir: self.tmp_dir = None self.repo = dulwich.repo.Repo(self.repo_dir) else: self.tmp_dir = tempfile.TemporaryDirectory(prefix="tmp-vault-repo-") self.repo_dir = self.tmp_dir.__enter__() self.repo = dulwich.repo.Repo.init(self.repo_dir) self.author_name = b"Test Author" self.author_email = b"test@softwareheritage.org" self.author = b"%s <%s>" % (self.author_name, self.author_email) self.base_date = 258244200 self.counter = 0 return pathlib.Path(self.repo_dir) def __exit__(self, exc, value, tb): if self.tmp_dir is not None: self.tmp_dir.__exit__(exc, value, tb) self.repo_dir = None def checkout(self, rev_sha): rev = self.repo[rev_sha] dulwich.index.build_index_from_tree( str(self.repo_dir), self.repo.index_path(), self.repo.object_store, rev.tree ) def git_shell(self, *cmd, stdout=subprocess.DEVNULL, **kwargs): name = self.author_name email = self.author_email date = "%d +0000" % (self.base_date + self.counter) env = { # Set git commit format "GIT_AUTHOR_NAME": name, "GIT_AUTHOR_EMAIL": email, "GIT_AUTHOR_DATE": date, "GIT_COMMITTER_NAME": name, "GIT_COMMITTER_EMAIL": email, "GIT_COMMITTER_DATE": date, # Ignore all the system-wide and user configurations "GIT_CONFIG_NOSYSTEM": "1", "HOME": str(self.tmp_dir), "XDG_CONFIG_HOME": str(self.tmp_dir), } kwargs.setdefault("env", {}).update(env) subprocess.check_call( ("git", "-C", self.repo_dir) + cmd, stdout=stdout, **kwargs ) def commit(self, message="Commit test\n", ref=b"HEAD"): """Commit the current working tree in a new commit with message on the branch 'ref'. At the end of the commit, the reference should stay the same and the index should be clean. """ paths = [ os.path.relpath(path, self.repo_dir) for path in glob.glob(self.repo_dir + "/**/*", recursive=True) ] self.repo.stage(paths) message = message.encode() + b"\n" ret = self.repo.do_commit( message=message, committer=self.author, commit_timestamp=self.base_date + self.counter, commit_timezone=0, ref=ref, ) self.counter += 1 # committing on another branch leaves # dangling files in index if ref != b"HEAD": # XXX this should work (but does not) # dulwich.porcelain.reset(self.repo, 'hard') self.git_shell("reset", "--hard", "HEAD") return ret def tag(self, name, target=b"HEAD", message=None): dulwich.porcelain.tag_create( self.repo, name, message=message, annotated=message is not None, objectish=target, ) def merge(self, parent_sha_list, message="Merge branches."): self.git_shell( "merge", "--allow-unrelated-histories", "-m", message, *[p.decode() for p in parent_sha_list], ) self.counter += 1 return self.repo.refs[b"HEAD"] def print_debug_graph(self, reflog=False): args = ["log", "--all", "--graph", "--decorate"] if reflog: args.append("--reflog") self.git_shell(*args, stdout=None) @pytest.fixture def git_loader(swh_storage,): """Instantiate a Git Loader using the storage instance as storage. """ def _create_loader(directory): return GitLoaderFromDisk( swh_storage, "fake_origin", directory=directory, visit_date=datetime.datetime.now(datetime.timezone.utc), ) return _create_loader @contextlib.contextmanager def cook_extract_directory_dircooker(storage, swhid, fsck=True): """Context manager that cooks a directory and extract it.""" backend = unittest.mock.MagicMock() backend.storage = storage cooker = DirectoryCooker(swhid, backend=backend, storage=storage) cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: tar.extractall(td) yield pathlib.Path(td) / str(swhid) cooker.storage = None @contextlib.contextmanager def cook_extract_directory_gitfast(storage, swhid, fsck=True): """Context manager that cooks a revision containing a directory and extract it, using RevisionGitfastCooker""" test_repo = TestRepo() with test_repo as p: date = TimestampWithTimezone.from_datetime( datetime.datetime.now(datetime.timezone.utc) ) revision = Revision( directory=swhid.object_id, message=b"dummy message", author=Person.from_fullname(b"someone"), committer=Person.from_fullname(b"someone"), date=date, committer_date=date, type=RevisionType.GIT, synthetic=False, ) storage.revision_add([revision]) with cook_stream_revision_gitfast( storage, revision.swhid() ) as stream, test_repo as p: processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) processor.import_stream(stream) test_repo.checkout(b"HEAD") shutil.rmtree(p / ".git") yield p @contextlib.contextmanager def cook_extract_directory_git_bare(storage, swhid, fsck=True, direct_objstorage=False): """Context manager that cooks a revision and extract it, using GitBareCooker""" backend = unittest.mock.MagicMock() backend.storage = storage # Cook the object cooker = GitBareCooker( swhid, backend=backend, storage=storage, objstorage=storage.objstorage if direct_objstorage else None, ) cooker.use_fsck = fsck # Some tests try edge-cases that git-fsck rejects cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) # Extract it with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: tar.extractall(td) # Clone it with Dulwich with tempfile.TemporaryDirectory(prefix="tmp-vault-clone-") as clone_dir: clone_dir = pathlib.Path(clone_dir) subprocess.check_call( ["git", "clone", os.path.join(td, f"{swhid}.git"), clone_dir,] ) shutil.rmtree(clone_dir / ".git") yield clone_dir @pytest.fixture( scope="module", params=[ cook_extract_directory_dircooker, cook_extract_directory_gitfast, cook_extract_directory_git_bare, ], ) def cook_extract_directory(request): """A fixture that is instantiated as either cook_extract_directory_dircooker or cook_extract_directory_git_bare.""" return request.param @contextlib.contextmanager def cook_stream_revision_gitfast(storage, swhid): """Context manager that cooks a revision and stream its fastexport.""" backend = unittest.mock.MagicMock() backend.storage = storage cooker = RevisionGitfastCooker(swhid, backend=backend, storage=storage) cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj) yield fastexport_stream cooker.storage = None @contextlib.contextmanager def cook_extract_revision_gitfast(storage, swhid, fsck=True): """Context manager that cooks a revision and extract it, using RevisionGitfastCooker""" test_repo = TestRepo() with cook_stream_revision_gitfast(storage, swhid) as stream, test_repo as p: processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) processor.import_stream(stream) yield test_repo, p @contextlib.contextmanager def cook_extract_git_bare(storage, swhid, fsck=True): """Context manager that cooks a revision and extract it, using GitBareCooker""" backend = unittest.mock.MagicMock() backend.storage = storage # Cook the object cooker = GitBareCooker(swhid, backend=backend, storage=storage) cooker.use_fsck = fsck # Some tests try edge-cases that git-fsck rejects cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) # Extract it with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: tar.extractall(td) # Clone it with Dulwich with tempfile.TemporaryDirectory(prefix="tmp-vault-clone-") as clone_dir: clone_dir = pathlib.Path(clone_dir) subprocess.check_call( ["git", "clone", os.path.join(td, f"{swhid}.git"), clone_dir,] ) test_repo = TestRepo(clone_dir) with test_repo: yield test_repo, clone_dir @contextlib.contextmanager def cook_extract_revision_git_bare(storage, swhid, fsck=True): with cook_extract_git_bare(storage, swhid, fsck=fsck,) as res: yield res @pytest.fixture( scope="module", params=[cook_extract_revision_gitfast, cook_extract_revision_git_bare], ) def cook_extract_revision(request): """A fixture that is instantiated as either cook_extract_revision_gitfast or cook_extract_revision_git_bare.""" return request.param @contextlib.contextmanager def cook_extract_snapshot_git_bare(storage, swhid, fsck=True): with cook_extract_git_bare(storage, swhid, fsck=fsck,) as res: yield res @pytest.fixture( scope="module", params=[cook_extract_snapshot_git_bare], ) def cook_extract_snapshot(request): """Equivalent to cook_extract_snapshot_git_bare; but analogous to cook_extract_revision in case we ever have more cookers supporting snapshots""" return request.param TEST_CONTENT = ( " test content\n" "and unicode \N{BLACK HEART SUIT}\n" " and trailing spaces " ) TEST_EXECUTABLE = b"\x42\x40\x00\x00\x05" class TestDirectoryCooker: def test_directory_simple(self, git_loader, cook_extract_directory): repo = TestRepo() with repo as rp: (rp / "file").write_text(TEST_CONTENT) (rp / "executable").write_bytes(TEST_EXECUTABLE) (rp / "executable").chmod(0o755) (rp / "link").symlink_to("file") (rp / "dir1/dir2").mkdir(parents=True) (rp / "dir1/dir2/file").write_text(TEST_CONTENT) c = repo.commit() loader = git_loader(str(rp)) loader.load() obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) with cook_extract_directory(loader.storage, swhid) as p: assert (p / "file").stat().st_mode == 0o100644 assert (p / "file").read_text() == TEST_CONTENT assert (p / "executable").stat().st_mode == 0o100755 assert (p / "executable").read_bytes() == TEST_EXECUTABLE assert (p / "link").is_symlink() assert os.readlink(str(p / "link")) == "file" assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT directory = from_disk.Directory.from_disk(path=bytes(p)) assert obj_id_hex == hashutil.hash_to_hex(directory.hash) def test_directory_filtered_objects(self, git_loader, cook_extract_directory): repo = TestRepo() with repo as rp: file_1, id_1 = hash_content(b"test1") file_2, id_2 = hash_content(b"test2") file_3, id_3 = hash_content(b"test3") (rp / "file").write_bytes(file_1) (rp / "hidden_file").write_bytes(file_2) (rp / "absent_file").write_bytes(file_3) c = repo.commit() loader = git_loader(str(rp)) loader.load() obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) # FIXME: storage.content_update() should be changed to allow things # like that with loader.storage.get_db().transaction() as cur: cur.execute( """update content set status = 'visible' where sha1 = %s""", (id_1,), ) cur.execute( """update content set status = 'hidden' where sha1 = %s""", (id_2,), ) cur.execute( """ insert into skipped_content (sha1, sha1_git, sha256, blake2s256, length, reason) select sha1, sha1_git, sha256, blake2s256, length, 'no reason' from content where sha1 = %s """, (id_3,), ) cur.execute("delete from content where sha1 = %s", (id_3,)) with cook_extract_directory(loader.storage, swhid) as p: assert (p / "file").read_bytes() == b"test1" assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE def test_directory_bogus_perms(self, git_loader, cook_extract_directory): # Some early git repositories have 664/775 permissions... let's check # if all the weird modes are properly normalized in the directory # cooker. repo = TestRepo() with repo as rp: (rp / "file").write_text(TEST_CONTENT) (rp / "file").chmod(0o664) (rp / "executable").write_bytes(TEST_EXECUTABLE) (rp / "executable").chmod(0o775) (rp / "wat").write_text(TEST_CONTENT) (rp / "wat").chmod(0o604) # Disable mode cleanup with unittest.mock.patch("dulwich.index.cleanup_mode", lambda mode: mode): c = repo.commit() # Make sure Dulwich didn't normalize the permissions itself. # (if it did, then the test can't check the cooker normalized them) tree_id = repo.repo[c].tree assert {entry.mode for entry in repo.repo[tree_id].items()} == { 0o100775, 0o100664, 0o100604, } # Disable mode checks with unittest.mock.patch("dulwich.objects.Tree.check", lambda self: None): loader = git_loader(str(rp)) loader.load() # Make sure swh-loader didn't normalize them either dir_entries = loader.storage.directory_ls(hashutil.bytehex_to_hash(tree_id)) assert {entry["perms"] for entry in dir_entries} == { 0o100664, 0o100775, 0o100604, } obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) with cook_extract_directory(loader.storage, swhid) as p: assert (p / "file").stat().st_mode == 0o100644 assert (p / "executable").stat().st_mode == 0o100755 assert (p / "wat").stat().st_mode == 0o100644 @pytest.mark.parametrize("direct_objstorage", [True, False]) def test_directory_objstorage( self, swh_storage, git_loader, mocker, direct_objstorage ): """Like test_directory_simple, but using swh_objstorage directly, without going through swh_storage.content_get_data()""" repo = TestRepo() with repo as rp: (rp / "file").write_text(TEST_CONTENT) (rp / "executable").write_bytes(TEST_EXECUTABLE) (rp / "executable").chmod(0o755) (rp / "link").symlink_to("file") (rp / "dir1/dir2").mkdir(parents=True) (rp / "dir1/dir2/file").write_text(TEST_CONTENT) c = repo.commit() loader = git_loader(str(rp)) loader.load() obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) # Set-up spies storage_content_get_data = mocker.patch.object( swh_storage, "content_get_data", wraps=swh_storage.content_get_data ) objstorage_content_batch = mocker.patch.object( swh_storage.objstorage, "get_batch", wraps=swh_storage.objstorage.get_batch ) with cook_extract_directory_git_bare( loader.storage, swhid, direct_objstorage=direct_objstorage ) as p: assert (p / "file").stat().st_mode == 0o100644 assert (p / "file").read_text() == TEST_CONTENT assert (p / "executable").stat().st_mode == 0o100755 assert (p / "executable").read_bytes() == TEST_EXECUTABLE assert (p / "link").is_symlink() assert os.readlink(str(p / "link")) == "file" assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT directory = from_disk.Directory.from_disk(path=bytes(p)) assert obj_id_hex == hashutil.hash_to_hex(directory.hash) if direct_objstorage: storage_content_get_data.assert_not_called() objstorage_content_batch.assert_called() else: storage_content_get_data.assert_called() objstorage_content_batch.assert_not_called() def test_directory_revision_data(self, swh_storage): target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" dir = Directory( entries=( DirectoryEntry( name=b"submodule", type="rev", target=hashutil.hash_to_bytes(target_rev), perms=0o100644, ), ), ) swh_storage.directory_add([dir]) with cook_extract_directory_dircooker( swh_storage, dir.swhid(), fsck=False ) as p: assert (p / "submodule").is_symlink() assert os.readlink(str(p / "submodule")) == target_rev class RepoFixtures: """Shared loading and checking methods that can be reused by different types of tests.""" def load_repo_simple(self, git_loader): # # 1--2--3--4--5--6--7 # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) repo.commit("add file1") (rp / "file2").write_text(TEST_CONTENT) repo.commit("add file2") (rp / "dir1/dir2").mkdir(parents=True) (rp / "dir1/dir2/file").write_text(TEST_CONTENT) (rp / "bin1").write_bytes(TEST_EXECUTABLE) (rp / "bin1").chmod(0o755) repo.commit("add bin1") (rp / "link1").symlink_to("file1") repo.commit("link link1 to file1") (rp / "file2").unlink() repo.commit("remove file2") (rp / "bin1").rename(rp / "bin") repo.commit("rename bin1 to bin") loader = git_loader(str(rp)) loader.load() obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) return (loader, swhid) def check_revision_simple(self, ert, p, swhid): ert.checkout(b"HEAD") assert (p / "file1").stat().st_mode == 0o100644 assert (p / "file1").read_text() == TEST_CONTENT assert (p / "link1").is_symlink() assert os.readlink(str(p / "link1")) == "file1" assert (p / "bin").stat().st_mode == 0o100755 assert (p / "bin").read_bytes() == TEST_EXECUTABLE assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() def load_repo_two_roots(self, git_loader): # # 1----3---4 # / # 2---- # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) c1 = repo.commit("Add file1") del repo.repo.refs[b"refs/heads/master"] # git update-ref -d HEAD (rp / "file2").write_text(TEST_CONTENT) repo.commit("Add file2") repo.merge([c1]) (rp / "file3").write_text(TEST_CONTENT) repo.commit("add file3") obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() return (loader, swhid) def check_revision_two_roots(self, ert, p, swhid): assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() (c3,) = ert.repo[hashutil.hash_to_bytehex(swhid.object_id)].parents assert len(ert.repo[c3].parents) == 2 def load_repo_two_heads(self, git_loader): # # 1---2----4 <-- master and b1 # \ # ----3 <-- b2 # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) repo.commit("Add file1") (rp / "file2").write_text(TEST_CONTENT) c2 = repo.commit("Add file2") repo.repo.refs[b"refs/heads/b2"] = c2 # branch b2 from master (rp / "file3").write_text(TEST_CONTENT) repo.commit("add file3", ref=b"refs/heads/b2") (rp / "file4").write_text(TEST_CONTENT) c4 = repo.commit("add file4", ref=b"refs/heads/master") repo.repo.refs[b"refs/heads/b1"] = c4 # branch b1 from master obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() return (loader, swhid) def check_snapshot_two_heads(self, ert, p, swhid): assert ( hashutil.hash_to_bytehex(swhid.object_id) == ert.repo.refs[b"HEAD"] == ert.repo.refs[b"refs/heads/master"] == ert.repo.refs[b"refs/remotes/origin/HEAD"] == ert.repo.refs[b"refs/remotes/origin/master"] == ert.repo.refs[b"refs/remotes/origin/b1"] ) c4_id = hashutil.hash_to_bytehex(swhid.object_id) c3_id = ert.repo.refs[b"refs/remotes/origin/b2"] assert ert.repo[c3_id].parents == ert.repo[c4_id].parents def load_repo_two_double_fork_merge(self, git_loader): # # 2---4---6 # / / / # 1---3---5 # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) c1 = repo.commit("Add file1") # create commit 1 repo.repo.refs[b"refs/heads/c1"] = c1 # branch c1 from master (rp / "file2").write_text(TEST_CONTENT) repo.commit("Add file2") # create commit 2 (rp / "file3").write_text(TEST_CONTENT) c3 = repo.commit("Add file3", ref=b"refs/heads/c1") # create commit 3 on c1 repo.repo.refs[b"refs/heads/c3"] = c3 # branch c3 from c1 repo.merge([c3]) # create commit 4 (rp / "file5").write_text(TEST_CONTENT) c5 = repo.commit("Add file3", ref=b"refs/heads/c3") # create commit 5 on c3 repo.merge([c5]) # create commit 6 obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() return (loader, swhid) def check_revision_two_double_fork_merge(self, ert, p, swhid): assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() def check_snapshot_two_double_fork_merge(self, ert, p, swhid): assert ( hashutil.hash_to_bytehex(swhid.object_id) == ert.repo.refs[b"HEAD"] == ert.repo.refs[b"refs/heads/master"] == ert.repo.refs[b"refs/remotes/origin/HEAD"] == ert.repo.refs[b"refs/remotes/origin/master"] ) (c4_id, c5_id) = ert.repo[swhid.object_id.hex().encode()].parents assert c5_id == ert.repo.refs[b"refs/remotes/origin/c3"] (c2_id, c3_id) = ert.repo[c4_id].parents assert c3_id == ert.repo.refs[b"refs/remotes/origin/c1"] def load_repo_triple_merge(self, git_loader): # # .---.---5 # / / / # 2 3 4 # / / / # 1---.---. # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) c1 = repo.commit("Commit 1") repo.repo.refs[b"refs/heads/b1"] = c1 repo.repo.refs[b"refs/heads/b2"] = c1 repo.commit("Commit 2") c3 = repo.commit("Commit 3", ref=b"refs/heads/b1") c4 = repo.commit("Commit 4", ref=b"refs/heads/b2") repo.merge([c3, c4]) obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() return (loader, swhid) def check_revision_triple_merge(self, ert, p, swhid): assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() def check_snapshot_triple_merge(self, ert, p, swhid): assert ( hashutil.hash_to_bytehex(swhid.object_id) == ert.repo.refs[b"HEAD"] == ert.repo.refs[b"refs/heads/master"] == ert.repo.refs[b"refs/remotes/origin/HEAD"] == ert.repo.refs[b"refs/remotes/origin/master"] ) (c2_id, c3_id, c4_id) = ert.repo[swhid.object_id.hex().encode()].parents assert c3_id == ert.repo.refs[b"refs/remotes/origin/b1"] assert c4_id == ert.repo.refs[b"refs/remotes/origin/b2"] assert ( ert.repo[c2_id].parents == ert.repo[c3_id].parents == ert.repo[c4_id].parents ) def load_repo_filtered_objects(self, git_loader): repo = TestRepo() with repo as rp: file_1, id_1 = hash_content(b"test1") file_2, id_2 = hash_content(b"test2") file_3, id_3 = hash_content(b"test3") (rp / "file").write_bytes(file_1) (rp / "hidden_file").write_bytes(file_2) (rp / "absent_file").write_bytes(file_3) repo.commit() obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() # FIXME: storage.content_update() should be changed to allow things # like that with loader.storage.get_db().transaction() as cur: cur.execute( """update content set status = 'visible' where sha1 = %s""", (id_1,), ) cur.execute( """update content set status = 'hidden' where sha1 = %s""", (id_2,), ) cur.execute( """ insert into skipped_content (sha1, sha1_git, sha256, blake2s256, length, reason) select sha1, sha1_git, sha256, blake2s256, length, 'no reason' from content where sha1 = %s """, (id_3,), ) cur.execute("delete from content where sha1 = %s", (id_3,)) return (loader, swhid) def check_revision_filtered_objects(self, ert, p, swhid): ert.checkout(b"HEAD") assert (p / "file").read_bytes() == b"test1" assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE def load_repo_null_fields(self, git_loader): # Our schema doesn't enforce a lot of non-null revision fields. We need # to check these cases don't break the cooker. repo = TestRepo() with repo as rp: (rp / "file").write_text(TEST_CONTENT) c = repo.commit("initial commit") loader = git_loader(str(rp)) loader.load() repo.repo.refs[b"HEAD"].decode() dir_id_hex = repo.repo[c].tree.decode() dir_id = hashutil.hash_to_bytes(dir_id_hex) test_revision = Revision( message=b"", author=Person(name=None, email=None, fullname=b""), date=None, committer=Person(name=None, email=None, fullname=b""), committer_date=None, parents=(), type=RevisionType.GIT, directory=dir_id, metadata={}, synthetic=True, ) storage = loader.storage storage.revision_add([test_revision]) return (loader, test_revision.swhid()) def check_revision_null_fields(self, ert, p, swhid): ert.checkout(b"HEAD") assert (p / "file").stat().st_mode == 0o100644 def load_repo_tags(self, git_loader): # v-- t2 # # 1---2----5 <-- master, t5, and t5a (annotated) # \ # ----3----4 <-- t4a (annotated) # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) repo.commit("Add file1") (rp / "file2").write_text(TEST_CONTENT) repo.commit("Add file2") # create c2 repo.tag(b"t2") (rp / "file3").write_text(TEST_CONTENT) repo.commit("add file3") (rp / "file4").write_text(TEST_CONTENT) repo.commit("add file4") repo.tag(b"t4a", message=b"tag 4") # Go back to c2 repo.git_shell("reset", "--hard", "HEAD^^") (rp / "file5").write_text(TEST_CONTENT) repo.commit("add file5") # create c5 repo.tag(b"t5") repo.tag(b"t5a", message=b"tag 5") obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() return (loader, swhid) def check_snapshot_tags(self, ert, p, swhid): assert ( hashutil.hash_to_bytehex(swhid.object_id) == ert.repo.refs[b"HEAD"] == ert.repo.refs[b"refs/heads/master"] == ert.repo.refs[b"refs/remotes/origin/HEAD"] == ert.repo.refs[b"refs/remotes/origin/master"] == ert.repo.refs[b"refs/tags/t5"] ) c2_id = ert.repo.refs[b"refs/tags/t2"] c5_id = hashutil.hash_to_bytehex(swhid.object_id) assert ert.repo[c5_id].parents == [c2_id] t5a = ert.repo[ert.repo.refs[b"refs/tags/t5a"]] # TODO: investigate why new dulwich adds \n assert t5a.message in (b"tag 5", b"tag 5\n") assert t5a.object == (dulwich.objects.Commit, c5_id) t4a = ert.repo[ert.repo.refs[b"refs/tags/t4a"]] (_, c4_id) = t4a.object assert ert.repo[c4_id].message == b"add file4\n" # TODO: ditto (c3_id,) = ert.repo[c4_id].parents assert ert.repo[c3_id].message == b"add file3\n" # TODO: ditto assert ert.repo[c3_id].parents == [c2_id] class TestRevisionCooker(RepoFixtures): def test_revision_simple(self, git_loader, cook_extract_revision): (loader, swhid) = self.load_repo_simple(git_loader) with cook_extract_revision(loader.storage, swhid) as (ert, p): self.check_revision_simple(ert, p, swhid) def test_revision_two_roots(self, git_loader, cook_extract_revision): (loader, swhid) = self.load_repo_two_roots(git_loader) with cook_extract_revision(loader.storage, swhid) as (ert, p): self.check_revision_two_roots(ert, p, swhid) def test_revision_two_double_fork_merge(self, git_loader, cook_extract_revision): (loader, swhid) = self.load_repo_two_double_fork_merge(git_loader) with cook_extract_revision(loader.storage, swhid) as (ert, p): self.check_revision_two_double_fork_merge(ert, p, swhid) def test_revision_triple_merge(self, git_loader, cook_extract_revision): (loader, swhid) = self.load_repo_triple_merge(git_loader) with cook_extract_revision(loader.storage, swhid) as (ert, p): self.check_revision_triple_merge(ert, p, swhid) def test_revision_filtered_objects(self, git_loader, cook_extract_revision): (loader, swhid) = self.load_repo_filtered_objects(git_loader) with cook_extract_revision(loader.storage, swhid) as (ert, p): self.check_revision_filtered_objects(ert, p, swhid) def test_revision_null_fields(self, git_loader, cook_extract_revision): (loader, swhid) = self.load_repo_null_fields(git_loader) with cook_extract_revision(loader.storage, swhid, fsck=False) as (ert, p): self.check_revision_null_fields(ert, p, swhid) @pytest.mark.parametrize("ingest_target_revision", [False, True]) def test_revision_submodule( self, swh_storage, cook_extract_revision, ingest_target_revision ): date = TimestampWithTimezone.from_datetime( datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0) ) target_rev = Revision( message=b"target_rev", author=Person.from_fullname(b"me "), date=date, committer=Person.from_fullname(b"me "), committer_date=date, parents=(), type=RevisionType.GIT, directory=bytes.fromhex("3333333333333333333333333333333333333333"), metadata={}, synthetic=True, ) if ingest_target_revision: swh_storage.revision_add([target_rev]) dir = Directory( entries=( DirectoryEntry( name=b"submodule", type="rev", target=target_rev.id, perms=0o160000, ), ), ) swh_storage.directory_add([dir]) rev = Revision( message=b"msg", author=Person.from_fullname(b"me "), date=date, committer=Person.from_fullname(b"me "), committer_date=date, parents=(), type=RevisionType.GIT, directory=dir.id, metadata={}, synthetic=True, ) swh_storage.revision_add([rev]) with cook_extract_revision(swh_storage, rev.swhid()) as (ert, p): ert.checkout(b"HEAD") pattern = b"160000 submodule\x00%s" % target_rev.id tree = ert.repo[b"HEAD"].tree assert pattern in ert.repo[tree].as_raw_string() class TestSnapshotCooker(RepoFixtures): def test_snapshot_simple(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_simple(git_loader) snp_id = loader.loaded_snapshot_id swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_revision_simple(ert, p, main_rev_id) def test_snapshot_two_roots(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_two_roots(git_loader) snp_id = loader.loaded_snapshot_id swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_revision_two_roots(ert, p, main_rev_id) def test_snapshot_two_heads(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_two_heads(git_loader) snp_id = loader.loaded_snapshot_id swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_snapshot_two_heads(ert, p, main_rev_id) def test_snapshot_two_double_fork_merge(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_two_double_fork_merge(git_loader) snp_id = loader.loaded_snapshot_id swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_revision_two_double_fork_merge(ert, p, main_rev_id) self.check_snapshot_two_double_fork_merge(ert, p, main_rev_id) def test_snapshot_triple_merge(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_triple_merge(git_loader) snp_id = loader.loaded_snapshot_id swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_revision_triple_merge(ert, p, main_rev_id) self.check_snapshot_triple_merge(ert, p, main_rev_id) def test_snapshot_filtered_objects(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_filtered_objects(git_loader) snp_id = loader.loaded_snapshot_id swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_revision_filtered_objects(ert, p, main_rev_id) def test_snapshot_tags(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_tags(git_loader) snp_id = loader.loaded_snapshot_id swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_snapshot_tags(ert, p, main_rev_id) + + def test_original_malformed_objects(self, swh_storage, cook_extract_snapshot): + """Tests that objects that were originally malformed: + + * are still interpreted somewhat correctly (if the loader could make sense of + them), especially that they still have links to children + * have their original manifest in the bundle + """ + date = TimestampWithTimezone.from_numeric_offset( + Timestamp(1643819927, 0), 0, False + ) + + content = Content.from_data(b"foo") + swh_storage.content_add([content]) + + # disordered + # fmt: off + malformed_dir_manifest = ( + b"" + + b"100644 file2\x00" + content.sha1_git + + b"100644 file1\x00" + content.sha1_git + ) + # fmt: on + directory = Directory( + entries=( + DirectoryEntry( + name=b"file1", type="file", perms=0o100644, target=content.sha1_git + ), + DirectoryEntry( + name=b"file2", type="file", perms=0o100644, target=content.sha1_git + ), + ), + raw_manifest=f"tree {len(malformed_dir_manifest)}\x00".encode() + + malformed_dir_manifest, + ) + swh_storage.directory_add([directory]) + + # 'committer' and 'author' swapped + # fmt: off + malformed_rev_manifest = ( + b"tree " + hashutil.hash_to_bytehex(directory.id) + b"\n" + + b"committer me 1643819927 +0000\n" + + b"author me 1643819927 +0000\n" + + b"\n" + + b"rev" + ) + # fmt: on + revision = Revision( + message=b"rev", + author=Person.from_fullname(b"me "), + date=date, + committer=Person.from_fullname(b"me "), + committer_date=date, + parents=(), + type=RevisionType.GIT, + directory=directory.id, + synthetic=True, + raw_manifest=f"commit {len(malformed_rev_manifest)}\x00".encode() + + malformed_rev_manifest, + ) + swh_storage.revision_add([revision]) + + # 'tag' and 'tagger' swapped + # fmt: off + malformed_rel_manifest = ( + b"object " + hashutil.hash_to_bytehex(revision.id) + b"\n" + + b"type commit\n" + + b"tagger me 1643819927 +0000\n" + + b"tag v1.1.0\n" + ) + # fmt: on + + release = Release( + name=b"v1.1.0", + message=None, + author=Person.from_fullname(b"me "), + date=date, + target=revision.id, + target_type=ModelObjectType.REVISION, + synthetic=True, + raw_manifest=f"tag {len(malformed_rel_manifest)}\x00".encode() + + malformed_rel_manifest, + ) + swh_storage.release_add([release]) + + snapshot = Snapshot( + branches={ + b"refs/tags/v1.1.0": SnapshotBranch( + target=release.id, target_type=TargetType.RELEASE + ), + b"HEAD": SnapshotBranch( + target=revision.id, target_type=TargetType.REVISION + ), + } + ) + swh_storage.snapshot_add([snapshot]) + + with cook_extract_snapshot(swh_storage, snapshot.swhid()) as (ert, p): + tag = ert.repo[b"refs/tags/v1.1.0"] + assert tag.as_raw_string() == malformed_rel_manifest + + commit = ert.repo[tag.object[1]] + assert commit.as_raw_string() == malformed_rev_manifest + + tree = ert.repo[commit.tree] + assert tree.as_raw_string() == malformed_dir_manifest