diff --git a/dulwich/index.py b/dulwich/index.py index 4cd6093f..6338d608 100644 --- a/dulwich/index.py +++ b/dulwich/index.py @@ -1,589 +1,597 @@ # index.py -- File parser/writer for the git index file # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Parser for the git index file format.""" import collections import errno import os import stat import struct import sys from dulwich.file import GitFile from dulwich.objects import ( Blob, S_IFGITLINK, S_ISGITLINK, Tree, hex_to_sha, sha_to_hex, ) from dulwich.pack import ( SHA1Reader, SHA1Writer, ) IndexEntry = collections.namedtuple( 'IndexEntry', [ 'ctime', 'mtime', 'dev', 'ino', 'mode', 'uid', 'gid', 'size', 'sha', 'flags']) def pathsplit(path): """Split a /-delimited path into a directory part and a basename. :param path: The path to split. :return: Tuple with directory name and basename """ try: (dirname, basename) = path.rsplit(b"/", 1) except ValueError: return (b"", path) else: return (dirname, basename) def pathjoin(*args): """Join a /-delimited path. """ return b"/".join([p for p in args if p]) def read_cache_time(f): """Read a cache time. :param f: File-like object to read from :return: Tuple with seconds and nanoseconds """ return struct.unpack(">LL", f.read(8)) def write_cache_time(f, t): """Write a cache time. :param f: File-like object to write to :param t: Time to write (as int, float or tuple with secs and nsecs) """ if isinstance(t, int): t = (t, 0) elif isinstance(t, float): (secs, nsecs) = divmod(t, 1.0) t = (int(secs), int(nsecs * 1000000000)) elif not isinstance(t, tuple): raise TypeError(t) f.write(struct.pack(">LL", *t)) def read_cache_entry(f): """Read an entry from a cache file. :param f: File-like object to read from :return: tuple with: device, inode, mode, uid, gid, size, sha, flags """ beginoffset = f.tell() ctime = read_cache_time(f) mtime = read_cache_time(f) (dev, ino, mode, uid, gid, size, sha, flags, ) = \ struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2)) name = f.read((flags & 0x0fff)) # Padding: real_size = ((f.tell() - beginoffset + 8) & ~7) f.read((beginoffset + real_size) - f.tell()) return (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha_to_hex(sha), flags & ~0x0fff) def write_cache_entry(f, entry): """Write an index entry to a file. 
:param f: File object :param entry: Entry to write, tuple with: (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) """ beginoffset = f.tell() (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) = entry write_cache_time(f, ctime) write_cache_time(f, mtime) flags = len(name) | (flags &~ 0x0fff) f.write(struct.pack(b'>LLLLLL20sH', dev & 0xFFFFFFFF, ino & 0xFFFFFFFF, mode, uid, gid, size, hex_to_sha(sha), flags)) f.write(name) real_size = ((f.tell() - beginoffset + 8) & ~7) f.write(b'\0' * ((beginoffset + real_size) - f.tell())) def read_index(f): """Read an index file, yielding the individual entries.""" header = f.read(4) if header != b'DIRC': raise AssertionError("Invalid index file header: %r" % header) (version, num_entries) = struct.unpack(b'>LL', f.read(4 * 2)) assert version in (1, 2) for i in range(num_entries): yield read_cache_entry(f) def read_index_dict(f): """Read an index file and return it as a dictionary. :param f: File object to read from """ ret = {} for x in read_index(f): ret[x[0]] = IndexEntry(*x[1:]) return ret def write_index(f, entries): """Write an index file. :param f: File-like object to write to :param entries: Iterable over the entries to write """ f.write(b'DIRC') f.write(struct.pack(b'>LL', 2, len(entries))) for x in entries: write_cache_entry(f, x) def write_index_dict(f, entries): """Write an index file based on the contents of a dictionary. """ entries_list = [] for name in sorted(entries): entries_list.append((name,) + tuple(entries[name])) write_index(f, entries_list) def cleanup_mode(mode): """Cleanup a mode value. This will return a mode that can be stored in a tree object. :param mode: Mode to clean up. """ if stat.S_ISLNK(mode): return stat.S_IFLNK elif stat.S_ISDIR(mode): return stat.S_IFDIR elif S_ISGITLINK(mode): return S_IFGITLINK ret = stat.S_IFREG | 0o644 ret |= (mode & 0o111) return ret class Index(object): """A Git Index file.""" def __init__(self, filename): """Open an index file. :param filename: Path to the index file """ self._filename = filename self.clear() self.read() @property def path(self): return self._filename def __repr__(self): return "%s(%r)" % (self.__class__.__name__, self._filename) def write(self): """Write current contents of index to disk.""" f = GitFile(self._filename, 'wb') try: f = SHA1Writer(f) write_index_dict(f, self._byname) finally: f.close() def read(self): """Read current contents of index from disk.""" if not os.path.exists(self._filename): return f = GitFile(self._filename, 'rb') try: f = SHA1Reader(f) for x in read_index(f): self[x[0]] = IndexEntry(*x[1:]) # FIXME: Additional data? f.read(os.path.getsize(self._filename)-f.tell()-20) f.check_sha() finally: f.close() def __len__(self): """Number of entries in this index file.""" return len(self._byname) def __getitem__(self, name): """Retrieve entry by relative path. 
:return: tuple with (ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) """ return self._byname[name] def __iter__(self): """Iterate over the paths in this index.""" return iter(self._byname) def get_sha1(self, path): """Return the (git object) SHA1 for the object at a path.""" return self[path].sha def get_mode(self, path): """Return the POSIX file mode for the object at a path.""" return self[path].mode def iterblobs(self): """Iterate over path, sha, mode tuples for use with commit_tree.""" for path in self: entry = self[path] yield path, entry.sha, cleanup_mode(entry.mode) def clear(self): """Remove all contents from this index.""" self._byname = {} def __setitem__(self, name, x): assert isinstance(name, bytes) assert len(x) == 10 # Remove the old entry if any self._byname[name] = x def __delitem__(self, name): assert isinstance(name, bytes) del self._byname[name] def iteritems(self): return self._byname.items() def update(self, entries): for name, value in entries.items(): self[name] = value def changes_from_tree(self, object_store, tree, want_unchanged=False): """Find the differences between the contents of this index and a tree. :param object_store: Object store to use for retrieving tree contents :param tree: SHA1 of the root tree :param want_unchanged: Whether unchanged files should be reported :return: Iterator over tuples with (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) """ def lookup_entry(path): entry = self[path] return entry.sha, entry.mode for (name, mode, sha) in changes_from_tree(self._byname.keys(), lookup_entry, object_store, tree, want_unchanged=want_unchanged): yield (name, mode, sha) def commit(self, object_store): """Create a new tree from an index. :param object_store: Object store to save the tree in :return: Root tree SHA """ return commit_tree(object_store, self.iterblobs()) def commit_tree(object_store, blobs): """Commit a new tree. :param object_store: Object store to add trees to :param blobs: Iterable over blob path, sha, mode entries :return: SHA1 of the created tree. """ trees = {b'': {}} def add_tree(path): if path in trees: return trees[path] dirname, basename = pathsplit(path) t = add_tree(dirname) assert isinstance(basename, bytes) newtree = {} t[basename] = newtree trees[path] = newtree return newtree for path, sha, mode in blobs: tree_path, basename = pathsplit(path) tree = add_tree(tree_path) tree[basename] = (mode, sha) def build_tree(path): tree = Tree() for basename, entry in trees[path].items(): if isinstance(entry, dict): mode = stat.S_IFDIR sha = build_tree(pathjoin(path, basename)) else: (mode, sha) = entry tree.add(basename, mode, sha) object_store.add_object(tree) return tree.id return build_tree(b'') def commit_index(object_store, index): """Create a new tree from an index. :param object_store: Object store to save the tree in :param index: Index file :note: This function is deprecated, use index.commit() instead. :return: Root tree sha. """ return commit_tree(object_store, index.iterblobs()) def changes_from_tree(names, lookup_entry, object_store, tree, want_unchanged=False): """Find the differences between the contents of a tree and a working copy. 
:param names: Iterable of names in the working copy :param lookup_entry: Function to lookup an entry in the working copy :param object_store: Object store to use for retrieving tree contents :param tree: SHA1 of the root tree, or None for an empty tree :param want_unchanged: Whether unchanged files should be reported :return: Iterator over tuples with (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) """ other_names = set(names) if tree is not None: for (name, mode, sha) in object_store.iter_tree_contents(tree): try: (other_sha, other_mode) = lookup_entry(name) except KeyError: # Was removed yield ((name, None), (mode, None), (sha, None)) else: other_names.remove(name) if (want_unchanged or other_sha != sha or other_mode != mode): yield ((name, name), (mode, other_mode), (sha, other_sha)) # Mention added files for name in other_names: try: (other_sha, other_mode) = lookup_entry(name) except KeyError: pass else: yield ((None, name), (None, other_mode), (None, other_sha)) def index_entry_from_stat(stat_val, hex_sha, flags, mode=None): """Create a new index entry from a stat value. :param stat_val: POSIX stat_result instance :param hex_sha: Hex sha of the object :param flags: Index flags """ if mode is None: mode = cleanup_mode(stat_val.st_mode) return (stat_val.st_ctime, stat_val.st_mtime, stat_val.st_dev, stat_val.st_ino, mode, stat_val.st_uid, stat_val.st_gid, stat_val.st_size, hex_sha, flags) def build_file_from_blob(blob, mode, target_path, honor_filemode=True): """Build a file or symlink on disk based on a Git object. :param obj: The git object :param mode: File mode :param target_path: Path to write to :param honor_filemode: An optional flag to honor core.filemode setting in config file, default is core.filemode=True, change executable bit """ if stat.S_ISLNK(mode): # FIXME: This will fail on Windows. What should we do instead? src_path = blob.as_raw_string() try: os.symlink(src_path, target_path) except OSError as e: if e.errno == errno.EEXIST: os.unlink(target_path) os.symlink(src_path, target_path) else: raise else: with open(target_path, 'wb') as f: # Write out file f.write(blob.as_raw_string()) if honor_filemode: os.chmod(target_path, mode) INVALID_DOTNAMES = (b".git", b".", b"..", b"") def validate_path_element_default(element): return element.lower() not in INVALID_DOTNAMES def validate_path_element_ntfs(element): stripped = element.rstrip(b". ").lower() if stripped in INVALID_DOTNAMES: return False if stripped == b"git~1": return False return True def validate_path(path, element_validator=validate_path_element_default): """Default path validator that just checks for .git/.""" parts = path.split(b"/") for p in parts: if not element_validator(p): return False else: return True def build_index_from_tree(root_path, index_path, object_store, tree_id, honor_filemode=True, validate_path_element=validate_path_element_default): """Generate and materialize index from a tree :param tree_id: Tree to materialize :param root_path: Target dir for materialized index files :param index_path: Target path for generated index :param object_store: Non-empty object store holding tree contents :param honor_filemode: An optional flag to honor core.filemode setting in config file, default is core.filemode=True, change executable bit :param validate_path_element: Function to validate path elements to check out; default just refuses .git and .. directories. :note:: existing index is wiped and contents are not merged in a working dir. Suitable only for fresh clones. 
""" index = Index(index_path) if not isinstance(root_path, bytes): root_path = root_path.encode(sys.getfilesystemencoding()) for entry in object_store.iter_tree_contents(tree_id): if not validate_path(entry.path, validate_path_element): continue full_path = _tree_to_fs_path(root_path, entry.path) if not os.path.exists(os.path.dirname(full_path)): os.makedirs(os.path.dirname(full_path)) # FIXME: Merge new index into working tree if S_ISGITLINK(entry.mode): os.mkdir(full_path) else: obj = object_store[entry.sha] build_file_from_blob(obj, entry.mode, full_path, honor_filemode=honor_filemode) # Add file to index st = os.lstat(full_path) if not honor_filemode or S_ISGITLINK(entry.mode): st = st.__class__((entry.mode, ) + st[1:]) index[entry.path] = index_entry_from_stat(st, entry.sha, 0) index.write() def blob_from_path_and_stat(fs_path, st): """Create a blob from a path and a stat object. :param fs_path: Full file system path to file :param st: A stat object :return: A `Blob` object """ assert isinstance(fs_path, bytes) blob = Blob() if not stat.S_ISLNK(st.st_mode): with open(fs_path, 'rb') as f: blob.data = f.read() else: blob.data = os.readlink(fs_path) return blob def get_unstaged_changes(index, root_path): """Walk through an index and check for differences against working tree. :param index: index to check :param root_path: path in which to find files :return: iterator over paths with unstaged changes """ # For each entry in the index check the sha1 & ensure not staged if not isinstance(root_path, bytes): root_path = root_path.encode(sys.getfilesystemencoding()) for tree_path, entry in index.iteritems(): full_path = _tree_to_fs_path(root_path, tree_path) - blob = blob_from_path_and_stat(full_path, os.lstat(full_path)) - if blob.id != entry.sha: + try: + blob = blob_from_path_and_stat(full_path, os.lstat(full_path)) + except OSError as e: + if e.errno != errno.ENOENT: + raise + # The file was removed, so we assume that counts as + # different from whatever file used to exist. yield tree_path + else: + if blob.id != entry.sha: + yield tree_path os_sep_bytes = os.sep.encode('ascii') def _tree_to_fs_path(root_path, tree_path): """Convert a git tree path to a file system path. :param root_path: Root filesystem path :param tree_path: Git tree path as bytes :return: File system path. """ assert isinstance(tree_path, bytes) if os_sep_bytes != b'/': sep_corrected_path = tree_path.replace(b'/', os_sep_bytes) else: sep_corrected_path = tree_path return os.path.join(root_path, sep_corrected_path) def _fs_to_tree_path(fs_path, fs_encoding=None): """Convert a file system path to a git tree path. :param fs_path: File system path. 
:param fs_encoding: File system encoding :return: Git tree path as bytes """ if fs_encoding is None: fs_encoding = sys.getfilesystemencoding() if not isinstance(fs_path, bytes): fs_path_bytes = fs_path.encode(fs_encoding) else: fs_path_bytes = fs_path if os_sep_bytes != b'/': tree_path = fs_path_bytes.replace(os_sep_bytes, b'/') else: tree_path = fs_path_bytes return tree_path diff --git a/dulwich/tests/test_index.py b/dulwich/tests/test_index.py index 2e508476..cb856e78 100644 --- a/dulwich/tests/test_index.py +++ b/dulwich/tests/test_index.py @@ -1,543 +1,564 @@ # -*- coding: utf-8 -*- # test_index.py -- Tests for the git index # encoding: utf-8 # Copyright (C) 2008-2009 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for the index.""" from io import BytesIO import os import shutil import stat import struct import sys import tempfile from dulwich.index import ( Index, build_index_from_tree, cleanup_mode, commit_tree, get_unstaged_changes, index_entry_from_stat, read_index, read_index_dict, validate_path_element_default, validate_path_element_ntfs, write_cache_time, write_index, write_index_dict, _tree_to_fs_path, _fs_to_tree_path, ) from dulwich.object_store import ( MemoryObjectStore, ) from dulwich.objects import ( Blob, Commit, Tree, S_IFGITLINK, ) from dulwich.repo import Repo from dulwich.tests import ( TestCase, skipIf, ) class IndexTestCase(TestCase): datadir = os.path.join(os.path.dirname(__file__), 'data/indexes') def get_simple_index(self, name): return Index(os.path.join(self.datadir, name)) class SimpleIndexTestCase(IndexTestCase): def test_len(self): self.assertEqual(1, len(self.get_simple_index("index"))) def test_iter(self): self.assertEqual([b'bla'], list(self.get_simple_index("index"))) def test_getitem(self): self.assertEqual(((1230680220, 0), (1230680220, 0), 2050, 3761020, 33188, 1000, 1000, 0, b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0), self.get_simple_index("index")[b"bla"]) def test_empty(self): i = self.get_simple_index("notanindex") self.assertEqual(0, len(i)) self.assertFalse(os.path.exists(i._filename)) def test_against_empty_tree(self): i = self.get_simple_index("index") changes = list(i.changes_from_tree(MemoryObjectStore(), None)) self.assertEqual(1, len(changes)) (oldname, newname), (oldmode, newmode), (oldsha, newsha) = changes[0] self.assertEqual(b'bla', newname) self.assertEqual(b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', newsha) class SimpleIndexWriterTestCase(IndexTestCase): def setUp(self): IndexTestCase.setUp(self) self.tempdir = tempfile.mkdtemp() def tearDown(self): IndexTestCase.tearDown(self) shutil.rmtree(self.tempdir) def test_simple_write(self): entries = [(b'barbla', (1230680220, 0), (1230680220, 0), 2050, 3761020, 33188, 1000, 1000, 0, b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)] filename = 
os.path.join(self.tempdir, 'test-simple-write-index') with open(filename, 'wb+') as x: write_index(x, entries) with open(filename, 'rb') as x: self.assertEqual(entries, list(read_index(x))) class ReadIndexDictTests(IndexTestCase): def setUp(self): IndexTestCase.setUp(self) self.tempdir = tempfile.mkdtemp() def tearDown(self): IndexTestCase.tearDown(self) shutil.rmtree(self.tempdir) def test_simple_write(self): entries = {b'barbla': ((1230680220, 0), (1230680220, 0), 2050, 3761020, 33188, 1000, 1000, 0, b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)} filename = os.path.join(self.tempdir, 'test-simple-write-index') with open(filename, 'wb+') as x: write_index_dict(x, entries) with open(filename, 'rb') as x: self.assertEqual(entries, read_index_dict(x)) class CommitTreeTests(TestCase): def setUp(self): super(CommitTreeTests, self).setUp() self.store = MemoryObjectStore() def test_single_blob(self): blob = Blob() blob.data = b"foo" self.store.add_object(blob) blobs = [(b"bla", blob.id, stat.S_IFREG)] rootid = commit_tree(self.store, blobs) self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50") self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"]) self.assertEqual(set([rootid, blob.id]), set(self.store._data.keys())) def test_nested(self): blob = Blob() blob.data = b"foo" self.store.add_object(blob) blobs = [(b"bla/bar", blob.id, stat.S_IFREG)] rootid = commit_tree(self.store, blobs) self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537") dirid = self.store[rootid][b"bla"][1] self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650") self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"]) self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"]) self.assertEqual(set([rootid, dirid, blob.id]), set(self.store._data.keys())) class CleanupModeTests(TestCase): def test_file(self): self.assertEqual(0o100644, cleanup_mode(0o100000)) def test_executable(self): self.assertEqual(0o100755, cleanup_mode(0o100711)) def test_symlink(self): self.assertEqual(0o120000, cleanup_mode(0o120711)) def test_dir(self): self.assertEqual(0o040000, cleanup_mode(0o40531)) def test_submodule(self): self.assertEqual(0o160000, cleanup_mode(0o160744)) class WriteCacheTimeTests(TestCase): def test_write_string(self): f = BytesIO() self.assertRaises(TypeError, write_cache_time, f, "foo") def test_write_int(self): f = BytesIO() write_cache_time(f, 434343) self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue()) def test_write_tuple(self): f = BytesIO() write_cache_time(f, (434343, 21)) self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue()) def test_write_float(self): f = BytesIO() write_cache_time(f, 434343.000000021) self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue()) class IndexEntryFromStatTests(TestCase): def test_simple(self): st = os.stat_result((16877, 131078, 64769, 154, 1000, 1000, 12288, 1323629595, 1324180496, 1324180496)) entry = index_entry_from_stat(st, "22" * 20, 0) self.assertEqual(entry, ( 1324180496, 1324180496, 64769, 131078, 16384, 1000, 1000, 12288, '2222222222222222222222222222222222222222', 0)) def test_override_mode(self): st = os.stat_result((stat.S_IFREG + 0o644, 131078, 64769, 154, 1000, 1000, 12288, 1323629595, 1324180496, 1324180496)) entry = index_entry_from_stat(st, "22" * 20, 0, mode=stat.S_IFREG + 0o755) self.assertEqual(entry, ( 1324180496, 1324180496, 64769, 131078, 33261, 1000, 1000, 12288, '2222222222222222222222222222222222222222', 0)) class BuildIndexTests(TestCase): def 
assertReasonableIndexEntry(self, index_entry, mode, filesize, sha): self.assertEqual(index_entry[4], mode) # mode self.assertEqual(index_entry[7], filesize) # filesize self.assertEqual(index_entry[8], sha) # sha def assertFileContents(self, path, contents, symlink=False): if symlink: self.assertEqual(os.readlink(path), contents) else: with open(path, 'rb') as f: self.assertEqual(f.read(), contents) def test_empty(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) with Repo.init(repo_dir) as repo: tree = Tree() repo.object_store.add_object(tree) build_index_from_tree(repo.path, repo.index_path(), repo.object_store, tree.id) # Verify index entries index = repo.open_index() self.assertEqual(len(index), 0) # Verify no files self.assertEqual(['.git'], os.listdir(repo.path)) def test_git_dir(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) with Repo.init(repo_dir) as repo: # Populate repo filea = Blob.from_string(b'file a') filee = Blob.from_string(b'd') tree = Tree() tree[b'.git/a'] = (stat.S_IFREG | 0o644, filea.id) tree[b'c/e'] = (stat.S_IFREG | 0o644, filee.id) repo.object_store.add_objects([(o, None) for o in [filea, filee, tree]]) build_index_from_tree(repo.path, repo.index_path(), repo.object_store, tree.id) # Verify index entries index = repo.open_index() self.assertEqual(len(index), 1) # filea apath = os.path.join(repo.path, '.git', 'a') self.assertFalse(os.path.exists(apath)) # filee epath = os.path.join(repo.path, 'c', 'e') self.assertTrue(os.path.exists(epath)) self.assertReasonableIndexEntry(index[b'c/e'], stat.S_IFREG | 0o644, 1, filee.id) self.assertFileContents(epath, b'd') def test_nonempty(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) with Repo.init(repo_dir) as repo: # Populate repo filea = Blob.from_string(b'file a') fileb = Blob.from_string(b'file b') filed = Blob.from_string(b'file d') tree = Tree() tree[b'a'] = (stat.S_IFREG | 0o644, filea.id) tree[b'b'] = (stat.S_IFREG | 0o644, fileb.id) tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id) repo.object_store.add_objects([(o, None) for o in [filea, fileb, filed, tree]]) build_index_from_tree(repo.path, repo.index_path(), repo.object_store, tree.id) # Verify index entries index = repo.open_index() self.assertEqual(len(index), 3) # filea apath = os.path.join(repo.path, 'a') self.assertTrue(os.path.exists(apath)) self.assertReasonableIndexEntry(index[b'a'], stat.S_IFREG | 0o644, 6, filea.id) self.assertFileContents(apath, b'file a') # fileb bpath = os.path.join(repo.path, 'b') self.assertTrue(os.path.exists(bpath)) self.assertReasonableIndexEntry(index[b'b'], stat.S_IFREG | 0o644, 6, fileb.id) self.assertFileContents(bpath, b'file b') # filed dpath = os.path.join(repo.path, 'c', 'd') self.assertTrue(os.path.exists(dpath)) self.assertReasonableIndexEntry(index[b'c/d'], stat.S_IFREG | 0o644, 6, filed.id) self.assertFileContents(dpath, b'file d') # Verify no extra files self.assertEqual(['.git', 'a', 'b', 'c'], sorted(os.listdir(repo.path))) self.assertEqual(['d'], sorted(os.listdir(os.path.join(repo.path, 'c')))) @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support') def test_symlink(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) with Repo.init(repo_dir) as repo: # Populate repo filed = Blob.from_string(b'file d') filee = Blob.from_string(b'd') tree = Tree() tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id) tree[b'c/e'] = (stat.S_IFLNK, filee.id) # symlink repo.object_store.add_objects([(o, None) for o 
                                            in [filed, filee, tree]])
             build_index_from_tree(repo.path, repo.index_path(),
                                   repo.object_store, tree.id)

             # Verify index entries
             index = repo.open_index()

             # symlink to d
             epath = os.path.join(repo.path, 'c', 'e')
             self.assertTrue(os.path.exists(epath))
             self.assertReasonableIndexEntry(
                 index[b'c/e'], stat.S_IFLNK,
                 0 if sys.platform == 'win32' else 1,
                 filee.id)
             self.assertFileContents(epath, 'd', symlink=True)

     def test_no_decode_encode(self):
         repo_dir = tempfile.mkdtemp()
         repo_dir_bytes = repo_dir.encode(sys.getfilesystemencoding())
         self.addCleanup(shutil.rmtree, repo_dir)
         with Repo.init(repo_dir) as repo:

             # Populate repo
             file = Blob.from_string(b'foo')
             tree = Tree()
             latin1_name = u'À'.encode('latin1')
             utf8_name = u'À'.encode('utf8')
             tree[latin1_name] = (stat.S_IFREG | 0o644, file.id)
             tree[utf8_name] = (stat.S_IFREG | 0o644, file.id)
             repo.object_store.add_objects(
                 [(o, None) for o in [file, tree]])

             build_index_from_tree(
                 repo.path, repo.index_path(), repo.object_store, tree.id)

             # Verify index entries
             index = repo.open_index()
             latin1_path = os.path.join(repo_dir_bytes, latin1_name)
             self.assertTrue(os.path.exists(latin1_path))
             utf8_path = os.path.join(repo_dir_bytes, utf8_name)
             self.assertTrue(os.path.exists(utf8_path))

     def test_git_submodule(self):
         repo_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, repo_dir)
         with Repo.init(repo_dir) as repo:
             filea = Blob.from_string(b'file alalala')

             subtree = Tree()
             subtree[b'a'] = (stat.S_IFREG | 0o644, filea.id)

             c = Commit()
             c.tree = subtree.id
             c.committer = c.author = b'Somebody '
             c.commit_time = c.author_time = 42342
             c.commit_timezone = c.author_timezone = 0
             c.parents = []
             c.message = b'Subcommit'

             tree = Tree()
             tree[b'c'] = (S_IFGITLINK, c.id)

             repo.object_store.add_objects(
                 [(o, None) for o in [tree]])

             build_index_from_tree(repo.path, repo.index_path(),
                                   repo.object_store, tree.id)

             # Verify index entries
             index = repo.open_index()
             self.assertEqual(len(index), 1)

             # filea
             apath = os.path.join(repo.path, 'c/a')
             self.assertFalse(os.path.exists(apath))

             # dir c
             cpath = os.path.join(repo.path, 'c')
             self.assertTrue(os.path.isdir(cpath))
             self.assertEqual(index[b'c'][4], S_IFGITLINK)  # mode
             self.assertEqual(index[b'c'][8], c.id)  # sha


 class GetUnstagedChangesTests(TestCase):

     def test_get_unstaged_changes(self):
         """Unit test for get_unstaged_changes."""

         repo_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, repo_dir)
         with Repo.init(repo_dir) as repo:

             # Commit a dummy file then modify it
             foo1_fullpath = os.path.join(repo_dir, 'foo1')
             with open(foo1_fullpath, 'wb') as f:
                 f.write(b'origstuff')

             foo2_fullpath = os.path.join(repo_dir, 'foo2')
             with open(foo2_fullpath, 'wb') as f:
                 f.write(b'origstuff')

             repo.stage(['foo1', 'foo2'])
             repo.do_commit(b'test status', author=b'', committer=b'')

             with open(foo1_fullpath, 'wb') as f:
                 f.write(b'newstuff')

             # modify access and modify time of path
             os.utime(foo1_fullpath, (0, 0))

             changes = get_unstaged_changes(repo.open_index(), repo_dir)

             self.assertEqual(list(changes), [b'foo1'])

+    def test_get_unstaged_deleted_changes(self):
+        """Unit test for get_unstaged_changes."""
+
+        repo_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, repo_dir)
+        with Repo.init(repo_dir) as repo:
+
+            # Commit a dummy file then remove it
+            foo1_fullpath = os.path.join(repo_dir, 'foo1')
+            with open(foo1_fullpath, 'wb') as f:
+                f.write(b'origstuff')
+
+            repo.stage(['foo1'])
+            repo.do_commit(b'test status', author=b'', committer=b'')
+
+            os.unlink(foo1_fullpath)
+
+            changes = get_unstaged_changes(repo.open_index(), repo_dir)
+
+            self.assertEqual(list(changes), [b'foo1'])
+

 class TestValidatePathElement(TestCase):

     def test_default(self):
         self.assertTrue(validate_path_element_default(b"bla"))
         self.assertTrue(validate_path_element_default(b".bla"))
         self.assertFalse(validate_path_element_default(b".git"))
         self.assertFalse(validate_path_element_default(b".giT"))
         self.assertFalse(validate_path_element_default(b".."))
         self.assertTrue(validate_path_element_default(b"git~1"))

     def test_ntfs(self):
         self.assertTrue(validate_path_element_ntfs(b"bla"))
         self.assertTrue(validate_path_element_ntfs(b".bla"))
         self.assertFalse(validate_path_element_ntfs(b".git"))
         self.assertFalse(validate_path_element_ntfs(b".giT"))
         self.assertFalse(validate_path_element_ntfs(b".."))
         self.assertFalse(validate_path_element_ntfs(b"git~1"))


 class TestTreeFSPathConversion(TestCase):

     def test_tree_to_fs_path(self):
         tree_path = u'délwíçh/foo'.encode('utf8')
         fs_path = _tree_to_fs_path(b'/prefix/path', tree_path)
         self.assertEqual(
             fs_path,
             os.path.join(u'/prefix/path', u'délwíçh', u'foo').encode('utf8'))

     def test_fs_to_tree_path_str(self):
         fs_path = os.path.join(os.path.join(u'délwíçh', u'foo'))
         tree_path = _fs_to_tree_path(fs_path, "utf-8")
         self.assertEqual(tree_path, u'délwíçh/foo'.encode("utf-8"))

     def test_fs_to_tree_path_bytes(self):
         fs_path = os.path.join(os.path.join(u'délwíçh', u'foo').encode('utf8'))
         tree_path = _fs_to_tree_path(fs_path, "utf-8")
         self.assertEqual(tree_path, u'délwíçh/foo'.encode('utf8'))
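
For context on the behavioural change in get_unstaged_changes: the sketch below shows, outside the test suite, how the fix can be observed. It assumes an existing repository at a placeholder path with a committed file named 'tracked-file'; both the path and the file name are hypothetical and not part of this patch.

import os

from dulwich.index import get_unstaged_changes
from dulwich.repo import Repo

repo = Repo('/path/to/repo')  # hypothetical, already-initialized repository
# Delete a file that is still present in the index.
os.unlink(os.path.join(repo.path, 'tracked-file'))

# Before this patch, the os.lstat() inside blob_from_path_and_stat() raised
# OSError(ENOENT) here; with the patch, the deleted path is yielded as an
# unstaged change instead.
print(list(get_unstaged_changes(repo.open_index(), repo.path)))
# -> [b'tracked-file']
repo.close()

Note that the new except clause only swallows ENOENT and re-raises any other OSError, so genuine I/O failures (for example permission errors) still surface to the caller.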