diff --git a/dulwich/repo.py b/dulwich/repo.py index 1927bd6c..3ed88747 100644 --- a/dulwich/repo.py +++ b/dulwich/repo.py @@ -1,1223 +1,1223 @@ # repo.py -- For dealing with git repositories. # Copyright (C) 2007 James Westby # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Repository access. This module contains the base class for git repositories (BaseRepo) and an implementation which uses a repository on local disk (Repo). """ from io import BytesIO import errno import os import sys import stat import time from dulwich.errors import ( NoIndexPresent, NotBlobError, NotCommitError, NotGitRepository, NotTreeError, NotTagError, CommitError, RefFormatError, HookError, ) from dulwich.file import ( GitFile, ) from dulwich.object_store import ( DiskObjectStore, MemoryObjectStore, ObjectStoreGraphWalker, ) from dulwich.objects import ( check_hexsha, Blob, Commit, ShaFile, Tag, Tree, ) from dulwich.hooks import ( PreCommitShellHook, PostCommitShellHook, CommitMsgShellHook, ) from dulwich.refs import ( # noqa: F401 check_ref_format, RefsContainer, DictRefsContainer, InfoRefsContainer, DiskRefsContainer, read_packed_refs, read_packed_refs_with_peeled, write_packed_refs, SYMREF, ) import warnings CONTROLDIR = '.git' OBJECTDIR = 'objects' REFSDIR = 'refs' REFSDIR_TAGS = 'tags' REFSDIR_HEADS = 'heads' INDEX_FILENAME = "index" COMMONDIR = 'commondir' GITDIR = 'gitdir' WORKTREES = 'worktrees' BASE_DIRECTORIES = [ ["branches"], [REFSDIR], [REFSDIR, REFSDIR_TAGS], [REFSDIR, REFSDIR_HEADS], ["hooks"], ["info"] ] DEFAULT_REF = b'refs/heads/master' def parse_graftpoints(graftpoints): """Convert a list of graftpoints into a dict :param graftpoints: Iterator of graftpoint lines Each line is formatted as: []* Resulting dictionary is: : [*] https://git.wiki.kernel.org/index.php/GraftPoint """ grafts = {} for l in graftpoints: raw_graft = l.split(None, 1) commit = raw_graft[0] if len(raw_graft) == 2: parents = raw_graft[1].split() else: parents = [] for sha in [commit] + parents: check_hexsha(sha, 'Invalid graftpoint') grafts[commit] = parents return grafts def serialize_graftpoints(graftpoints): """Convert a dictionary of grafts into string The graft dictionary is: : [*] Each line is formatted as: []* https://git.wiki.kernel.org/index.php/GraftPoint """ graft_lines = [] for commit, parents in graftpoints.items(): if parents: graft_lines.append(commit + b' ' + b' '.join(parents)) else: graft_lines.append(commit) return b'\n'.join(graft_lines) class BaseRepo(object): """Base class for a git repository. :ivar object_store: Dictionary-like object for accessing the objects :ivar refs: Dictionary-like object with the refs in this repository """ def __init__(self, object_store, refs): """Open a repository. This shouldn't be called directly, but rather through one of the base classes, such as MemoryRepo or Repo. :param object_store: Object store to use :param refs: Refs container to use """ self.object_store = object_store self.refs = refs self._graftpoints = {} self.hooks = {} def _determine_file_mode(self): """Probe the file-system to determine whether permissions can be trusted. :return: True if permissions can be trusted, False otherwise. """ raise NotImplementedError(self._determine_file_mode) def _init_files(self, bare): """Initialize a default set of named files.""" from dulwich.config import ConfigFile self._put_named_file('description', b"Unnamed repository") f = BytesIO() cf = ConfigFile() cf.set("core", "repositoryformatversion", "0") if self._determine_file_mode(): cf.set("core", "filemode", True) else: cf.set("core", "filemode", False) cf.set("core", "bare", bare) cf.set("core", "logallrefupdates", True) cf.write_to_file(f) self._put_named_file('config', f.getvalue()) self._put_named_file(os.path.join('info', 'exclude'), b'') def get_named_file(self, path): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-based Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :return: An open file object, or None if the file does not exist. """ raise NotImplementedError(self.get_named_file) def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ raise NotImplementedError(self._put_named_file) def open_index(self): """Open the index for this repository. :raise NoIndexPresent: If no index is present :return: The matching `Index` """ raise NotImplementedError(self.open_index) def fetch(self, target, determine_wants=None, progress=None): """Fetch objects into another repository. :param target: The target repository :param determine_wants: Optional function to determine what refs to fetch. :param progress: Optional progress function :return: The local refs """ if determine_wants is None: determine_wants = target.object_store.determine_wants_all target.object_store.add_objects( self.fetch_objects(determine_wants, target.get_graph_walker(), progress)) return self.get_refs() def fetch_objects(self, determine_wants, graph_walker, progress, get_tagged=None): """Fetch the missing objects required for a set of revisions. :param determine_wants: Function that takes a dictionary with heads and returns the list of heads to fetch. :param graph_walker: Object that can iterate over the list of revisions to fetch and has an "ack" method that will be called to acknowledge that a revision is present. :param progress: Simple progress function that will be called with updated progress strings. :param get_tagged: Function that returns a dict of pointed-to sha -> tag sha for including tags. :return: iterator over objects, with __len__ implemented """ wants = determine_wants(self.get_refs()) if not isinstance(wants, list): raise TypeError("determine_wants() did not return a list") shallows = getattr(graph_walker, 'shallow', frozenset()) unshallows = getattr(graph_walker, 'unshallow', frozenset()) if wants == []: # TODO(dborowitz): find a way to short-circuit that doesn't change # this interface. if shallows or unshallows: # Do not send a pack in shallow short-circuit path return None return [] # If the graph walker is set up with an implementation that can # ACK/NAK to the wire, it will write data to the client through # this call as a side-effect. haves = self.object_store.find_common_revisions(graph_walker) # Deal with shallow requests separately because the haves do # not reflect what objects are missing if shallows or unshallows: # TODO: filter the haves commits from iter_shas. the specific # commits aren't missing. haves = [] def get_parents(commit): if commit.id in shallows: return [] return self.get_parents(commit.id, commit) return self.object_store.iter_shas( self.object_store.find_missing_objects( haves, wants, progress, get_tagged, get_parents=get_parents)) def get_graph_walker(self, heads=None): """Retrieve a graph walker. A graph walker is used by a remote repository (or proxy) to find out which objects are present in this repository. :param heads: Repository heads to use (optional) :return: A graph walker object """ if heads is None: heads = self.refs.as_dict(b'refs/heads').values() return ObjectStoreGraphWalker(heads, self.get_parents) def get_refs(self): """Get dictionary with all refs. :return: A ``dict`` mapping ref names to SHA1s """ return self.refs.as_dict() def head(self): """Return the SHA1 pointed at by HEAD.""" return self.refs[b'HEAD'] def _get_object(self, sha, cls): assert len(sha) in (20, 40) ret = self.get_object(sha) if not isinstance(ret, cls): if cls is Commit: raise NotCommitError(ret) elif cls is Blob: raise NotBlobError(ret) elif cls is Tree: raise NotTreeError(ret) elif cls is Tag: raise NotTagError(ret) else: raise Exception("Type invalid: %r != %r" % ( ret.type_name, cls.type_name)) return ret def get_object(self, sha): """Retrieve the object with the specified SHA. :param sha: SHA to retrieve :return: A ShaFile object :raise KeyError: when the object can not be found """ return self.object_store[sha] def get_parents(self, sha, commit=None): """Retrieve the parents of a specific commit. If the specific commit is a graftpoint, the graft parents will be returned instead. :param sha: SHA of the commit for which to retrieve the parents :param commit: Optional commit matching the sha :return: List of parents """ try: return self._graftpoints[sha] except KeyError: if commit is None: commit = self[sha] return commit.parents def get_config(self): """Retrieve the config object. :return: `ConfigFile` object for the ``.git/config`` file. """ raise NotImplementedError(self.get_config) def get_description(self): """Retrieve the description for this repository. :return: String with the description of the repository as set by the user. """ raise NotImplementedError(self.get_description) def set_description(self, description): """Set the description for this repository. :param description: Text to set as description for this repository. """ raise NotImplementedError(self.set_description) def get_config_stack(self): """Return a config stack for this repository. This stack accesses the configuration for both this repository itself (.git/config) and the global configuration, which usually lives in ~/.gitconfig. :return: `Config` instance for this repository """ from dulwich.config import StackedConfig backends = [self.get_config()] + StackedConfig.default_backends() return StackedConfig(backends, writable=backends[0]) def get_peeled(self, ref): """Get the peeled value of a ref. :param ref: The refname to peel. :return: The fully-peeled SHA1 of a tag object, after peeling all intermediate tags; if the original ref does not point to a tag, this will equal the original SHA1. """ cached = self.refs.get_peeled(ref) if cached is not None: return cached return self.object_store.peel_sha(self.refs[ref]).id def get_walker(self, include=None, *args, **kwargs): """Obtain a walker for this repository. :param include: Iterable of SHAs of commits to include along with their ancestors. Defaults to [HEAD] :param exclude: Iterable of SHAs of commits to exclude along with their ancestors, overriding includes. :param order: ORDER_* constant specifying the order of results. Anything other than ORDER_DATE may result in O(n) memory usage. :param reverse: If True, reverse the order of output, requiring O(n) memory. :param max_entries: The maximum number of entries to yield, or None for no limit. :param paths: Iterable of file or subtree paths to show entries for. :param rename_detector: diff.RenameDetector object for detecting renames. :param follow: If True, follow path across renames/copies. Forces a default rename_detector. :param since: Timestamp to list commits after. :param until: Timestamp to list commits before. :param queue_cls: A class to use for a queue of commits, supporting the iterator protocol. The constructor takes a single argument, the Walker. :return: A `Walker` object """ from dulwich.walk import Walker if include is None: include = [self.head()] if isinstance(include, str): include = [include] kwargs['get_parents'] = lambda commit: self.get_parents( commit.id, commit) return Walker(self.object_store, include, *args, **kwargs) def __getitem__(self, name): """Retrieve a Git object by SHA1 or ref. :param name: A Git object SHA1 or a ref name :return: A `ShaFile` object, such as a Commit or Blob :raise KeyError: when the specified ref or object does not exist """ if not isinstance(name, bytes): raise TypeError("'name' must be bytestring, not %.80s" % type(name).__name__) if len(name) in (20, 40): try: return self.object_store[name] except (KeyError, ValueError): pass try: return self.object_store[self.refs[name]] except RefFormatError: raise KeyError(name) def __contains__(self, name): """Check if a specific Git object or ref is present. :param name: Git object SHA1 or ref name """ if len(name) in (20, 40): return name in self.object_store or name in self.refs else: return name in self.refs def __setitem__(self, name, value): """Set a ref. :param name: ref name :param value: Ref value - either a ShaFile object, or a hex sha """ if name.startswith(b"refs/") or name == b'HEAD': if isinstance(value, ShaFile): self.refs[name] = value.id elif isinstance(value, bytes): self.refs[name] = value else: raise TypeError(value) else: raise ValueError(name) def __delitem__(self, name): """Remove a ref. :param name: Name of the ref to remove """ if name.startswith(b"refs/") or name == b"HEAD": del self.refs[name] else: raise ValueError(name) def _get_user_identity(self): """Determine the identity to use for new commits. """ user = os.environ.get("GIT_COMMITTER_NAME") email = os.environ.get("GIT_COMMITTER_EMAIL") config = self.get_config_stack() if user is None: try: user = config.get(("user", ), "name") except KeyError: user = None if email is None: try: email = config.get(("user", ), "email") except KeyError: email = None if user is None: import getpass user = getpass.getuser().encode(sys.getdefaultencoding()) if email is None: import getpass import socket - email = ("%s@%s" % (getpass.getuser(), socket.gethostname()) - ).encode(sys.getdefaultencoding()) + email = ("{}@{}".format(getpass.getuser(), socket.gethostname()) + .encode(sys.getdefaultencoding())) return (user + b" <" + email + b">") def _add_graftpoints(self, updated_graftpoints): """Add or modify graftpoints :param updated_graftpoints: Dict of commit shas to list of parent shas """ # Simple validation for commit, parents in updated_graftpoints.items(): for sha in [commit] + parents: check_hexsha(sha, 'Invalid graftpoint') self._graftpoints.update(updated_graftpoints) def _remove_graftpoints(self, to_remove=[]): """Remove graftpoints :param to_remove: List of commit shas """ for sha in to_remove: del self._graftpoints[sha] def do_commit(self, message=None, committer=None, author=None, commit_timestamp=None, commit_timezone=None, author_timestamp=None, author_timezone=None, tree=None, encoding=None, ref=b'HEAD', merge_heads=None): """Create a new commit. :param message: Commit message :param committer: Committer fullname :param author: Author fullname (defaults to committer) :param commit_timestamp: Commit timestamp (defaults to now) :param commit_timezone: Commit timestamp timezone (defaults to GMT) :param author_timestamp: Author timestamp (defaults to commit timestamp) :param author_timezone: Author timestamp timezone (defaults to commit timestamp timezone) :param tree: SHA1 of the tree root to use (if not specified the current index will be committed). :param encoding: Encoding :param ref: Optional ref to commit to (defaults to current branch) :param merge_heads: Merge heads (defaults to .git/MERGE_HEADS) :return: New commit SHA1 """ import time c = Commit() if tree is None: index = self.open_index() c.tree = index.commit(self.object_store) else: if len(tree) != 40: raise ValueError("tree must be a 40-byte hex sha string") c.tree = tree try: self.hooks['pre-commit'].execute() except HookError as e: raise CommitError(e) except KeyError: # no hook defined, silent fallthrough pass if merge_heads is None: # FIXME: Read merge heads from .git/MERGE_HEADS merge_heads = [] if committer is None: committer = self._get_user_identity() c.committer = committer if commit_timestamp is None: # FIXME: Support GIT_COMMITTER_DATE environment variable commit_timestamp = time.time() c.commit_time = int(commit_timestamp) if commit_timezone is None: # FIXME: Use current user timezone rather than UTC commit_timezone = 0 c.commit_timezone = commit_timezone if author is None: # FIXME: Support GIT_AUTHOR_NAME/GIT_AUTHOR_EMAIL environment # variables author = committer c.author = author if author_timestamp is None: # FIXME: Support GIT_AUTHOR_DATE environment variable author_timestamp = commit_timestamp c.author_time = int(author_timestamp) if author_timezone is None: author_timezone = commit_timezone c.author_timezone = author_timezone if encoding is not None: c.encoding = encoding if message is None: # FIXME: Try to read commit message from .git/MERGE_MSG raise ValueError("No commit message specified") try: c.message = self.hooks['commit-msg'].execute(message) if c.message is None: c.message = message except HookError as e: raise CommitError(e) except KeyError: # no hook defined, message not modified c.message = message if ref is None: # Create a dangling commit c.parents = merge_heads self.object_store.add_object(c) else: try: old_head = self.refs[ref] c.parents = [old_head] + merge_heads self.object_store.add_object(c) ok = self.refs.set_if_equals( ref, old_head, c.id, message=b"commit: " + message, committer=committer, timestamp=commit_timestamp, timezone=commit_timezone) except KeyError: c.parents = merge_heads self.object_store.add_object(c) ok = self.refs.add_if_new( ref, c.id, message=b"commit: " + message, committer=committer, timestamp=commit_timestamp, timezone=commit_timezone) if not ok: # Fail if the atomic compare-and-swap failed, leaving the # commit and all its objects as garbage. raise CommitError("%s changed during commit" % (ref,)) try: self.hooks['post-commit'].execute() except HookError as e: # silent failure warnings.warn("post-commit hook failed: %s" % e, UserWarning) except KeyError: # no hook defined, silent fallthrough pass return c.id def read_gitfile(f): """Read a ``.git`` file. The first line of the file should start with "gitdir: " :param f: File-like object to read from :return: A path """ cs = f.read() if not cs.startswith("gitdir: "): raise ValueError("Expected file to start with 'gitdir: '") return cs[len("gitdir: "):].rstrip("\n") class Repo(BaseRepo): """A git repository backed by local disk. To open an existing repository, call the contructor with the path of the repository. To create a new repository, use the Repo.init class method. """ def __init__(self, root): hidden_path = os.path.join(root, CONTROLDIR) if os.path.isdir(os.path.join(hidden_path, OBJECTDIR)): self.bare = False self._controldir = hidden_path elif (os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(os.path.join(root, REFSDIR))): self.bare = True self._controldir = root elif os.path.isfile(hidden_path): self.bare = False with open(hidden_path, 'r') as f: path = read_gitfile(f) self.bare = False self._controldir = os.path.join(root, path) else: raise NotGitRepository( "No git repository was found at %(path)s" % dict(path=root) ) commondir = self.get_named_file(COMMONDIR) if commondir is not None: with commondir: self._commondir = os.path.join( self.controldir(), commondir.read().rstrip(b"\r\n").decode( sys.getfilesystemencoding())) else: self._commondir = self._controldir self.path = root object_store = DiskObjectStore( os.path.join(self.commondir(), OBJECTDIR)) refs = DiskRefsContainer(self.commondir(), self._controldir, logger=self._write_reflog) BaseRepo.__init__(self, object_store, refs) self._graftpoints = {} graft_file = self.get_named_file(os.path.join("info", "grafts"), basedir=self.commondir()) if graft_file: with graft_file: self._graftpoints.update(parse_graftpoints(graft_file)) graft_file = self.get_named_file("shallow", basedir=self.commondir()) if graft_file: with graft_file: self._graftpoints.update(parse_graftpoints(graft_file)) self.hooks['pre-commit'] = PreCommitShellHook(self.controldir()) self.hooks['commit-msg'] = CommitMsgShellHook(self.controldir()) self.hooks['post-commit'] = PostCommitShellHook(self.controldir()) def _write_reflog(self, ref, old_sha, new_sha, committer, timestamp, timezone, message): from .reflog import format_reflog_line path = os.path.join( self.controldir(), 'logs', ref.decode(sys.getfilesystemencoding())) try: os.makedirs(os.path.dirname(path)) except OSError as e: if e.errno != errno.EEXIST: raise if committer is None: committer = self._get_user_identity() if timestamp is None: timestamp = int(time.time()) if timezone is None: timezone = 0 # FIXME with open(path, 'ab') as f: f.write(format_reflog_line(old_sha, new_sha, committer, timestamp, timezone, message) + b'\n') @classmethod def discover(cls, start='.'): """Iterate parent directories to discover a repository Return a Repo object for the first parent directory that looks like a Git repository. :param start: The directory to start discovery from (defaults to '.') """ remaining = True path = os.path.abspath(start) while remaining: try: return cls(path) except NotGitRepository: path, remaining = os.path.split(path) raise NotGitRepository( "No git repository was found at %(path)s" % dict(path=start) ) def controldir(self): """Return the path of the control directory.""" return self._controldir def commondir(self): """Return the path of the common directory. For a main working tree, it is identical to controldir(). For a linked working tree, it is the control directory of the main working tree.""" return self._commondir def _determine_file_mode(self): """Probe the file-system to determine whether permissions can be trusted. :return: True if permissions can be trusted, False otherwise. """ fname = os.path.join(self.path, '.probe-permissions') with open(fname, 'w') as f: f.write('') st1 = os.lstat(fname) os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) st2 = os.lstat(fname) os.unlink(fname) mode_differs = st1.st_mode != st2.st_mode st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 return mode_differs and st2_has_exec def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ path = path.lstrip(os.path.sep) with GitFile(os.path.join(self.controldir(), path), 'wb') as f: f.write(contents) def get_named_file(self, path, basedir=None): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-based Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :param basedir: Optional argument that specifies an alternative to the control dir. :return: An open file object, or None if the file does not exist. """ # TODO(dborowitz): sanitize filenames, since this is used directly by # the dumb web serving code. if basedir is None: basedir = self.controldir() path = path.lstrip(os.path.sep) try: return open(os.path.join(basedir, path), 'rb') except (IOError, OSError) as e: if e.errno == errno.ENOENT: return None raise def index_path(self): """Return path to the index file.""" return os.path.join(self.controldir(), INDEX_FILENAME) def open_index(self): """Open the index for this repository. :raise NoIndexPresent: If no index is present :return: The matching `Index` """ from dulwich.index import Index if not self.has_index(): raise NoIndexPresent() return Index(self.index_path()) def has_index(self): """Check if an index is present.""" # Bare repos must never have index files; non-bare repos may have a # missing index file, which is treated as empty. return not self.bare def stage(self, fs_paths): """Stage a set of paths. :param fs_paths: List of paths, relative to the repository path """ root_path_bytes = self.path.encode(sys.getfilesystemencoding()) if not isinstance(fs_paths, list): fs_paths = [fs_paths] from dulwich.index import ( blob_from_path_and_stat, index_entry_from_stat, _fs_to_tree_path, ) index = self.open_index() for fs_path in fs_paths: if not isinstance(fs_path, bytes): fs_path = fs_path.encode(sys.getfilesystemencoding()) if os.path.isabs(fs_path): raise ValueError( "path %r should be relative to " "repository root, not absolute" % fs_path) tree_path = _fs_to_tree_path(fs_path) full_path = os.path.join(root_path_bytes, fs_path) try: st = os.lstat(full_path) except OSError: # File no longer exists try: del index[tree_path] except KeyError: pass # already removed else: if not stat.S_ISDIR(st.st_mode): blob = blob_from_path_and_stat(full_path, st) self.object_store.add_object(blob) index[tree_path] = index_entry_from_stat(st, blob.id, 0) else: try: del index[tree_path] except KeyError: pass index.write() def clone(self, target_path, mkdir=True, bare=False, origin=b"origin"): """Clone this repository. :param target_path: Target path :param mkdir: Create the target directory :param bare: Whether to create a bare repository :param origin: Base name for refs in target repository cloned from this repository :return: Created repository as `Repo` """ if not bare: target = self.init(target_path, mkdir=mkdir) else: target = self.init_bare(target_path, mkdir=mkdir) self.fetch(target) encoded_path = self.path if not isinstance(encoded_path, bytes): encoded_path = encoded_path.encode(sys.getfilesystemencoding()) ref_message = b"clone: from " + encoded_path target.refs.import_refs( b'refs/remotes/' + origin, self.refs.as_dict(b'refs/heads'), message=ref_message) target.refs.import_refs( b'refs/tags', self.refs.as_dict(b'refs/tags'), message=ref_message) try: target.refs.add_if_new( DEFAULT_REF, self.refs[DEFAULT_REF], message=ref_message) except KeyError: pass target_config = target.get_config() target_config.set(('remote', 'origin'), 'url', encoded_path) target_config.set(('remote', 'origin'), 'fetch', '+refs/heads/*:refs/remotes/origin/*') target_config.write_to_path() # Update target head head_chain, head_sha = self.refs.follow(b'HEAD') if head_chain and head_sha is not None: target.refs.set_symbolic_ref(b'HEAD', head_chain[-1], message=ref_message) target[b'HEAD'] = head_sha if not bare: # Checkout HEAD to target dir target.reset_index() return target def reset_index(self, tree=None): """Reset the index back to a specific tree. :param tree: Tree SHA to reset to, None for current HEAD tree. """ from dulwich.index import ( build_index_from_tree, validate_path_element_default, validate_path_element_ntfs, ) if tree is None: tree = self[b'HEAD'].tree config = self.get_config() honor_filemode = config.get_boolean( b'core', b'filemode', os.name != "nt") if config.get_boolean(b'core', b'core.protectNTFS', os.name == "nt"): validate_path_element = validate_path_element_ntfs else: validate_path_element = validate_path_element_default return build_index_from_tree( self.path, self.index_path(), self.object_store, tree, honor_filemode=honor_filemode, validate_path_element=validate_path_element) def get_config(self): """Retrieve the config object. :return: `ConfigFile` object for the ``.git/config`` file. """ from dulwich.config import ConfigFile path = os.path.join(self._controldir, 'config') try: return ConfigFile.from_path(path) except (IOError, OSError) as e: if e.errno != errno.ENOENT: raise ret = ConfigFile() ret.path = path return ret def get_description(self): """Retrieve the description of this repository. :return: A string describing the repository or None. """ path = os.path.join(self._controldir, 'description') try: with GitFile(path, 'rb') as f: return f.read() except (IOError, OSError) as e: if e.errno != errno.ENOENT: raise return None def __repr__(self): return "" % self.path def set_description(self, description): """Set the description for this repository. :param description: Text to set as description for this repository. """ self._put_named_file('description', description) @classmethod def _init_maybe_bare(cls, path, bare): for d in BASE_DIRECTORIES: os.mkdir(os.path.join(path, *d)) DiskObjectStore.init(os.path.join(path, OBJECTDIR)) ret = cls(path) ret.refs.set_symbolic_ref(b'HEAD', DEFAULT_REF) ret._init_files(bare) return ret @classmethod def init(cls, path, mkdir=False): """Create a new repository. :param path: Path in which to create the repository :param mkdir: Whether to create the directory :return: `Repo` instance """ if mkdir: os.mkdir(path) controldir = os.path.join(path, CONTROLDIR) os.mkdir(controldir) cls._init_maybe_bare(controldir, False) return cls(path) @classmethod def _init_new_working_directory(cls, path, main_repo, identifier=None, mkdir=False): """Create a new working directory linked to a repository. :param path: Path in which to create the working tree. :param main_repo: Main repository to reference :param identifier: Worktree identifier :param mkdir: Whether to create the directory :return: `Repo` instance """ if mkdir: os.mkdir(path) if identifier is None: identifier = os.path.basename(path) main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES) worktree_controldir = os.path.join(main_worktreesdir, identifier) gitdirfile = os.path.join(path, CONTROLDIR) with open(gitdirfile, 'wb') as f: f.write(b'gitdir: ' + worktree_controldir.encode(sys.getfilesystemencoding()) + b'\n') try: os.mkdir(main_worktreesdir) except OSError as e: if e.errno != errno.EEXIST: raise try: os.mkdir(worktree_controldir) except OSError as e: if e.errno != errno.EEXIST: raise with open(os.path.join(worktree_controldir, GITDIR), 'wb') as f: f.write(gitdirfile.encode(sys.getfilesystemencoding()) + b'\n') with open(os.path.join(worktree_controldir, COMMONDIR), 'wb') as f: f.write(b'../..\n') with open(os.path.join(worktree_controldir, 'HEAD'), 'wb') as f: f.write(main_repo.head() + b'\n') r = cls(path) r.reset_index() return r @classmethod def init_bare(cls, path, mkdir=False): """Create a new bare repository. ``path`` should already exist and be an empty directory. :param path: Path to create bare repository in :return: a `Repo` instance """ if mkdir: os.mkdir(path) return cls._init_maybe_bare(path, True) create = init_bare def close(self): """Close any files opened by this repository.""" self.object_store.close() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() class MemoryRepo(BaseRepo): """Repo that stores refs, objects, and named files in memory. MemoryRepos are always bare: they have no working tree and no index, since those have a stronger dependency on the filesystem. """ def __init__(self): from dulwich.config import ConfigFile self._reflog = [] refs_container = DictRefsContainer({}, logger=self._append_reflog) BaseRepo.__init__(self, MemoryObjectStore(), refs_container) self._named_files = {} self.bare = True self._config = ConfigFile() self._description = None def _append_reflog(self, *args): self._reflog.append(args) def set_description(self, description): self._description = description def get_description(self): return self._description def _determine_file_mode(self): """Probe the file-system to determine whether permissions can be trusted. :return: True if permissions can be trusted, False otherwise. """ return sys.platform != 'win32' def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ self._named_files[path] = contents def get_named_file(self, path): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-baked Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :return: An open file object, or None if the file does not exist. """ contents = self._named_files.get(path, None) if contents is None: return None return BytesIO(contents) def open_index(self): """Fail to open index for this repo, since it is bare. :raise NoIndexPresent: Raised when no index is present """ raise NoIndexPresent() def get_config(self): """Retrieve the config object. :return: `ConfigFile` object. """ return self._config @classmethod def init_bare(cls, objects, refs): """Create a new bare repository in memory. :param objects: Objects for the new repository, as iterable :param refs: Refs as dictionary, mapping names to object SHA1s """ ret = cls() for obj in objects: ret.object_store.add_object(obj) for refname, sha in refs.items(): ret.refs.add_if_new(refname, sha) ret._init_files(bare=True) return ret diff --git a/dulwich/tests/test_pack.py b/dulwich/tests/test_pack.py index a68b843d..750eae71 100644 --- a/dulwich/tests/test_pack.py +++ b/dulwich/tests/test_pack.py @@ -1,1126 +1,1128 @@ # test_pack.py -- Tests for the handling of git packs. # Copyright (C) 2007 James Westby # Copyright (C) 2008 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for Dulwich packs.""" from io import BytesIO from hashlib import sha1 import os import shutil import tempfile import zlib from dulwich.errors import ( ApplyDeltaError, ChecksumMismatch, ) from dulwich.file import ( GitFile, ) from dulwich.object_store import ( MemoryObjectStore, ) from dulwich.objects import ( hex_to_sha, sha_to_hex, Commit, Tree, Blob, ) from dulwich.pack import ( OFS_DELTA, REF_DELTA, MemoryPackIndex, Pack, PackData, apply_delta, create_delta, deltify_pack_objects, load_pack_index, UnpackedObject, read_zlib_chunks, write_pack_header, write_pack_index_v1, write_pack_index_v2, write_pack_object, write_pack, unpack_object, compute_file_sha, PackStreamReader, DeltaChainIterator, _delta_encode_size, _encode_copy_operation, ) from dulwich.tests import ( TestCase, ) from dulwich.tests.utils import ( make_object, build_pack, ) pack1_sha = b'bc63ddad95e7321ee734ea11a7a62d314e0d7481' a_sha = b'6f670c0fb53f9463760b7295fbb814e965fb20c8' tree_sha = b'b2a2766a2879c209ab1176e7e778b81ae422eeaa' commit_sha = b'f18faa16531ac570a3fdc8c7ca16682548dafd12' class PackTests(TestCase): """Base class for testing packs""" def setUp(self): super(PackTests, self).setUp() self.tempdir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.tempdir) datadir = os.path.abspath( os.path.join(os.path.dirname(__file__), 'data/packs')) def get_pack_index(self, sha): """Returns a PackIndex from the datadir with the given sha""" return load_pack_index( os.path.join(self.datadir, 'pack-%s.idx' % sha.decode('ascii'))) def get_pack_data(self, sha): """Returns a PackData object from the datadir with the given sha""" return PackData( os.path.join( self.datadir, 'pack-%s.pack' % sha.decode('ascii'))) def get_pack(self, sha): return Pack( os.path.join(self.datadir, 'pack-%s' % sha.decode('ascii'))) def assertSucceeds(self, func, *args, **kwargs): try: func(*args, **kwargs) except ChecksumMismatch as e: self.fail(e) class PackIndexTests(PackTests): """Class that tests the index of packfiles""" def test_object_index(self): """Tests that the correct object offset is returned from the index.""" p = self.get_pack_index(pack1_sha) self.assertRaises(KeyError, p.object_index, pack1_sha) self.assertEqual(p.object_index(a_sha), 178) self.assertEqual(p.object_index(tree_sha), 138) self.assertEqual(p.object_index(commit_sha), 12) def test_object_sha1(self): """Tests that the correct object offset is returned from the index.""" p = self.get_pack_index(pack1_sha) self.assertRaises(KeyError, p.object_sha1, 876) self.assertEqual(p.object_sha1(178), hex_to_sha(a_sha)) self.assertEqual(p.object_sha1(138), hex_to_sha(tree_sha)) self.assertEqual(p.object_sha1(12), hex_to_sha(commit_sha)) def test_index_len(self): p = self.get_pack_index(pack1_sha) self.assertEqual(3, len(p)) def test_get_stored_checksum(self): p = self.get_pack_index(pack1_sha) self.assertEqual(b'f2848e2ad16f329ae1c92e3b95e91888daa5bd01', sha_to_hex(p.get_stored_checksum())) self.assertEqual(b'721980e866af9a5f93ad674144e1459b8ba3e7b7', sha_to_hex(p.get_pack_checksum())) def test_index_check(self): p = self.get_pack_index(pack1_sha) self.assertSucceeds(p.check) def test_iterentries(self): p = self.get_pack_index(pack1_sha) entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()] self.assertEqual([ (b'6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None), (b'b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None), (b'f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None) ], entries) def test_iter(self): p = self.get_pack_index(pack1_sha) self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p)) class TestPackDeltas(TestCase): test_string1 = b'The answer was flailing in the wind' test_string2 = b'The answer was falling down the pipe' test_string3 = b'zzzzz' test_string_empty = b'' test_string_big = b'Z' * 8192 test_string_huge = b'Z' * 100000 def _test_roundtrip(self, base, target): self.assertEqual( target, b''.join(apply_delta(base, create_delta(base, target)))) def test_nochange(self): self._test_roundtrip(self.test_string1, self.test_string1) def test_nochange_huge(self): self._test_roundtrip(self.test_string_huge, self.test_string_huge) def test_change(self): self._test_roundtrip(self.test_string1, self.test_string2) def test_rewrite(self): self._test_roundtrip(self.test_string1, self.test_string3) def test_empty_to_big(self): self._test_roundtrip(self.test_string_empty, self.test_string_big) def test_empty_to_huge(self): self._test_roundtrip(self.test_string_empty, self.test_string_huge) def test_huge_copy(self): self._test_roundtrip(self.test_string_huge + self.test_string1, self.test_string_huge + self.test_string2) def test_dest_overflow(self): self.assertRaises(ApplyDeltaError, apply_delta, b'a'*0x10000, b'\x80\x80\x04\x80\x80\x04\x80' + b'a'*0x10000) self.assertRaises( ApplyDeltaError, apply_delta, b'', b'\x00\x80\x02\xb0\x11\x11') def test_pypy_issue(self): # Test for https://github.com/jelmer/dulwich/issues/509 / # https://bitbucket.org/pypy/pypy/issues/2499/cpyext-pystring_asstring-doesnt-work chunks = [ b'tree 03207ccf58880a748188836155ceed72f03d65d6\n' b'parent 408fbab530fd4abe49249a636a10f10f44d07a21\n' b'author Victor Stinner ' b'1421355207 +0100\n' b'committer Victor Stinner ' b'1421355207 +0100\n' b'\n' b'Backout changeset 3a06020af8cf\n' b'\nStreamWriter: close() now clears the reference to the ' b'transport\n' b'\nStreamWriter now raises an exception if it is closed: ' b'write(), writelines(),\n' b'write_eof(), can_write_eof(), get_extra_info(), drain().\n'] delta = [ b'\xcd\x03\xad\x03]tree ff3c181a393d5a7270cddc01ea863818a8621ca8\n' b'parent 20a103cc90135494162e819f98d0edfc1f1fba6b\x91]7\x0510738' b'\x91\x99@\x0b10738 +0100\x93\x04\x01\xc9'] res = apply_delta(chunks, delta) expected = [ b'tree ff3c181a393d5a7270cddc01ea863818a8621ca8\n' b'parent 20a103cc90135494162e819f98d0edfc1f1fba6b', b'\nauthor Victor Stinner 14213', b'10738', b' +0100\ncommitter Victor Stinner ' b'14213', b'10738 +0100', b'\n\nStreamWriter: close() now clears the reference to the ' b'transport\n\n' b'StreamWriter now raises an exception if it is closed: ' b'write(), writelines(),\n' b'write_eof(), can_write_eof(), get_extra_info(), drain().\n'] self.assertEqual(b''.join(expected), b''.join(res)) class TestPackData(PackTests): """Tests getting the data from the packfile.""" def test_create_pack(self): self.get_pack_data(pack1_sha).close() def test_from_file(self): path = os.path.join(self.datadir, 'pack-%s.pack' % pack1_sha.decode('ascii')) with open(path, 'rb') as f: PackData.from_file(f, os.path.getsize(path)) def test_pack_len(self): with self.get_pack_data(pack1_sha) as p: self.assertEqual(3, len(p)) def test_index_check(self): with self.get_pack_data(pack1_sha) as p: self.assertSucceeds(p.check) def test_iterobjects(self): with self.get_pack_data(pack1_sha) as p: commit_data = ( b'tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n' b'author James Westby ' b'1174945067 +0100\n' b'committer James Westby ' b'1174945067 +0100\n' b'\n' b'Test commit\n') blob_sha = b'6f670c0fb53f9463760b7295fbb814e965fb20c8' tree_data = b'100644 a\0' + hex_to_sha(blob_sha) actual = [] for offset, type_num, chunks, crc32 in p.iterobjects(): actual.append((offset, type_num, b''.join(chunks), crc32)) self.assertEqual([ (12, 1, commit_data, 3775879613), (138, 2, tree_data, 912998690), (178, 3, b'test 1\n', 1373561701) ], actual) def test_iterentries(self): with self.get_pack_data(pack1_sha) as p: entries = set((sha_to_hex(s), o, c) for s, o, c in p.iterentries()) self.assertEqual(set([ (b'6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701), (b'b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690), (b'f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613), ]), entries) def test_create_index_v1(self): with self.get_pack_data(pack1_sha) as p: filename = os.path.join(self.tempdir, 'v1test.idx') p.create_index_v1(filename) idx1 = load_pack_index(filename) idx2 = self.get_pack_index(pack1_sha) self.assertEqual(idx1, idx2) def test_create_index_v2(self): with self.get_pack_data(pack1_sha) as p: filename = os.path.join(self.tempdir, 'v2test.idx') p.create_index_v2(filename) idx1 = load_pack_index(filename) idx2 = self.get_pack_index(pack1_sha) self.assertEqual(idx1, idx2) def test_compute_file_sha(self): f = BytesIO(b'abcd1234wxyz') self.assertEqual(sha1(b'abcd1234wxyz').hexdigest(), compute_file_sha(f).hexdigest()) self.assertEqual(sha1(b'abcd1234wxyz').hexdigest(), compute_file_sha(f, buffer_size=5).hexdigest()) self.assertEqual(sha1(b'abcd1234').hexdigest(), compute_file_sha(f, end_ofs=-4).hexdigest()) self.assertEqual(sha1(b'1234wxyz').hexdigest(), compute_file_sha(f, start_ofs=4).hexdigest()) self.assertEqual( sha1(b'1234').hexdigest(), compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest()) def test_compute_file_sha_short_file(self): f = BytesIO(b'abcd1234wxyz') self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=-20) self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=20) self.assertRaises(AssertionError, compute_file_sha, f, start_ofs=10, end_ofs=-12) class TestPack(PackTests): def test_len(self): with self.get_pack(pack1_sha) as p: self.assertEqual(3, len(p)) def test_contains(self): with self.get_pack(pack1_sha) as p: self.assertTrue(tree_sha in p) def test_get(self): with self.get_pack(pack1_sha) as p: self.assertEqual(type(p[tree_sha]), Tree) def test_iter(self): with self.get_pack(pack1_sha) as p: self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p)) def test_iterobjects(self): with self.get_pack(pack1_sha) as p: expected = set([p[s] for s in [commit_sha, tree_sha, a_sha]]) self.assertEqual(expected, set(list(p.iterobjects()))) def test_pack_tuples(self): with self.get_pack(pack1_sha) as p: tuples = p.pack_tuples() expected = set( [(p[s], None) for s in [commit_sha, tree_sha, a_sha]]) self.assertEqual(expected, set(list(tuples))) self.assertEqual(expected, set(list(tuples))) self.assertEqual(3, len(tuples)) def test_get_object_at(self): """Tests random access for non-delta objects""" with self.get_pack(pack1_sha) as p: obj = p[a_sha] self.assertEqual(obj.type_name, b'blob') self.assertEqual(obj.sha().hexdigest().encode('ascii'), a_sha) obj = p[tree_sha] self.assertEqual(obj.type_name, b'tree') self.assertEqual(obj.sha().hexdigest().encode('ascii'), tree_sha) obj = p[commit_sha] self.assertEqual(obj.type_name, b'commit') self.assertEqual(obj.sha().hexdigest().encode('ascii'), commit_sha) def test_copy(self): with self.get_pack(pack1_sha) as origpack: self.assertSucceeds(origpack.index.check) basename = os.path.join(self.tempdir, 'Elch') write_pack(basename, origpack.pack_tuples()) with Pack(basename) as newpack: self.assertEqual(origpack, newpack) self.assertSucceeds(newpack.index.check) self.assertEqual(origpack.name(), newpack.name()) self.assertEqual(origpack.index.get_pack_checksum(), newpack.index.get_pack_checksum()) wrong_version = origpack.index.version != newpack.index.version orig_checksum = origpack.index.get_stored_checksum() new_checksum = newpack.index.get_stored_checksum() self.assertTrue(wrong_version or orig_checksum == new_checksum) def test_commit_obj(self): with self.get_pack(pack1_sha) as p: commit = p[commit_sha] self.assertEqual(b'James Westby ', commit.author) self.assertEqual([], commit.parents) def _copy_pack(self, origpack): basename = os.path.join(self.tempdir, 'somepack') write_pack(basename, origpack.pack_tuples()) return Pack(basename) def test_keep_no_message(self): with self.get_pack(pack1_sha) as p: p = self._copy_pack(p) with p: keepfile_name = p.keep() # file should exist self.assertTrue(os.path.exists(keepfile_name)) with open(keepfile_name, 'r') as f: buf = f.read() self.assertEqual('', buf) def test_keep_message(self): with self.get_pack(pack1_sha) as p: p = self._copy_pack(p) msg = b'some message' with p: keepfile_name = p.keep(msg) # file should exist self.assertTrue(os.path.exists(keepfile_name)) # and contain the right message, with a linefeed with open(keepfile_name, 'rb') as f: buf = f.read() self.assertEqual(msg + b'\n', buf) def test_name(self): with self.get_pack(pack1_sha) as p: self.assertEqual(pack1_sha, p.name()) def test_length_mismatch(self): with self.get_pack_data(pack1_sha) as data: index = self.get_pack_index(pack1_sha) Pack.from_objects(data, index).check_length_and_checksum() data._file.seek(12) bad_file = BytesIO() write_pack_header(bad_file, 9999) bad_file.write(data._file.read()) bad_file = BytesIO(bad_file.getvalue()) bad_data = PackData('', file=bad_file) bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index) self.assertRaises(AssertionError, lambda: bad_pack.data) self.assertRaises(AssertionError, lambda: bad_pack.check_length_and_checksum()) def test_checksum_mismatch(self): with self.get_pack_data(pack1_sha) as data: index = self.get_pack_index(pack1_sha) Pack.from_objects(data, index).check_length_and_checksum() data._file.seek(0) bad_file = BytesIO(data._file.read()[:-20] + (b'\xff' * 20)) bad_data = PackData('', file=bad_file) bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index) self.assertRaises(ChecksumMismatch, lambda: bad_pack.data) self.assertRaises(ChecksumMismatch, lambda: bad_pack.check_length_and_checksum()) def test_iterobjects_2(self): with self.get_pack(pack1_sha) as p: objs = dict((o.id, o) for o in p.iterobjects()) self.assertEqual(3, len(objs)) self.assertEqual(sorted(objs), sorted(p.index)) self.assertTrue(isinstance(objs[a_sha], Blob)) self.assertTrue(isinstance(objs[tree_sha], Tree)) self.assertTrue(isinstance(objs[commit_sha], Commit)) class TestThinPack(PackTests): def setUp(self): super(TestThinPack, self).setUp() self.store = MemoryObjectStore() self.blobs = {} for blob in (b'foo', b'bar', b'foo1234', b'bar2468'): self.blobs[blob] = make_object(Blob, data=blob) self.store.add_object(self.blobs[b'foo']) self.store.add_object(self.blobs[b'bar']) # Build a thin pack. 'foo' is as an external reference, 'bar' an # internal reference. self.pack_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.pack_dir) self.pack_prefix = os.path.join(self.pack_dir, 'pack') with open(self.pack_prefix + '.pack', 'wb') as f: build_pack(f, [ (REF_DELTA, (self.blobs[b'foo'].id, b'foo1234')), (Blob.type_num, b'bar'), (REF_DELTA, (self.blobs[b'bar'].id, b'bar2468'))], store=self.store) # Index the new pack. with self.make_pack(True) as pack: with PackData(pack._data_path) as data: data.pack = pack data.create_index(self.pack_prefix + '.idx') del self.store[self.blobs[b'bar'].id] def make_pack(self, resolve_ext_ref): return Pack( self.pack_prefix, resolve_ext_ref=self.store.get_raw if resolve_ext_ref else None) def test_get_raw(self): with self.make_pack(False) as p: self.assertRaises( KeyError, p.get_raw, self.blobs[b'foo1234'].id) with self.make_pack(True) as p: self.assertEqual( (3, b'foo1234'), p.get_raw(self.blobs[b'foo1234'].id)) def test_get_raw_unresolved(self): with self.make_pack(False) as p: self.assertEqual( - (7, b'\x19\x10(\x15f=#\xf8\xb7ZG\xe7\xa0\x19e\xdc\xdc\x96F\x8c', - [b'x\x9ccf\x9f\xc0\xccbhdl\x02\x00\x06f\x01l']), + (7, + b'\x19\x10(\x15f=#\xf8\xb7ZG\xe7\xa0\x19e\xdc\xdc\x96F\x8c', + [b'x\x9ccf\x9f\xc0\xccbhdl\x02\x00\x06f\x01l']), p.get_raw_unresolved(self.blobs[b'foo1234'].id)) with self.make_pack(True) as p: self.assertEqual( - (7, b'\x19\x10(\x15f=#\xf8\xb7ZG\xe7\xa0\x19e\xdc\xdc\x96F\x8c', - [b'x\x9ccf\x9f\xc0\xccbhdl\x02\x00\x06f\x01l']), + (7, + b'\x19\x10(\x15f=#\xf8\xb7ZG\xe7\xa0\x19e\xdc\xdc\x96F\x8c', + [b'x\x9ccf\x9f\xc0\xccbhdl\x02\x00\x06f\x01l']), p.get_raw_unresolved(self.blobs[b'foo1234'].id)) def test_iterobjects(self): with self.make_pack(False) as p: self.assertRaises(KeyError, list, p.iterobjects()) with self.make_pack(True) as p: self.assertEqual( sorted([self.blobs[b'foo1234'].id, self.blobs[b'bar'].id, self.blobs[b'bar2468'].id]), sorted(o.id for o in p.iterobjects())) class WritePackTests(TestCase): def test_write_pack_header(self): f = BytesIO() write_pack_header(f, 42) self.assertEqual(b'PACK\x00\x00\x00\x02\x00\x00\x00*', f.getvalue()) def test_write_pack_object(self): f = BytesIO() f.write(b'header') offset = f.tell() crc32 = write_pack_object(f, Blob.type_num, b'blob') self.assertEqual(crc32, zlib.crc32(f.getvalue()[6:]) & 0xffffffff) f.write(b'x') # unpack_object needs extra trailing data. f.seek(offset) unpacked, unused = unpack_object(f.read, compute_crc32=True) self.assertEqual(Blob.type_num, unpacked.pack_type_num) self.assertEqual(Blob.type_num, unpacked.obj_type_num) self.assertEqual([b'blob'], unpacked.decomp_chunks) self.assertEqual(crc32, unpacked.crc32) self.assertEqual(b'x', unused) def test_write_pack_object_sha(self): f = BytesIO() f.write(b'header') offset = f.tell() sha_a = sha1(b'foo') sha_b = sha_a.copy() write_pack_object(f, Blob.type_num, b'blob', sha=sha_a) self.assertNotEqual(sha_a.digest(), sha_b.digest()) sha_b.update(f.getvalue()[offset:]) self.assertEqual(sha_a.digest(), sha_b.digest()) pack_checksum = hex_to_sha('721980e866af9a5f93ad674144e1459b8ba3e7b7') class BaseTestPackIndexWriting(object): def assertSucceeds(self, func, *args, **kwargs): try: func(*args, **kwargs) except ChecksumMismatch as e: self.fail(e) def index(self, filename, entries, pack_checksum): raise NotImplementedError(self.index) def test_empty(self): idx = self.index('empty.idx', [], pack_checksum) self.assertEqual(idx.get_pack_checksum(), pack_checksum) self.assertEqual(0, len(idx)) def test_large(self): entry1_sha = hex_to_sha('4e6388232ec39792661e2e75db8fb117fc869ce6') entry2_sha = hex_to_sha('e98f071751bd77f59967bfa671cd2caebdccc9a2') entries = [(entry1_sha, 0xf2972d0830529b87, 24), (entry2_sha, (~0xf2972d0830529b87) & (2 ** 64 - 1), 92)] if not self._supports_large: self.assertRaises(TypeError, self.index, 'single.idx', entries, pack_checksum) return idx = self.index('single.idx', entries, pack_checksum) self.assertEqual(idx.get_pack_checksum(), pack_checksum) self.assertEqual(2, len(idx)) actual_entries = list(idx.iterentries()) self.assertEqual(len(entries), len(actual_entries)) for mine, actual in zip(entries, actual_entries): my_sha, my_offset, my_crc = mine actual_sha, actual_offset, actual_crc = actual self.assertEqual(my_sha, actual_sha) self.assertEqual(my_offset, actual_offset) if self._has_crc32_checksum: self.assertEqual(my_crc, actual_crc) else: self.assertTrue(actual_crc is None) def test_single(self): entry_sha = hex_to_sha('6f670c0fb53f9463760b7295fbb814e965fb20c8') my_entries = [(entry_sha, 178, 42)] idx = self.index('single.idx', my_entries, pack_checksum) self.assertEqual(idx.get_pack_checksum(), pack_checksum) self.assertEqual(1, len(idx)) actual_entries = list(idx.iterentries()) self.assertEqual(len(my_entries), len(actual_entries)) for mine, actual in zip(my_entries, actual_entries): my_sha, my_offset, my_crc = mine actual_sha, actual_offset, actual_crc = actual self.assertEqual(my_sha, actual_sha) self.assertEqual(my_offset, actual_offset) if self._has_crc32_checksum: self.assertEqual(my_crc, actual_crc) else: self.assertTrue(actual_crc is None) class BaseTestFilePackIndexWriting(BaseTestPackIndexWriting): def setUp(self): self.tempdir = tempfile.mkdtemp() def tearDown(self): shutil.rmtree(self.tempdir) def index(self, filename, entries, pack_checksum): path = os.path.join(self.tempdir, filename) self.writeIndex(path, entries, pack_checksum) idx = load_pack_index(path) self.assertSucceeds(idx.check) self.assertEqual(idx.version, self._expected_version) return idx def writeIndex(self, filename, entries, pack_checksum): # FIXME: Write to BytesIO instead rather than hitting disk ? with GitFile(filename, "wb") as f: self._write_fn(f, entries, pack_checksum) class TestMemoryIndexWriting(TestCase, BaseTestPackIndexWriting): def setUp(self): TestCase.setUp(self) self._has_crc32_checksum = True self._supports_large = True def index(self, filename, entries, pack_checksum): return MemoryPackIndex(entries, pack_checksum) def tearDown(self): TestCase.tearDown(self) class TestPackIndexWritingv1(TestCase, BaseTestFilePackIndexWriting): def setUp(self): TestCase.setUp(self) BaseTestFilePackIndexWriting.setUp(self) self._has_crc32_checksum = False self._expected_version = 1 self._supports_large = False self._write_fn = write_pack_index_v1 def tearDown(self): TestCase.tearDown(self) BaseTestFilePackIndexWriting.tearDown(self) class TestPackIndexWritingv2(TestCase, BaseTestFilePackIndexWriting): def setUp(self): TestCase.setUp(self) BaseTestFilePackIndexWriting.setUp(self) self._has_crc32_checksum = True self._supports_large = True self._expected_version = 2 self._write_fn = write_pack_index_v2 def tearDown(self): TestCase.tearDown(self) BaseTestFilePackIndexWriting.tearDown(self) class ReadZlibTests(TestCase): decomp = ( b'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n' b'parent None\n' b'author Jelmer Vernooij 1228980214 +0000\n' b'committer Jelmer Vernooij 1228980214 +0000\n' b'\n' b"Provide replacement for mmap()'s offset argument.") comp = zlib.compress(decomp) extra = b'nextobject' def setUp(self): super(ReadZlibTests, self).setUp() self.read = BytesIO(self.comp + self.extra).read self.unpacked = UnpackedObject( Tree.type_num, None, len(self.decomp), 0) def test_decompress_size(self): good_decomp_len = len(self.decomp) self.unpacked.decomp_len = -1 self.assertRaises(ValueError, read_zlib_chunks, self.read, self.unpacked) self.unpacked.decomp_len = good_decomp_len - 1 self.assertRaises(zlib.error, read_zlib_chunks, self.read, self.unpacked) self.unpacked.decomp_len = good_decomp_len + 1 self.assertRaises(zlib.error, read_zlib_chunks, self.read, self.unpacked) def test_decompress_truncated(self): read = BytesIO(self.comp[:10]).read self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked) read = BytesIO(self.comp).read self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked) def test_decompress_empty(self): unpacked = UnpackedObject(Tree.type_num, None, 0, None) comp = zlib.compress(b'') read = BytesIO(comp + self.extra).read unused = read_zlib_chunks(read, unpacked) self.assertEqual(b'', b''.join(unpacked.decomp_chunks)) self.assertNotEqual(b'', unused) self.assertEqual(self.extra, unused + read()) def test_decompress_no_crc32(self): self.unpacked.crc32 = None read_zlib_chunks(self.read, self.unpacked) self.assertEqual(None, self.unpacked.crc32) def _do_decompress_test(self, buffer_size, **kwargs): unused = read_zlib_chunks(self.read, self.unpacked, buffer_size=buffer_size, **kwargs) self.assertEqual(self.decomp, b''.join(self.unpacked.decomp_chunks)) self.assertEqual(zlib.crc32(self.comp), self.unpacked.crc32) self.assertNotEqual(b'', unused) self.assertEqual(self.extra, unused + self.read()) def test_simple_decompress(self): self._do_decompress_test(4096) self.assertEqual(None, self.unpacked.comp_chunks) # These buffer sizes are not intended to be realistic, but rather simulate # larger buffer sizes that may end at various places. def test_decompress_buffer_size_1(self): self._do_decompress_test(1) def test_decompress_buffer_size_2(self): self._do_decompress_test(2) def test_decompress_buffer_size_3(self): self._do_decompress_test(3) def test_decompress_buffer_size_4(self): self._do_decompress_test(4) def test_decompress_include_comp(self): self._do_decompress_test(4096, include_comp=True) self.assertEqual(self.comp, b''.join(self.unpacked.comp_chunks)) class DeltifyTests(TestCase): def test_empty(self): self.assertEqual([], list(deltify_pack_objects([]))) def test_single(self): b = Blob.from_string(b"foo") self.assertEqual( [(b.type_num, b.sha().digest(), None, b.as_raw_string())], list(deltify_pack_objects([(b, b"")]))) def test_simple_delta(self): b1 = Blob.from_string(b"a" * 101) b2 = Blob.from_string(b"a" * 100) delta = create_delta(b1.as_raw_string(), b2.as_raw_string()) self.assertEqual([ (b1.type_num, b1.sha().digest(), None, b1.as_raw_string()), (b2.type_num, b2.sha().digest(), b1.sha().digest(), delta) ], list(deltify_pack_objects([(b1, b""), (b2, b"")]))) class TestPackStreamReader(TestCase): def test_read_objects_emtpy(self): f = BytesIO() build_pack(f, []) reader = PackStreamReader(f.read) self.assertEqual(0, len(list(reader.read_objects()))) def test_read_objects(self): f = BytesIO() entries = build_pack(f, [ (Blob.type_num, b'blob'), (OFS_DELTA, (0, b'blob1')), ]) reader = PackStreamReader(f.read) objects = list(reader.read_objects(compute_crc32=True)) self.assertEqual(2, len(objects)) unpacked_blob, unpacked_delta = objects self.assertEqual(entries[0][0], unpacked_blob.offset) self.assertEqual(Blob.type_num, unpacked_blob.pack_type_num) self.assertEqual(Blob.type_num, unpacked_blob.obj_type_num) self.assertEqual(None, unpacked_blob.delta_base) self.assertEqual(b'blob', b''.join(unpacked_blob.decomp_chunks)) self.assertEqual(entries[0][4], unpacked_blob.crc32) self.assertEqual(entries[1][0], unpacked_delta.offset) self.assertEqual(OFS_DELTA, unpacked_delta.pack_type_num) self.assertEqual(None, unpacked_delta.obj_type_num) self.assertEqual(unpacked_delta.offset - unpacked_blob.offset, unpacked_delta.delta_base) delta = create_delta(b'blob', b'blob1') self.assertEqual(delta, b''.join(unpacked_delta.decomp_chunks)) self.assertEqual(entries[1][4], unpacked_delta.crc32) def test_read_objects_buffered(self): f = BytesIO() build_pack(f, [ (Blob.type_num, b'blob'), (OFS_DELTA, (0, b'blob1')), ]) reader = PackStreamReader(f.read, zlib_bufsize=4) self.assertEqual(2, len(list(reader.read_objects()))) def test_read_objects_empty(self): reader = PackStreamReader(BytesIO().read) self.assertEqual([], list(reader.read_objects())) class TestPackIterator(DeltaChainIterator): _compute_crc32 = True def __init__(self, *args, **kwargs): super(TestPackIterator, self).__init__(*args, **kwargs) self._unpacked_offsets = set() def _result(self, unpacked): """Return entries in the same format as build_pack.""" return (unpacked.offset, unpacked.obj_type_num, b''.join(unpacked.obj_chunks), unpacked.sha(), unpacked.crc32) def _resolve_object(self, offset, pack_type_num, base_chunks): assert offset not in self._unpacked_offsets, ( 'Attempted to re-inflate offset %i' % offset) self._unpacked_offsets.add(offset) return super(TestPackIterator, self)._resolve_object( offset, pack_type_num, base_chunks) class DeltaChainIteratorTests(TestCase): def setUp(self): super(DeltaChainIteratorTests, self).setUp() self.store = MemoryObjectStore() self.fetched = set() def store_blobs(self, blobs_data): blobs = [] for data in blobs_data: blob = make_object(Blob, data=data) blobs.append(blob) self.store.add_object(blob) return blobs def get_raw_no_repeat(self, bin_sha): """Wrapper around store.get_raw that doesn't allow repeat lookups.""" hex_sha = sha_to_hex(bin_sha) self.assertFalse(hex_sha in self.fetched, 'Attempted to re-fetch object %s' % hex_sha) self.fetched.add(hex_sha) return self.store.get_raw(hex_sha) def make_pack_iter(self, f, thin=None): if thin is None: thin = bool(list(self.store)) resolve_ext_ref = thin and self.get_raw_no_repeat or None data = PackData('test.pack', file=f) return TestPackIterator.for_pack_data( data, resolve_ext_ref=resolve_ext_ref) def assertEntriesMatch(self, expected_indexes, entries, pack_iter): expected = [entries[i] for i in expected_indexes] self.assertEqual(expected, list(pack_iter._walk_all_chains())) def test_no_deltas(self): f = BytesIO() entries = build_pack(f, [ (Commit.type_num, b'commit'), (Blob.type_num, b'blob'), (Tree.type_num, b'tree'), ]) self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f)) def test_ofs_deltas(self): f = BytesIO() entries = build_pack(f, [ (Blob.type_num, b'blob'), (OFS_DELTA, (0, b'blob1')), (OFS_DELTA, (0, b'blob2')), ]) self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f)) def test_ofs_deltas_chain(self): f = BytesIO() entries = build_pack(f, [ (Blob.type_num, b'blob'), (OFS_DELTA, (0, b'blob1')), (OFS_DELTA, (1, b'blob2')), ]) self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f)) def test_ref_deltas(self): f = BytesIO() entries = build_pack(f, [ (REF_DELTA, (1, b'blob1')), (Blob.type_num, (b'blob')), (REF_DELTA, (1, b'blob2')), ]) self.assertEntriesMatch([1, 0, 2], entries, self.make_pack_iter(f)) def test_ref_deltas_chain(self): f = BytesIO() entries = build_pack(f, [ (REF_DELTA, (2, b'blob1')), (Blob.type_num, (b'blob')), (REF_DELTA, (1, b'blob2')), ]) self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f)) def test_ofs_and_ref_deltas(self): # Deltas pending on this offset are popped before deltas depending on # this ref. f = BytesIO() entries = build_pack(f, [ (REF_DELTA, (1, b'blob1')), (Blob.type_num, (b'blob')), (OFS_DELTA, (1, b'blob2')), ]) self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f)) def test_mixed_chain(self): f = BytesIO() entries = build_pack(f, [ (Blob.type_num, b'blob'), (REF_DELTA, (2, b'blob2')), (OFS_DELTA, (0, b'blob1')), (OFS_DELTA, (1, b'blob3')), (OFS_DELTA, (0, b'bob')), ]) self.assertEntriesMatch([0, 2, 4, 1, 3], entries, self.make_pack_iter(f)) def test_long_chain(self): n = 100 objects_spec = [(Blob.type_num, b'blob')] for i in range(n): objects_spec.append( (OFS_DELTA, (i, b'blob' + str(i).encode('ascii')))) f = BytesIO() entries = build_pack(f, objects_spec) self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f)) def test_branchy_chain(self): n = 100 objects_spec = [(Blob.type_num, b'blob')] for i in range(n): objects_spec.append( (OFS_DELTA, (0, b'blob' + str(i).encode('ascii')))) f = BytesIO() entries = build_pack(f, objects_spec) self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f)) def test_ext_ref(self): blob, = self.store_blobs([b'blob']) f = BytesIO() entries = build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))], store=self.store) pack_iter = self.make_pack_iter(f) self.assertEntriesMatch([0], entries, pack_iter) self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs()) def test_ext_ref_chain(self): blob, = self.store_blobs([b'blob']) f = BytesIO() entries = build_pack(f, [ (REF_DELTA, (1, b'blob2')), (REF_DELTA, (blob.id, b'blob1')), ], store=self.store) pack_iter = self.make_pack_iter(f) self.assertEntriesMatch([1, 0], entries, pack_iter) self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs()) def test_ext_ref_chain_degenerate(self): # Test a degenerate case where the sender is sending a REF_DELTA # object that expands to an object already in the repository. blob, = self.store_blobs([b'blob']) blob2, = self.store_blobs([b'blob2']) assert blob.id < blob2.id f = BytesIO() entries = build_pack(f, [ (REF_DELTA, (blob.id, b'blob2')), (REF_DELTA, (0, b'blob3')), ], store=self.store) pack_iter = self.make_pack_iter(f) self.assertEntriesMatch([0, 1], entries, pack_iter) self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs()) def test_ext_ref_multiple_times(self): blob, = self.store_blobs([b'blob']) f = BytesIO() entries = build_pack(f, [ (REF_DELTA, (blob.id, b'blob1')), (REF_DELTA, (blob.id, b'blob2')), ], store=self.store) pack_iter = self.make_pack_iter(f) self.assertEntriesMatch([0, 1], entries, pack_iter) self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs()) def test_multiple_ext_refs(self): b1, b2 = self.store_blobs([b'foo', b'bar']) f = BytesIO() entries = build_pack(f, [ (REF_DELTA, (b1.id, b'foo1')), (REF_DELTA, (b2.id, b'bar2')), ], store=self.store) pack_iter = self.make_pack_iter(f) self.assertEntriesMatch([0, 1], entries, pack_iter) self.assertEqual([hex_to_sha(b1.id), hex_to_sha(b2.id)], pack_iter.ext_refs()) def test_bad_ext_ref_non_thin_pack(self): blob, = self.store_blobs([b'blob']) f = BytesIO() build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))], store=self.store) pack_iter = self.make_pack_iter(f, thin=False) try: list(pack_iter._walk_all_chains()) self.fail() except KeyError as e: self.assertEqual(([blob.id],), e.args) def test_bad_ext_ref_thin_pack(self): b1, b2, b3 = self.store_blobs([b'foo', b'bar', b'baz']) f = BytesIO() build_pack(f, [ (REF_DELTA, (1, b'foo99')), (REF_DELTA, (b1.id, b'foo1')), (REF_DELTA, (b2.id, b'bar2')), (REF_DELTA, (b3.id, b'baz3')), ], store=self.store) del self.store[b2.id] del self.store[b3.id] pack_iter = self.make_pack_iter(f) try: list(pack_iter._walk_all_chains()) self.fail() except KeyError as e: self.assertEqual((sorted([b2.id, b3.id]),), (sorted(e.args[0]),)) class DeltaEncodeSizeTests(TestCase): def test_basic(self): self.assertEqual(b'\x00', _delta_encode_size(0)) self.assertEqual(b'\x01', _delta_encode_size(1)) self.assertEqual(b'\xfa\x01', _delta_encode_size(250)) self.assertEqual(b'\xe8\x07', _delta_encode_size(1000)) self.assertEqual(b'\xa0\x8d\x06', _delta_encode_size(100000)) class EncodeCopyOperationTests(TestCase): def test_basic(self): self.assertEqual(b'\x80', _encode_copy_operation(0, 0)) self.assertEqual(b'\x91\x01\x0a', _encode_copy_operation(1, 10)) self.assertEqual(b'\xb1\x64\xe8\x03', _encode_copy_operation(100, 1000)) self.assertEqual(b'\x93\xe8\x03\x01', _encode_copy_operation(1000, 1))