diff --git a/dulwich/index.py b/dulwich/index.py index ef9415af..9eb2e5dd 100644 --- a/dulwich/index.py +++ b/dulwich/index.py @@ -1,833 +1,844 @@ # index.py -- File parser/writer for the git index file # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Parser for the git index file format.""" import collections import os import stat import struct import sys from dulwich.file import GitFile from dulwich.objects import ( Blob, S_IFGITLINK, S_ISGITLINK, Tree, hex_to_sha, sha_to_hex, ) from dulwich.pack import ( SHA1Reader, SHA1Writer, ) IndexEntry = collections.namedtuple( 'IndexEntry', [ 'ctime', 'mtime', 'dev', 'ino', 'mode', 'uid', 'gid', 'size', 'sha', 'flags']) FLAG_STAGEMASK = 0x3000 FLAG_VALID = 0x8000 FLAG_EXTENDED = 0x4000 def pathsplit(path): """Split a /-delimited path into a directory part and a basename. Args: path: The path to split. Returns: Tuple with directory name and basename """ try: (dirname, basename) = path.rsplit(b"/", 1) except ValueError: return (b"", path) else: return (dirname, basename) def pathjoin(*args): """Join a /-delimited path. """ return b"/".join([p for p in args if p]) def read_cache_time(f): """Read a cache time. Args: f: File-like object to read from Returns: Tuple with seconds and nanoseconds """ return struct.unpack(">LL", f.read(8)) def write_cache_time(f, t): """Write a cache time. Args: f: File-like object to write to t: Time to write (as int, float or tuple with secs and nsecs) """ if isinstance(t, int): t = (t, 0) elif isinstance(t, float): (secs, nsecs) = divmod(t, 1.0) t = (int(secs), int(nsecs * 1000000000)) elif not isinstance(t, tuple): raise TypeError(t) f.write(struct.pack(">LL", *t)) def read_cache_entry(f): """Read an entry from a cache file. Args: f: File-like object to read from Returns: tuple with: device, inode, mode, uid, gid, size, sha, flags """ beginoffset = f.tell() ctime = read_cache_time(f) mtime = read_cache_time(f) (dev, ino, mode, uid, gid, size, sha, flags, ) = \ struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2)) name = f.read((flags & 0x0fff)) # Padding: real_size = ((f.tell() - beginoffset + 8) & ~7) f.read((beginoffset + real_size) - f.tell()) return (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha_to_hex(sha), flags & ~0x0fff) def write_cache_entry(f, entry): """Write an index entry to a file. Args: f: File object entry: Entry to write, tuple with: (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) """ beginoffset = f.tell() (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) = entry write_cache_time(f, ctime) write_cache_time(f, mtime) flags = len(name) | (flags & ~0x0fff) f.write(struct.pack( b'>LLLLLL20sH', dev & 0xFFFFFFFF, ino & 0xFFFFFFFF, mode, uid, gid, size, hex_to_sha(sha), flags)) f.write(name) real_size = ((f.tell() - beginoffset + 8) & ~7) f.write(b'\0' * ((beginoffset + real_size) - f.tell())) def read_index(f): """Read an index file, yielding the individual entries.""" header = f.read(4) if header != b'DIRC': raise AssertionError("Invalid index file header: %r" % header) (version, num_entries) = struct.unpack(b'>LL', f.read(4 * 2)) assert version in (1, 2) for i in range(num_entries): yield read_cache_entry(f) def read_index_dict(f): """Read an index file and return it as a dictionary. Args: f: File object to read from """ ret = {} for x in read_index(f): ret[x[0]] = IndexEntry(*x[1:]) return ret def write_index(f, entries): """Write an index file. Args: f: File-like object to write to entries: Iterable over the entries to write """ f.write(b'DIRC') f.write(struct.pack(b'>LL', 2, len(entries))) for x in entries: write_cache_entry(f, x) def write_index_dict(f, entries): """Write an index file based on the contents of a dictionary. """ entries_list = [] for name in sorted(entries): entries_list.append((name,) + tuple(entries[name])) write_index(f, entries_list) def cleanup_mode(mode): """Cleanup a mode value. This will return a mode that can be stored in a tree object. Args: mode: Mode to clean up. """ if stat.S_ISLNK(mode): return stat.S_IFLNK elif stat.S_ISDIR(mode): return stat.S_IFDIR elif S_ISGITLINK(mode): return S_IFGITLINK ret = stat.S_IFREG | 0o644 if mode & 0o100: ret |= 0o111 return ret class Index(object): """A Git Index file.""" def __init__(self, filename): """Open an index file. Args: filename: Path to the index file """ self._filename = filename self.clear() self.read() @property def path(self): return self._filename def __repr__(self): return "%s(%r)" % (self.__class__.__name__, self._filename) def write(self): """Write current contents of index to disk.""" f = GitFile(self._filename, 'wb') try: f = SHA1Writer(f) write_index_dict(f, self._byname) finally: f.close() def read(self): """Read current contents of index from disk.""" if not os.path.exists(self._filename): return f = GitFile(self._filename, 'rb') try: f = SHA1Reader(f) for x in read_index(f): self[x[0]] = IndexEntry(*x[1:]) # FIXME: Additional data? f.read(os.path.getsize(self._filename)-f.tell()-20) f.check_sha() finally: f.close() def __len__(self): """Number of entries in this index file.""" return len(self._byname) def __getitem__(self, name): """Retrieve entry by relative path. Returns: tuple with (ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) """ return self._byname[name] def __iter__(self): """Iterate over the paths in this index.""" return iter(self._byname) def get_sha1(self, path): """Return the (git object) SHA1 for the object at a path.""" return self[path].sha def get_mode(self, path): """Return the POSIX file mode for the object at a path.""" return self[path].mode def iterobjects(self): """Iterate over path, sha, mode tuples for use with commit_tree.""" for path in self: entry = self[path] yield path, entry.sha, cleanup_mode(entry.mode) def iterblobs(self): import warnings warnings.warn('Use iterobjects() instead.', PendingDeprecationWarning) return self.iterobjects() def clear(self): """Remove all contents from this index.""" self._byname = {} def __setitem__(self, name, x): assert isinstance(name, bytes) assert len(x) == 10 # Remove the old entry if any self._byname[name] = IndexEntry(*x) def __delitem__(self, name): assert isinstance(name, bytes) del self._byname[name] def iteritems(self): return self._byname.items() def items(self): return self._byname.items() def update(self, entries): for name, value in entries.items(): self[name] = value def changes_from_tree(self, object_store, tree, want_unchanged=False): """Find the differences between the contents of this index and a tree. Args: object_store: Object store to use for retrieving tree contents tree: SHA1 of the root tree want_unchanged: Whether unchanged files should be reported Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) """ def lookup_entry(path): entry = self[path] return entry.sha, cleanup_mode(entry.mode) for (name, mode, sha) in changes_from_tree( self._byname.keys(), lookup_entry, object_store, tree, want_unchanged=want_unchanged): yield (name, mode, sha) def commit(self, object_store): """Create a new tree from an index. Args: object_store: Object store to save the tree in Returns: Root tree SHA """ return commit_tree(object_store, self.iterobjects()) def commit_tree(object_store, blobs): """Commit a new tree. Args: object_store: Object store to add trees to blobs: Iterable over blob path, sha, mode entries Returns: SHA1 of the created tree. """ trees = {b'': {}} def add_tree(path): if path in trees: return trees[path] dirname, basename = pathsplit(path) t = add_tree(dirname) assert isinstance(basename, bytes) newtree = {} t[basename] = newtree trees[path] = newtree return newtree for path, sha, mode in blobs: tree_path, basename = pathsplit(path) tree = add_tree(tree_path) tree[basename] = (mode, sha) def build_tree(path): tree = Tree() for basename, entry in trees[path].items(): if isinstance(entry, dict): mode = stat.S_IFDIR sha = build_tree(pathjoin(path, basename)) else: (mode, sha) = entry tree.add(basename, mode, sha) object_store.add_object(tree) return tree.id return build_tree(b'') def commit_index(object_store, index): """Create a new tree from an index. Args: object_store: Object store to save the tree in index: Index file Note: This function is deprecated, use index.commit() instead. Returns: Root tree sha. """ return commit_tree(object_store, index.iterobjects()) def changes_from_tree(names, lookup_entry, object_store, tree, want_unchanged=False): """Find the differences between the contents of a tree and a working copy. Args: names: Iterable of names in the working copy lookup_entry: Function to lookup an entry in the working copy object_store: Object store to use for retrieving tree contents tree: SHA1 of the root tree, or None for an empty tree want_unchanged: Whether unchanged files should be reported Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) """ # TODO(jelmer): Support a include_trees option other_names = set(names) if tree is not None: for (name, mode, sha) in object_store.iter_tree_contents(tree): try: (other_sha, other_mode) = lookup_entry(name) except KeyError: # Was removed yield ((name, None), (mode, None), (sha, None)) else: other_names.remove(name) if (want_unchanged or other_sha != sha or other_mode != mode): yield ((name, name), (mode, other_mode), (sha, other_sha)) # Mention added files for name in other_names: try: (other_sha, other_mode) = lookup_entry(name) except KeyError: pass else: yield ((None, name), (None, other_mode), (None, other_sha)) def index_entry_from_stat(stat_val, hex_sha, flags, mode=None): """Create a new index entry from a stat value. Args: stat_val: POSIX stat_result instance hex_sha: Hex sha of the object flags: Index flags """ if mode is None: mode = cleanup_mode(stat_val.st_mode) return IndexEntry( stat_val.st_ctime, stat_val.st_mtime, stat_val.st_dev, stat_val.st_ino, mode, stat_val.st_uid, stat_val.st_gid, stat_val.st_size, hex_sha, flags) def build_file_from_blob(blob, mode, target_path, honor_filemode=True, tree_encoding='utf-8'): """Build a file or symlink on disk based on a Git object. Args: obj: The git object mode: File mode target_path: Path to write to honor_filemode: An optional flag to honor core.filemode setting in config file, default is core.filemode=True, change executable bit Returns: stat object for the file """ try: oldstat = os.lstat(target_path) except FileNotFoundError: oldstat = None contents = blob.as_raw_string() if stat.S_ISLNK(mode): # FIXME: This will fail on Windows. What should we do instead? if oldstat: os.unlink(target_path) if sys.platform == 'win32': # os.readlink on Python3 on Windows requires a unicode string. contents = contents.decode(tree_encoding) target_path = target_path.decode(tree_encoding) os.symlink(contents, target_path) else: if oldstat is not None and oldstat.st_size == len(contents): with open(target_path, 'rb') as f: if f.read() == contents: return oldstat with open(target_path, 'wb') as f: # Write out file f.write(contents) if honor_filemode: os.chmod(target_path, mode) return os.lstat(target_path) INVALID_DOTNAMES = (b".git", b".", b"..", b"") def validate_path_element_default(element): return element.lower() not in INVALID_DOTNAMES def validate_path_element_ntfs(element): stripped = element.rstrip(b". ").lower() if stripped in INVALID_DOTNAMES: return False if stripped == b"git~1": return False return True def validate_path(path, element_validator=validate_path_element_default): """Default path validator that just checks for .git/.""" parts = path.split(b"/") for p in parts: if not element_validator(p): return False else: return True def build_index_from_tree(root_path, index_path, object_store, tree_id, honor_filemode=True, validate_path_element=validate_path_element_default): """Generate and materialize index from a tree Args: tree_id: Tree to materialize root_path: Target dir for materialized index files index_path: Target path for generated index object_store: Non-empty object store holding tree contents honor_filemode: An optional flag to honor core.filemode setting in config file, default is core.filemode=True, change executable bit validate_path_element: Function to validate path elements to check out; default just refuses .git and .. directories. Note: existing index is wiped and contents are not merged in a working dir. Suitable only for fresh clones. """ index = Index(index_path) if not isinstance(root_path, bytes): root_path = os.fsencode(root_path) for entry in object_store.iter_tree_contents(tree_id): if not validate_path(entry.path, validate_path_element): continue full_path = _tree_to_fs_path(root_path, entry.path) if not os.path.exists(os.path.dirname(full_path)): os.makedirs(os.path.dirname(full_path)) # TODO(jelmer): Merge new index into working tree if S_ISGITLINK(entry.mode): if not os.path.isdir(full_path): os.mkdir(full_path) st = os.lstat(full_path) # TODO(jelmer): record and return submodule paths else: obj = object_store[entry.sha] - st = build_file_from_blob( - obj, entry.mode, full_path, honor_filemode=honor_filemode) + try: + st = build_file_from_blob( + obj, entry.mode, full_path, honor_filemode=honor_filemode) + except OSError as e: + if e.errno == 92 and sys.platform == 'darwin': + # Our filename isn't supported by the platform :( + import warnings + warnings.warn( + 'Unable to write ile %s: %s', full_path, e.strerror) + continue + else: + raise + # Add file to index if not honor_filemode or S_ISGITLINK(entry.mode): # we can not use tuple slicing to build a new tuple, # because on windows that will convert the times to # longs, which causes errors further along st_tuple = (entry.mode, st.st_ino, st.st_dev, st.st_nlink, st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime, st.st_ctime) st = st.__class__(st_tuple) index[entry.path] = index_entry_from_stat(st, entry.sha, 0) index.write() def blob_from_path_and_stat(fs_path, st, tree_encoding='utf-8'): """Create a blob from a path and a stat object. Args: fs_path: Full file system path to file st: A stat object Returns: A `Blob` object """ assert isinstance(fs_path, bytes) blob = Blob() if stat.S_ISLNK(st.st_mode): if sys.platform == 'win32': # os.readlink on Python3 on Windows requires a unicode string. fs_path = os.fsdecode(fs_path) blob.data = os.readlink(fs_path).encode(tree_encoding) else: blob.data = os.readlink(fs_path) else: with open(fs_path, 'rb') as f: blob.data = f.read() return blob def read_submodule_head(path): """Read the head commit of a submodule. Args: path: path to the submodule Returns: HEAD sha, None if not a valid head/repository """ from dulwich.errors import NotGitRepository from dulwich.repo import Repo # Repo currently expects a "str", so decode if necessary. # TODO(jelmer): Perhaps move this into Repo() ? if not isinstance(path, str): path = os.fsdecode(path) try: repo = Repo(path) except NotGitRepository: return None try: return repo.head() except KeyError: return None def _has_directory_changed(tree_path, entry): """Check if a directory has changed after getting an error. When handling an error trying to create a blob from a path, call this function. It will check if the path is a directory. If it's a directory and a submodule, check the submodule head to see if it's has changed. If not, consider the file as changed as Git tracked a file and not a directory. Return true if the given path should be considered as changed and False otherwise or if the path is not a directory. """ # This is actually a directory if os.path.exists(os.path.join(tree_path, b'.git')): # Submodule head = read_submodule_head(tree_path) if entry.sha != head: return True else: # The file was changed to a directory, so consider it removed. return True return False def get_unstaged_changes(index, root_path, filter_blob_callback=None): """Walk through an index and check for differences against working tree. Args: index: index to check root_path: path in which to find files Returns: iterator over paths with unstaged changes """ # For each entry in the index check the sha1 & ensure not staged if not isinstance(root_path, bytes): root_path = os.fsencode(root_path) for tree_path, entry in index.iteritems(): full_path = _tree_to_fs_path(root_path, tree_path) try: st = os.lstat(full_path) if stat.S_ISDIR(st.st_mode): if _has_directory_changed(tree_path, entry): yield tree_path continue if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): continue blob = blob_from_path_and_stat(full_path, st) if filter_blob_callback is not None: blob = filter_blob_callback(blob, tree_path) except FileNotFoundError: # The file was removed, so we assume that counts as # different from whatever file used to exist. yield tree_path else: if blob.id != entry.sha: yield tree_path os_sep_bytes = os.sep.encode('ascii') def _tree_to_fs_path(root_path, tree_path): """Convert a git tree path to a file system path. Args: root_path: Root filesystem path tree_path: Git tree path as bytes Returns: File system path. """ assert isinstance(tree_path, bytes) if os_sep_bytes != b'/': sep_corrected_path = tree_path.replace(b'/', os_sep_bytes) else: sep_corrected_path = tree_path return os.path.join(root_path, sep_corrected_path) def _fs_to_tree_path(fs_path, fs_encoding=None): """Convert a file system path to a git tree path. Args: fs_path: File system path. fs_encoding: File system encoding Returns: Git tree path as bytes """ if not isinstance(fs_path, bytes): fs_path_bytes = fs_path.encode(fs_encoding) else: fs_path_bytes = fs_path if os_sep_bytes != b'/': tree_path = fs_path_bytes.replace(os_sep_bytes, b'/') else: tree_path = fs_path_bytes return tree_path def index_entry_from_path(path, object_store=None): """Create an index from a filesystem path. This returns an index value for files, symlinks and tree references. for directories and non-existant files it returns None Args: path: Path to create an index entry for object_store: Optional object store to save new blobs in Returns: An index entry; None for directories """ assert isinstance(path, bytes) st = os.lstat(path) if stat.S_ISDIR(st.st_mode): if os.path.exists(os.path.join(path, b'.git')): head = read_submodule_head(path) if head is None: return None return index_entry_from_stat( st, head, 0, mode=S_IFGITLINK) return None if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): blob = blob_from_path_and_stat(path, st) if object_store is not None: object_store.add_object(blob) return index_entry_from_stat(st, blob.id, 0) return None def iter_fresh_entries(paths, root_path, object_store=None): """Iterate over current versions of index entries on disk. Args: paths: Paths to iterate over root_path: Root path to access from store: Optional store to save new blobs in Returns: Iterator over path, index_entry """ for path in paths: p = _tree_to_fs_path(root_path, path) try: entry = index_entry_from_path(p, object_store=object_store) except (FileNotFoundError, IsADirectoryError): entry = None yield path, entry def iter_fresh_blobs(index, root_path): """Iterate over versions of blobs on disk referenced by index. Don't use this function; it removes missing entries from index. Args: index: Index file root_path: Root path to access from include_deleted: Include deleted entries with sha and mode set to None Returns: Iterator over path, sha, mode """ import warnings warnings.warn(PendingDeprecationWarning, "Use iter_fresh_objects instead.") for entry in iter_fresh_objects( index, root_path, include_deleted=True): if entry[1] is None: del index[entry[0]] else: yield entry def iter_fresh_objects(paths, root_path, include_deleted=False, object_store=None): """Iterate over versions of objecs on disk referenced by index. Args: index: Index file root_path: Root path to access from include_deleted: Include deleted entries with sha and mode set to None object_store: Optional object store to report new items to Returns: Iterator over path, sha, mode """ for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store): if entry is None: if include_deleted: yield path, None, None else: entry = IndexEntry(*entry) yield path, entry.sha, cleanup_mode(entry.mode) def refresh_index(index, root_path): """Refresh the contents of an index. This is the equivalent to running 'git commit -a'. Args: index: Index to update root_path: Root filesystem path """ for path, entry in iter_fresh_entries(index, root_path): index[path] = path diff --git a/dulwich/tests/test_hooks.py b/dulwich/tests/test_hooks.py index cc3be8ff..47bca20a 100644 --- a/dulwich/tests/test_hooks.py +++ b/dulwich/tests/test_hooks.py @@ -1,158 +1,159 @@ # test_hooks.py -- Tests for executing hooks # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for executing hooks.""" import os import stat import shutil import tempfile from dulwich import errors from dulwich.hooks import ( PreCommitShellHook, PostCommitShellHook, CommitMsgShellHook, ) from dulwich.tests import TestCase class ShellHookTests(TestCase): def setUp(self): super(ShellHookTests, self).setUp() if os.name != 'posix': self.skipTest('shell hook tests requires POSIX shell') + self.assertTrue(os.path.exists('/bin/sh')) def test_hook_pre_commit(self): repo_dir = os.path.join(tempfile.mkdtemp()) os.mkdir(os.path.join(repo_dir, 'hooks')) self.addCleanup(shutil.rmtree, repo_dir) pre_commit_fail = """#!/bin/sh exit 1 """ pre_commit_success = """#!/bin/sh exit 0 """ pre_commit_cwd = """#!/bin/sh if [ "$(pwd)" = '""" + repo_dir + "' ]; then exit 0; else exit 1; fi\n" pre_commit = os.path.join(repo_dir, 'hooks', 'pre-commit') hook = PreCommitShellHook(repo_dir) with open(pre_commit, 'w') as f: f.write(pre_commit_fail) os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) self.assertRaises(errors.HookError, hook.execute) with open(pre_commit, 'w') as f: f.write(pre_commit_cwd) os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) hook.execute() with open(pre_commit, 'w') as f: f.write(pre_commit_success) os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) hook.execute() def test_hook_commit_msg(self): repo_dir = os.path.join(tempfile.mkdtemp()) os.mkdir(os.path.join(repo_dir, 'hooks')) self.addCleanup(shutil.rmtree, repo_dir) commit_msg_fail = """#!/bin/sh exit 1 """ commit_msg_success = """#!/bin/sh exit 0 """ commit_msg_cwd = """#!/bin/sh if [ "$(pwd)" = '""" + repo_dir + "' ]; then exit 0; else exit 1; fi\n" commit_msg = os.path.join(repo_dir, 'hooks', 'commit-msg') hook = CommitMsgShellHook(repo_dir) with open(commit_msg, 'w') as f: f.write(commit_msg_fail) os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) self.assertRaises(errors.HookError, hook.execute, b'failed commit') with open(commit_msg, 'w') as f: f.write(commit_msg_cwd) os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) hook.execute(b'cwd test commit') with open(commit_msg, 'w') as f: f.write(commit_msg_success) os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) hook.execute(b'empty commit') def test_hook_post_commit(self): (fd, path) = tempfile.mkstemp() os.close(fd) repo_dir = os.path.join(tempfile.mkdtemp()) os.mkdir(os.path.join(repo_dir, 'hooks')) self.addCleanup(shutil.rmtree, repo_dir) post_commit_success = """#!/bin/sh rm """ + path + "\n" post_commit_fail = """#!/bin/sh exit 1 """ post_commit_cwd = """#!/bin/sh if [ "$(pwd)" = '""" + repo_dir + "' ]; then exit 0; else exit 1; fi\n" post_commit = os.path.join(repo_dir, 'hooks', 'post-commit') hook = PostCommitShellHook(repo_dir) with open(post_commit, 'w') as f: f.write(post_commit_fail) os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) self.assertRaises(errors.HookError, hook.execute) with open(post_commit, 'w') as f: f.write(post_commit_cwd) os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) hook.execute() with open(post_commit, 'w') as f: f.write(post_commit_success) os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) hook.execute() self.assertFalse(os.path.exists(path)) diff --git a/dulwich/tests/test_refs.py b/dulwich/tests/test_refs.py index b2ec1e0a..66af5e19 100644 --- a/dulwich/tests/test_refs.py +++ b/dulwich/tests/test_refs.py @@ -1,689 +1,689 @@ # test_refs.py -- tests for refs.py # encoding: utf-8 # Copyright (C) 2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for dulwich.refs.""" from io import BytesIO import os import sys import tempfile from dulwich import errors from dulwich.file import ( GitFile, ) from dulwich.objects import ZERO_SHA from dulwich.refs import ( DictRefsContainer, InfoRefsContainer, check_ref_format, _split_ref_line, parse_symref_value, read_packed_refs_with_peeled, read_packed_refs, strip_peeled_refs, write_packed_refs, ) from dulwich.repo import Repo from dulwich.tests import ( SkipTest, TestCase, ) from dulwich.tests.utils import ( open_repo, tear_down_repo, ) class CheckRefFormatTests(TestCase): """Tests for the check_ref_format function. These are the same tests as in the git test suite. """ def test_valid(self): self.assertTrue(check_ref_format(b'heads/foo')) self.assertTrue(check_ref_format(b'foo/bar/baz')) self.assertTrue(check_ref_format(b'refs///heads/foo')) self.assertTrue(check_ref_format(b'foo./bar')) self.assertTrue(check_ref_format(b'heads/foo@bar')) self.assertTrue(check_ref_format(b'heads/fix.lock.error')) def test_invalid(self): self.assertFalse(check_ref_format(b'foo')) self.assertFalse(check_ref_format(b'heads/foo/')) self.assertFalse(check_ref_format(b'./foo')) self.assertFalse(check_ref_format(b'.refs/foo')) self.assertFalse(check_ref_format(b'heads/foo..bar')) self.assertFalse(check_ref_format(b'heads/foo?bar')) self.assertFalse(check_ref_format(b'heads/foo.lock')) self.assertFalse(check_ref_format(b'heads/v@{ation')) self.assertFalse(check_ref_format(b'heads/foo\bar')) ONES = b'1' * 40 TWOS = b'2' * 40 THREES = b'3' * 40 FOURS = b'4' * 40 class PackedRefsFileTests(TestCase): def test_split_ref_line_errors(self): self.assertRaises(errors.PackedRefsException, _split_ref_line, b'singlefield') self.assertRaises(errors.PackedRefsException, _split_ref_line, b'badsha name') self.assertRaises(errors.PackedRefsException, _split_ref_line, ONES + b' bad/../refname') def test_read_without_peeled(self): f = BytesIO(b'\n'.join([ b'# comment', ONES + b' ref/1', TWOS + b' ref/2'])) self.assertEqual([(ONES, b'ref/1'), (TWOS, b'ref/2')], list(read_packed_refs(f))) def test_read_without_peeled_errors(self): f = BytesIO(b'\n'.join([ ONES + b' ref/1', b'^' + TWOS])) self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f)) def test_read_with_peeled(self): f = BytesIO(b'\n'.join([ ONES + b' ref/1', TWOS + b' ref/2', b'^' + THREES, FOURS + b' ref/4'])) self.assertEqual([ (ONES, b'ref/1', None), (TWOS, b'ref/2', THREES), (FOURS, b'ref/4', None), ], list(read_packed_refs_with_peeled(f))) def test_read_with_peeled_errors(self): f = BytesIO(b'\n'.join([ b'^' + TWOS, ONES + b' ref/1'])) self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f)) f = BytesIO(b'\n'.join([ ONES + b' ref/1', b'^' + TWOS, b'^' + THREES])) self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f)) def test_write_with_peeled(self): f = BytesIO() write_packed_refs(f, {b'ref/1': ONES, b'ref/2': TWOS}, {b'ref/1': THREES}) self.assertEqual( b'\n'.join([b'# pack-refs with: peeled', ONES + b' ref/1', b'^' + THREES, TWOS + b' ref/2']) + b'\n', f.getvalue()) def test_write_without_peeled(self): f = BytesIO() write_packed_refs(f, {b'ref/1': ONES, b'ref/2': TWOS}) self.assertEqual(b'\n'.join([ONES + b' ref/1', TWOS + b' ref/2']) + b'\n', f.getvalue()) # Dict of refs that we expect all RefsContainerTests subclasses to define. _TEST_REFS = { b'HEAD': b'42d06bd4b77fed026b154d16493e5deab78f02ec', b'refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa': b'42d06bd4b77fed026b154d16493e5deab78f02ec', b'refs/heads/master': b'42d06bd4b77fed026b154d16493e5deab78f02ec', b'refs/heads/packed': b'42d06bd4b77fed026b154d16493e5deab78f02ec', b'refs/tags/refs-0.1': b'df6800012397fb85c56e7418dd4eb9405dee075c', b'refs/tags/refs-0.2': b'3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8', b'refs/heads/loop': b'ref: refs/heads/loop', } class RefsContainerTests(object): def test_keys(self): actual_keys = set(self._refs.keys()) self.assertEqual(set(self._refs.allkeys()), actual_keys) self.assertEqual(set(_TEST_REFS.keys()), actual_keys) actual_keys = self._refs.keys(b'refs/heads') actual_keys.discard(b'loop') self.assertEqual( [b'40-char-ref-aaaaaaaaaaaaaaaaaa', b'master', b'packed'], sorted(actual_keys)) self.assertEqual([b'refs-0.1', b'refs-0.2'], sorted(self._refs.keys(b'refs/tags'))) def test_iter(self): actual_keys = set(self._refs.keys()) self.assertEqual(set(self._refs), actual_keys) self.assertEqual(set(_TEST_REFS.keys()), actual_keys) def test_as_dict(self): # refs/heads/loop does not show up even if it exists expected_refs = dict(_TEST_REFS) del expected_refs[b'refs/heads/loop'] self.assertEqual(expected_refs, self._refs.as_dict()) def test_get_symrefs(self): self._refs.set_symbolic_ref(b'refs/heads/src', b'refs/heads/dst') symrefs = self._refs.get_symrefs() if b'HEAD' in symrefs: symrefs.pop(b'HEAD') self.assertEqual({b'refs/heads/src': b'refs/heads/dst', b'refs/heads/loop': b'refs/heads/loop'}, symrefs) def test_setitem(self): self._refs[b'refs/some/ref'] = ( b'42d06bd4b77fed026b154d16493e5deab78f02ec') self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs[b'refs/some/ref']) self.assertRaises( errors.RefFormatError, self._refs.__setitem__, b'notrefs/foo', b'42d06bd4b77fed026b154d16493e5deab78f02ec') def test_set_if_equals(self): nines = b'9' * 40 self.assertFalse(self._refs.set_if_equals(b'HEAD', b'c0ffee', nines)) self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs[b'HEAD']) self.assertTrue(self._refs.set_if_equals( b'HEAD', b'42d06bd4b77fed026b154d16493e5deab78f02ec', nines)) self.assertEqual(nines, self._refs[b'HEAD']) # Setting the ref again is a no-op, but will return True. self.assertTrue(self._refs.set_if_equals(b'HEAD', nines, nines)) self.assertEqual(nines, self._refs[b'HEAD']) self.assertTrue(self._refs.set_if_equals(b'refs/heads/master', None, nines)) self.assertEqual(nines, self._refs[b'refs/heads/master']) self.assertTrue(self._refs.set_if_equals( b'refs/heads/nonexistant', ZERO_SHA, nines)) self.assertEqual(nines, self._refs[b'refs/heads/nonexistant']) def test_add_if_new(self): nines = b'9' * 40 self.assertFalse(self._refs.add_if_new(b'refs/heads/master', nines)) self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs[b'refs/heads/master']) self.assertTrue(self._refs.add_if_new(b'refs/some/ref', nines)) self.assertEqual(nines, self._refs[b'refs/some/ref']) def test_set_symbolic_ref(self): self._refs.set_symbolic_ref(b'refs/heads/symbolic', b'refs/heads/master') self.assertEqual(b'ref: refs/heads/master', self._refs.read_loose_ref(b'refs/heads/symbolic')) self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs[b'refs/heads/symbolic']) def test_set_symbolic_ref_overwrite(self): nines = b'9' * 40 self.assertFalse(b'refs/heads/symbolic' in self._refs) self._refs[b'refs/heads/symbolic'] = nines self.assertEqual(nines, self._refs.read_loose_ref(b'refs/heads/symbolic')) self._refs.set_symbolic_ref(b'refs/heads/symbolic', b'refs/heads/master') self.assertEqual(b'ref: refs/heads/master', self._refs.read_loose_ref(b'refs/heads/symbolic')) self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs[b'refs/heads/symbolic']) def test_check_refname(self): self._refs._check_refname(b'HEAD') self._refs._check_refname(b'refs/stash') self._refs._check_refname(b'refs/heads/foo') self.assertRaises(errors.RefFormatError, self._refs._check_refname, b'refs') self.assertRaises(errors.RefFormatError, self._refs._check_refname, b'notrefs/foo') def test_contains(self): self.assertTrue(b'refs/heads/master' in self._refs) self.assertFalse(b'refs/heads/bar' in self._refs) def test_delitem(self): self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs[b'refs/heads/master']) del self._refs[b'refs/heads/master'] self.assertRaises(KeyError, lambda: self._refs[b'refs/heads/master']) def test_remove_if_equals(self): self.assertFalse(self._refs.remove_if_equals(b'HEAD', b'c0ffee')) self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs[b'HEAD']) self.assertTrue(self._refs.remove_if_equals( b'refs/tags/refs-0.2', b'3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8')) self.assertTrue(self._refs.remove_if_equals( b'refs/tags/refs-0.2', ZERO_SHA)) self.assertFalse(b'refs/tags/refs-0.2' in self._refs) def test_import_refs_name(self): self._refs[b'refs/remotes/origin/other'] = ( b'48d01bd4b77fed026b154d16493e5deab78f02ec') self._refs.import_refs( b'refs/remotes/origin', {b'master': b'42d06bd4b77fed026b154d16493e5deab78f02ec'}) self.assertEqual( b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs[b'refs/remotes/origin/master']) self.assertEqual( b'48d01bd4b77fed026b154d16493e5deab78f02ec', self._refs[b'refs/remotes/origin/other']) def test_import_refs_name_prune(self): self._refs[b'refs/remotes/origin/other'] = ( b'48d01bd4b77fed026b154d16493e5deab78f02ec') self._refs.import_refs( b'refs/remotes/origin', {b'master': b'42d06bd4b77fed026b154d16493e5deab78f02ec'}, prune=True) self.assertEqual( b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs[b'refs/remotes/origin/master']) self.assertNotIn( b'refs/remotes/origin/other', self._refs) class DictRefsContainerTests(RefsContainerTests, TestCase): def setUp(self): TestCase.setUp(self) self._refs = DictRefsContainer(dict(_TEST_REFS)) def test_invalid_refname(self): # FIXME: Move this test into RefsContainerTests, but requires # some way of injecting invalid refs. self._refs._refs[b'refs/stash'] = b'00' * 20 expected_refs = dict(_TEST_REFS) del expected_refs[b'refs/heads/loop'] expected_refs[b'refs/stash'] = b'00' * 20 self.assertEqual(expected_refs, self._refs.as_dict()) class DiskRefsContainerTests(RefsContainerTests, TestCase): def setUp(self): TestCase.setUp(self) self._repo = open_repo('refs.git') self.addCleanup(tear_down_repo, self._repo) self._refs = self._repo.refs def test_get_packed_refs(self): self.assertEqual({ b'refs/heads/packed': b'42d06bd4b77fed026b154d16493e5deab78f02ec', b'refs/tags/refs-0.1': b'df6800012397fb85c56e7418dd4eb9405dee075c', }, self._refs.get_packed_refs()) def test_get_peeled_not_packed(self): # not packed self.assertEqual(None, self._refs.get_peeled(b'refs/tags/refs-0.2')) self.assertEqual(b'3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8', self._refs[b'refs/tags/refs-0.2']) # packed, known not peelable self.assertEqual(self._refs[b'refs/heads/packed'], self._refs.get_peeled(b'refs/heads/packed')) # packed, peeled self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs.get_peeled(b'refs/tags/refs-0.1')) def test_setitem(self): RefsContainerTests.test_setitem(self) path = os.path.join(self._refs.path, b'refs', b'some', b'ref') with open(path, 'rb') as f: self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec', f.read()[:40]) self.assertRaises( OSError, self._refs.__setitem__, b'refs/some/ref/sub', b'42d06bd4b77fed026b154d16493e5deab78f02ec') def test_setitem_packed(self): with open(os.path.join(self._refs.path, b'packed-refs'), 'w') as f: f.write('# pack-refs with: peeled fully-peeled sorted \n') f.write( '42d06bd4b77fed026b154d16493e5deab78f02ec refs/heads/packed\n') # It's allowed to set a new ref on a packed ref, the new ref will be # placed outside on refs/ self._refs[b'refs/heads/packed'] = ( b'3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8' ) packed_ref_path = os.path.join( self._refs.path, b'refs', b'heads', b'packed') with open(packed_ref_path, 'rb') as f: self.assertEqual( b'3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8', f.read()[:40]) self.assertRaises( OSError, self._refs.__setitem__, b'refs/heads/packed/sub', b'42d06bd4b77fed026b154d16493e5deab78f02ec') def test_setitem_symbolic(self): ones = b'1' * 40 self._refs[b'HEAD'] = ones self.assertEqual(ones, self._refs[b'HEAD']) # ensure HEAD was not modified f = open(os.path.join(self._refs.path, b'HEAD'), 'rb') v = next(iter(f)).rstrip(b'\n\r') f.close() self.assertEqual(b'ref: refs/heads/master', v) # ensure the symbolic link was written through f = open(os.path.join(self._refs.path, b'refs', b'heads', b'master'), 'rb') self.assertEqual(ones, f.read()[:40]) f.close() def test_set_if_equals(self): RefsContainerTests.test_set_if_equals(self) # ensure symref was followed self.assertEqual(b'9' * 40, self._refs[b'refs/heads/master']) # ensure lockfile was deleted self.assertFalse(os.path.exists( os.path.join(self._refs.path, b'refs', b'heads', b'master.lock'))) self.assertFalse(os.path.exists( os.path.join(self._refs.path, b'HEAD.lock'))) def test_add_if_new_packed(self): # don't overwrite packed ref self.assertFalse(self._refs.add_if_new(b'refs/tags/refs-0.1', b'9' * 40)) self.assertEqual(b'df6800012397fb85c56e7418dd4eb9405dee075c', self._refs[b'refs/tags/refs-0.1']) def test_add_if_new_symbolic(self): # Use an empty repo instead of the default. repo_dir = os.path.join(tempfile.mkdtemp(), 'test') os.makedirs(repo_dir) repo = Repo.init(repo_dir) self.addCleanup(tear_down_repo, repo) refs = repo.refs nines = b'9' * 40 self.assertEqual(b'ref: refs/heads/master', refs.read_ref(b'HEAD')) self.assertFalse(b'refs/heads/master' in refs) self.assertTrue(refs.add_if_new(b'HEAD', nines)) self.assertEqual(b'ref: refs/heads/master', refs.read_ref(b'HEAD')) self.assertEqual(nines, refs[b'HEAD']) self.assertEqual(nines, refs[b'refs/heads/master']) self.assertFalse(refs.add_if_new(b'HEAD', b'1' * 40)) self.assertEqual(nines, refs[b'HEAD']) self.assertEqual(nines, refs[b'refs/heads/master']) def test_follow(self): self.assertEqual(([b'HEAD', b'refs/heads/master'], b'42d06bd4b77fed026b154d16493e5deab78f02ec'), self._refs.follow(b'HEAD')) self.assertEqual(([b'refs/heads/master'], b'42d06bd4b77fed026b154d16493e5deab78f02ec'), self._refs.follow(b'refs/heads/master')) self.assertRaises(KeyError, self._refs.follow, b'refs/heads/loop') def test_delitem(self): RefsContainerTests.test_delitem(self) ref_file = os.path.join(self._refs.path, b'refs', b'heads', b'master') self.assertFalse(os.path.exists(ref_file)) self.assertFalse(b'refs/heads/master' in self._refs.get_packed_refs()) def test_delitem_symbolic(self): self.assertEqual(b'ref: refs/heads/master', self._refs.read_loose_ref(b'HEAD')) del self._refs[b'HEAD'] self.assertRaises(KeyError, lambda: self._refs[b'HEAD']) self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs[b'refs/heads/master']) self.assertFalse( os.path.exists(os.path.join(self._refs.path, b'HEAD'))) def test_remove_if_equals_symref(self): # HEAD is a symref, so shouldn't equal its dereferenced value self.assertFalse(self._refs.remove_if_equals( b'HEAD', b'42d06bd4b77fed026b154d16493e5deab78f02ec')) self.assertTrue(self._refs.remove_if_equals( b'refs/heads/master', b'42d06bd4b77fed026b154d16493e5deab78f02ec')) self.assertRaises(KeyError, lambda: self._refs[b'refs/heads/master']) # HEAD is now a broken symref self.assertRaises(KeyError, lambda: self._refs[b'HEAD']) self.assertEqual(b'ref: refs/heads/master', self._refs.read_loose_ref(b'HEAD')) self.assertFalse(os.path.exists( os.path.join(self._refs.path, b'refs', b'heads', b'master.lock'))) self.assertFalse(os.path.exists( os.path.join(self._refs.path, b'HEAD.lock'))) def test_remove_packed_without_peeled(self): refs_file = os.path.join(self._repo.path, 'packed-refs') f = GitFile(refs_file) refs_data = f.read() f.close() f = GitFile(refs_file, 'wb') f.write(b'\n'.join(line for line in refs_data.split(b'\n') if not line or line[0] not in b'#^')) f.close() self._repo = Repo(self._repo.path) refs = self._repo.refs self.assertTrue(refs.remove_if_equals( b'refs/heads/packed', b'42d06bd4b77fed026b154d16493e5deab78f02ec')) def test_remove_if_equals_packed(self): # test removing ref that is only packed self.assertEqual(b'df6800012397fb85c56e7418dd4eb9405dee075c', self._refs[b'refs/tags/refs-0.1']) self.assertTrue( self._refs.remove_if_equals( b'refs/tags/refs-0.1', b'df6800012397fb85c56e7418dd4eb9405dee075c')) self.assertRaises(KeyError, lambda: self._refs[b'refs/tags/refs-0.1']) def test_remove_parent(self): self._refs[b'refs/heads/foo/bar'] = ( b'df6800012397fb85c56e7418dd4eb9405dee075c' ) del self._refs[b'refs/heads/foo/bar'] ref_file = os.path.join( self._refs.path, b'refs', b'heads', b'foo', b'bar', ) self.assertFalse(os.path.exists(ref_file)) ref_file = os.path.join(self._refs.path, b'refs', b'heads', b'foo') self.assertFalse(os.path.exists(ref_file)) ref_file = os.path.join(self._refs.path, b'refs', b'heads') self.assertTrue(os.path.exists(ref_file)) self._refs[b'refs/heads/foo'] = ( b'df6800012397fb85c56e7418dd4eb9405dee075c' ) def test_read_ref(self): self.assertEqual(b'ref: refs/heads/master', self._refs.read_ref(b'HEAD')) self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec', self._refs.read_ref(b'refs/heads/packed')) self.assertEqual(None, self._refs.read_ref(b'nonexistant')) def test_read_loose_ref(self): self._refs[b'refs/heads/foo'] = ( b'df6800012397fb85c56e7418dd4eb9405dee075c' ) self.assertEqual(None, self._refs.read_ref(b'refs/heads/foo/bar')) def test_non_ascii(self): try: encoded_ref = os.fsencode(u'refs/tags/schön') except UnicodeEncodeError: raise SkipTest( "filesystem encoding doesn't support special character") p = os.path.join(os.fsencode(self._repo.path), encoded_ref) with open(p, 'w') as f: f.write('00' * 20) expected_refs = dict(_TEST_REFS) expected_refs[encoded_ref] = b'00' * 20 del expected_refs[b'refs/heads/loop'] self.assertEqual(expected_refs, self._repo.get_refs()) def test_cyrillic(self): - if sys.platform == 'win32': + if sys.platform in ('darwin', 'win32'): raise SkipTest( "filesystem encoding doesn't support arbitrary bytes") # reported in https://github.com/dulwich/dulwich/issues/608 name = b'\xcd\xee\xe2\xe0\xff\xe2\xe5\xf2\xea\xe01' encoded_ref = b'refs/heads/' + name with open(os.path.join( os.fsencode(self._repo.path), encoded_ref), 'w') as f: f.write('00' * 20) expected_refs = set(_TEST_REFS.keys()) expected_refs.add(encoded_ref) self.assertEqual(expected_refs, set(self._repo.refs.allkeys())) self.assertEqual({r[len(b'refs/'):] for r in expected_refs if r.startswith(b'refs/')}, set(self._repo.refs.subkeys(b'refs/'))) expected_refs.remove(b'refs/heads/loop') expected_refs.add(b'HEAD') self.assertEqual(expected_refs, set(self._repo.get_refs().keys())) _TEST_REFS_SERIALIZED = ( b'42d06bd4b77fed026b154d16493e5deab78f02ec\t' b'refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa\n' b'42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/master\n' b'42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/packed\n' b'df6800012397fb85c56e7418dd4eb9405dee075c\trefs/tags/refs-0.1\n' b'3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8\trefs/tags/refs-0.2\n') class InfoRefsContainerTests(TestCase): def test_invalid_refname(self): text = _TEST_REFS_SERIALIZED + b'00' * 20 + b'\trefs/stash\n' refs = InfoRefsContainer(BytesIO(text)) expected_refs = dict(_TEST_REFS) del expected_refs[b'HEAD'] expected_refs[b'refs/stash'] = b'00' * 20 del expected_refs[b'refs/heads/loop'] self.assertEqual(expected_refs, refs.as_dict()) def test_keys(self): refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED)) actual_keys = set(refs.keys()) self.assertEqual(set(refs.allkeys()), actual_keys) expected_refs = dict(_TEST_REFS) del expected_refs[b'HEAD'] del expected_refs[b'refs/heads/loop'] self.assertEqual(set(expected_refs.keys()), actual_keys) actual_keys = refs.keys(b'refs/heads') actual_keys.discard(b'loop') self.assertEqual( [b'40-char-ref-aaaaaaaaaaaaaaaaaa', b'master', b'packed'], sorted(actual_keys)) self.assertEqual([b'refs-0.1', b'refs-0.2'], sorted(refs.keys(b'refs/tags'))) def test_as_dict(self): refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED)) # refs/heads/loop does not show up even if it exists expected_refs = dict(_TEST_REFS) del expected_refs[b'HEAD'] del expected_refs[b'refs/heads/loop'] self.assertEqual(expected_refs, refs.as_dict()) def test_contains(self): refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED)) self.assertTrue(b'refs/heads/master' in refs) self.assertFalse(b'refs/heads/bar' in refs) def test_get_peeled(self): refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED)) # refs/heads/loop does not show up even if it exists self.assertEqual( _TEST_REFS[b'refs/heads/master'], refs.get_peeled(b'refs/heads/master')) class ParseSymrefValueTests(TestCase): def test_valid(self): self.assertEqual( b'refs/heads/foo', parse_symref_value(b'ref: refs/heads/foo')) def test_invalid(self): self.assertRaises(ValueError, parse_symref_value, b'foobar') class StripPeeledRefsTests(TestCase): all_refs = { b'refs/heads/master': b'8843d7f92416211de9ebb963ff4ce28125932878', b'refs/heads/testing': b'186a005b134d8639a58b6731c7c1ea821a6eedba', b'refs/tags/1.0.0': b'a93db4b0360cc635a2b93675010bac8d101f73f0', b'refs/tags/1.0.0^{}': b'a93db4b0360cc635a2b93675010bac8d101f73f0', b'refs/tags/2.0.0': b'0749936d0956c661ac8f8d3483774509c165f89e', b'refs/tags/2.0.0^{}': b'0749936d0956c661ac8f8d3483774509c165f89e', } non_peeled_refs = { b'refs/heads/master': b'8843d7f92416211de9ebb963ff4ce28125932878', b'refs/heads/testing': b'186a005b134d8639a58b6731c7c1ea821a6eedba', b'refs/tags/1.0.0': b'a93db4b0360cc635a2b93675010bac8d101f73f0', b'refs/tags/2.0.0': b'0749936d0956c661ac8f8d3483774509c165f89e', } def test_strip_peeled_refs(self): # Simple check of two dicts self.assertEqual( strip_peeled_refs(self.all_refs), self.non_peeled_refs)