diff --git a/dulwich/index.py b/dulwich/index.py index 3a8c5209..15aa5483 100644 --- a/dulwich/index.py +++ b/dulwich/index.py @@ -1,587 +1,586 @@ # index.py -- File parser/writer for the git index file # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Parser for the git index file format.""" import collections import errno import os import stat import struct import sys from dulwich.file import GitFile from dulwich.objects import ( Blob, S_IFGITLINK, S_ISGITLINK, Tree, hex_to_sha, sha_to_hex, ) from dulwich.pack import ( SHA1Reader, SHA1Writer, ) IndexEntry = collections.namedtuple( 'IndexEntry', [ 'ctime', 'mtime', 'dev', 'ino', 'mode', 'uid', 'gid', 'size', 'sha', 'flags']) def pathsplit(path): """Split a /-delimited path into a directory part and a basename. :param path: The path to split. :return: Tuple with directory name and basename """ try: (dirname, basename) = path.rsplit(b"/", 1) except ValueError: return (b"", path) else: return (dirname, basename) def pathjoin(*args): """Join a /-delimited path. """ return b"/".join([p for p in args if p]) def read_cache_time(f): """Read a cache time. :param f: File-like object to read from :return: Tuple with seconds and nanoseconds """ return struct.unpack(">LL", f.read(8)) def write_cache_time(f, t): """Write a cache time. :param f: File-like object to write to :param t: Time to write (as int, float or tuple with secs and nsecs) """ if isinstance(t, int): t = (t, 0) elif isinstance(t, float): (secs, nsecs) = divmod(t, 1.0) t = (int(secs), int(nsecs * 1000000000)) elif not isinstance(t, tuple): raise TypeError(t) f.write(struct.pack(">LL", *t)) def read_cache_entry(f): """Read an entry from a cache file. :param f: File-like object to read from :return: tuple with: device, inode, mode, uid, gid, size, sha, flags """ beginoffset = f.tell() ctime = read_cache_time(f) mtime = read_cache_time(f) (dev, ino, mode, uid, gid, size, sha, flags, ) = \ struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2)) name = f.read((flags & 0x0fff)) # Padding: real_size = ((f.tell() - beginoffset + 8) & ~7) f.read((beginoffset + real_size) - f.tell()) return (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha_to_hex(sha), flags & ~0x0fff) def write_cache_entry(f, entry): """Write an index entry to a file. :param f: File object :param entry: Entry to write, tuple with: (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) """ beginoffset = f.tell() (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) = entry write_cache_time(f, ctime) write_cache_time(f, mtime) flags = len(name) | (flags &~ 0x0fff) f.write(struct.pack(b'>LLLLLL20sH', dev & 0xFFFFFFFF, ino & 0xFFFFFFFF, mode, uid, gid, size, hex_to_sha(sha), flags)) f.write(name) real_size = ((f.tell() - beginoffset + 8) & ~7) f.write(b'\0' * ((beginoffset + real_size) - f.tell())) def read_index(f): """Read an index file, yielding the individual entries.""" header = f.read(4) if header != b'DIRC': raise AssertionError("Invalid index file header: %r" % header) (version, num_entries) = struct.unpack(b'>LL', f.read(4 * 2)) assert version in (1, 2) for i in range(num_entries): yield read_cache_entry(f) def read_index_dict(f): """Read an index file and return it as a dictionary. :param f: File object to read from """ ret = {} for x in read_index(f): ret[x[0]] = IndexEntry(*x[1:]) return ret def write_index(f, entries): """Write an index file. :param f: File-like object to write to :param entries: Iterable over the entries to write """ f.write(b'DIRC') f.write(struct.pack(b'>LL', 2, len(entries))) for x in entries: write_cache_entry(f, x) def write_index_dict(f, entries): """Write an index file based on the contents of a dictionary. """ entries_list = [] for name in sorted(entries): entries_list.append((name,) + tuple(entries[name])) write_index(f, entries_list) def cleanup_mode(mode): """Cleanup a mode value. This will return a mode that can be stored in a tree object. :param mode: Mode to clean up. """ if stat.S_ISLNK(mode): return stat.S_IFLNK elif stat.S_ISDIR(mode): return stat.S_IFDIR elif S_ISGITLINK(mode): return S_IFGITLINK ret = stat.S_IFREG | 0o644 ret |= (mode & 0o111) return ret class Index(object): """A Git Index file.""" def __init__(self, filename): """Open an index file. :param filename: Path to the index file """ self._filename = filename self.clear() self.read() @property def path(self): return self._filename def __repr__(self): return "%s(%r)" % (self.__class__.__name__, self._filename) def write(self): """Write current contents of index to disk.""" f = GitFile(self._filename, 'wb') try: f = SHA1Writer(f) write_index_dict(f, self._byname) finally: f.close() def read(self): """Read current contents of index from disk.""" if not os.path.exists(self._filename): return f = GitFile(self._filename, 'rb') try: f = SHA1Reader(f) for x in read_index(f): self[x[0]] = IndexEntry(*x[1:]) # FIXME: Additional data? f.read(os.path.getsize(self._filename)-f.tell()-20) f.check_sha() finally: f.close() def __len__(self): """Number of entries in this index file.""" return len(self._byname) def __getitem__(self, name): """Retrieve entry by relative path. :return: tuple with (ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) """ return self._byname[name] def __iter__(self): """Iterate over the paths in this index.""" return iter(self._byname) def get_sha1(self, path): """Return the (git object) SHA1 for the object at a path.""" return self[path].sha def get_mode(self, path): """Return the POSIX file mode for the object at a path.""" return self[path].mode def iterblobs(self): """Iterate over path, sha, mode tuples for use with commit_tree.""" for path in self: entry = self[path] yield path, entry.sha, cleanup_mode(entry.mode) def clear(self): """Remove all contents from this index.""" self._byname = {} def __setitem__(self, name, x): assert isinstance(name, bytes) assert len(x) == 10 # Remove the old entry if any self._byname[name] = x def __delitem__(self, name): assert isinstance(name, bytes) del self._byname[name] def iteritems(self): return self._byname.items() def update(self, entries): for name, value in entries.items(): self[name] = value def changes_from_tree(self, object_store, tree, want_unchanged=False): """Find the differences between the contents of this index and a tree. :param object_store: Object store to use for retrieving tree contents :param tree: SHA1 of the root tree :param want_unchanged: Whether unchanged files should be reported :return: Iterator over tuples with (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) """ def lookup_entry(path): entry = self[path] return entry.sha, entry.mode for (name, mode, sha) in changes_from_tree(self._byname.keys(), lookup_entry, object_store, tree, want_unchanged=want_unchanged): yield (name, mode, sha) def commit(self, object_store): """Create a new tree from an index. :param object_store: Object store to save the tree in :return: Root tree SHA """ return commit_tree(object_store, self.iterblobs()) def commit_tree(object_store, blobs): """Commit a new tree. :param object_store: Object store to add trees to :param blobs: Iterable over blob path, sha, mode entries :return: SHA1 of the created tree. """ trees = {b'': {}} def add_tree(path): if path in trees: return trees[path] dirname, basename = pathsplit(path) t = add_tree(dirname) assert isinstance(basename, bytes) newtree = {} t[basename] = newtree trees[path] = newtree return newtree for path, sha, mode in blobs: tree_path, basename = pathsplit(path) tree = add_tree(tree_path) tree[basename] = (mode, sha) def build_tree(path): tree = Tree() for basename, entry in trees[path].items(): if isinstance(entry, dict): mode = stat.S_IFDIR sha = build_tree(pathjoin(path, basename)) else: (mode, sha) = entry tree.add(basename, mode, sha) object_store.add_object(tree) return tree.id return build_tree(b'') def commit_index(object_store, index): """Create a new tree from an index. :param object_store: Object store to save the tree in :param index: Index file :note: This function is deprecated, use index.commit() instead. :return: Root tree sha. """ return commit_tree(object_store, index.iterblobs()) def changes_from_tree(names, lookup_entry, object_store, tree, want_unchanged=False): """Find the differences between the contents of a tree and a working copy. :param names: Iterable of names in the working copy :param lookup_entry: Function to lookup an entry in the working copy :param object_store: Object store to use for retrieving tree contents :param tree: SHA1 of the root tree, or None for an empty tree :param want_unchanged: Whether unchanged files should be reported :return: Iterator over tuples with (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) """ other_names = set(names) if tree is not None: for (name, mode, sha) in object_store.iter_tree_contents(tree): try: (other_sha, other_mode) = lookup_entry(name) except KeyError: # Was removed yield ((name, None), (mode, None), (sha, None)) else: other_names.remove(name) if (want_unchanged or other_sha != sha or other_mode != mode): yield ((name, name), (mode, other_mode), (sha, other_sha)) # Mention added files for name in other_names: try: (other_sha, other_mode) = lookup_entry(name) except KeyError: pass else: yield ((None, name), (None, other_mode), (None, other_sha)) def index_entry_from_stat(stat_val, hex_sha, flags, mode=None): """Create a new index entry from a stat value. :param stat_val: POSIX stat_result instance :param hex_sha: Hex sha of the object :param flags: Index flags """ if mode is None: mode = cleanup_mode(stat_val.st_mode) return (stat_val.st_ctime, stat_val.st_mtime, stat_val.st_dev, stat_val.st_ino, mode, stat_val.st_uid, stat_val.st_gid, stat_val.st_size, hex_sha, flags) def build_file_from_blob(blob, mode, target_path, honor_filemode=True): """Build a file or symlink on disk based on a Git object. :param obj: The git object :param mode: File mode :param target_path: Path to write to :param honor_filemode: An optional flag to honor core.filemode setting in config file, default is core.filemode=True, change executable bit """ if stat.S_ISLNK(mode): # FIXME: This will fail on Windows. What should we do instead? src_path = blob.as_raw_string() try: os.symlink(src_path, target_path) except OSError as e: if e.errno == errno.EEXIST: os.unlink(target_path) os.symlink(src_path, target_path) else: raise else: with open(target_path, 'wb') as f: # Write out file f.write(blob.as_raw_string()) if honor_filemode: os.chmod(target_path, mode) INVALID_DOTNAMES = (b".git", b".", b"..", b"") def validate_path_element_default(element): return element.lower() not in INVALID_DOTNAMES def validate_path_element_ntfs(element): stripped = element.rstrip(b". ").lower() if stripped in INVALID_DOTNAMES: return False if stripped == b"git~1": return False return True def validate_path(path, element_validator=validate_path_element_default): """Default path validator that just checks for .git/.""" parts = path.split(b"/") for p in parts: if not element_validator(p): return False else: return True def build_index_from_tree(root_path, index_path, object_store, tree_id, honor_filemode=True, validate_path_element=validate_path_element_default): """Generate and materialize index from a tree :param tree_id: Tree to materialize :param root_path: Target dir for materialized index files :param index_path: Target path for generated index :param object_store: Non-empty object store holding tree contents :param honor_filemode: An optional flag to honor core.filemode setting in config file, default is core.filemode=True, change executable bit :param validate_path_element: Function to validate path elements to check out; default just refuses .git and .. directories. :note:: existing index is wiped and contents are not merged in a working dir. Suitable only for fresh clones. """ index = Index(index_path) if not isinstance(root_path, bytes): root_path = root_path.encode(sys.getfilesystemencoding()) for entry in object_store.iter_tree_contents(tree_id): if not validate_path(entry.path, validate_path_element): continue full_path = _tree_to_fs_path(root_path, entry.path) if not os.path.exists(os.path.dirname(full_path)): os.makedirs(os.path.dirname(full_path)) # FIXME: Merge new index into working tree obj = object_store[entry.sha] build_file_from_blob(obj, entry.mode, full_path, honor_filemode=honor_filemode) # Add file to index st = os.lstat(full_path) if not honor_filemode: - st_tuple = (entry.mode, st.st_ino, st.st_dev, st.st_nlink, st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime, st.st_ctime) - st = st.__class__(st_tuple) + st = st.__class__((entry.mode, ) + st[1:]) index[entry.path] = index_entry_from_stat(st, entry.sha, 0) index.write() def blob_from_path_and_stat(fs_path, st): """Create a blob from a path and a stat object. :param fs_path: Full file system path to file :param st: A stat object :return: A `Blob` object """ assert isinstance(fs_path, bytes) blob = Blob() if not stat.S_ISLNK(st.st_mode): with open(fs_path, 'rb') as f: blob.data = f.read() else: blob.data = os.readlink(fs_path) return blob def get_unstaged_changes(index, root_path): """Walk through an index and check for differences against working tree. :param index: index to check :param root_path: path in which to find files :return: iterator over paths with unstaged changes """ # For each entry in the index check the sha1 & ensure not staged if not isinstance(root_path, bytes): root_path = root_path.encode(sys.getfilesystemencoding()) for tree_path, entry in index.iteritems(): full_path = _tree_to_fs_path(root_path, tree_path) blob = blob_from_path_and_stat(full_path, os.lstat(full_path)) if blob.id != entry.sha: yield tree_path os_sep_bytes = os.sep.encode('ascii') def _tree_to_fs_path(root_path, tree_path): """Convert a git tree path to a file system path. :param root_path: Root filesystem path :param tree_path: Git tree path as bytes :return: File system path. """ assert isinstance(tree_path, bytes) if os_sep_bytes != b'/': sep_corrected_path = tree_path.replace(b'/', os_sep_bytes) else: sep_corrected_path = tree_path return os.path.join(root_path, sep_corrected_path) def _fs_to_tree_path(fs_path, fs_encoding=None): """Convert a file system path to a git tree path. :param fs_path: File system path. :param fs_encoding: File system encoding :return: Git tree path as bytes """ if fs_encoding is None: fs_encoding = sys.getfilesystemencoding() if not isinstance(fs_path, bytes): fs_path_bytes = fs_path.encode(fs_encoding) else: fs_path_bytes = fs_path if os_sep_bytes != b'/': tree_path = fs_path_bytes.replace(os_sep_bytes, b'/') else: tree_path = fs_path_bytes return tree_path diff --git a/dulwich/repo.py b/dulwich/repo.py index 627e8aba..4b6180ab 100644 --- a/dulwich/repo.py +++ b/dulwich/repo.py @@ -1,1141 +1,1141 @@ # repo.py -- For dealing with git repositories. # Copyright (C) 2007 James Westby # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Repository access. This module contains the base class for git repositories (BaseRepo) and an implementation which uses a repository on local disk (Repo). """ from io import BytesIO import errno import os import sys import stat from dulwich.errors import ( NoIndexPresent, NotBlobError, NotCommitError, NotGitRepository, NotTreeError, NotTagError, CommitError, RefFormatError, HookError, ) from dulwich.file import ( GitFile, ) from dulwich.object_store import ( DiskObjectStore, MemoryObjectStore, ObjectStoreGraphWalker, ) from dulwich.objects import ( check_hexsha, Blob, Commit, ShaFile, Tag, Tree, ) from dulwich.hooks import ( PreCommitShellHook, PostCommitShellHook, CommitMsgShellHook, ) from dulwich.refs import ( check_ref_format, RefsContainer, DictRefsContainer, InfoRefsContainer, DiskRefsContainer, read_packed_refs, read_packed_refs_with_peeled, write_packed_refs, SYMREF, ) import warnings CONTROLDIR = '.git' OBJECTDIR = 'objects' REFSDIR = 'refs' REFSDIR_TAGS = 'tags' REFSDIR_HEADS = 'heads' INDEX_FILENAME = "index" COMMONDIR = 'commondir' GITDIR = 'gitdir' WORKTREES = 'worktrees' BASE_DIRECTORIES = [ ["branches"], [REFSDIR], [REFSDIR, REFSDIR_TAGS], [REFSDIR, REFSDIR_HEADS], ["hooks"], ["info"] ] DEFAULT_REF = b'refs/heads/master' def parse_graftpoints(graftpoints): """Convert a list of graftpoints into a dict :param graftpoints: Iterator of graftpoint lines Each line is formatted as: []* Resulting dictionary is: : [*] https://git.wiki.kernel.org/index.php/GraftPoint """ grafts = {} for l in graftpoints: raw_graft = l.split(None, 1) commit = raw_graft[0] if len(raw_graft) == 2: parents = raw_graft[1].split() else: parents = [] for sha in [commit] + parents: check_hexsha(sha, 'Invalid graftpoint') grafts[commit] = parents return grafts def serialize_graftpoints(graftpoints): """Convert a dictionary of grafts into string The graft dictionary is: : [*] Each line is formatted as: []* https://git.wiki.kernel.org/index.php/GraftPoint """ graft_lines = [] for commit, parents in graftpoints.items(): if parents: graft_lines.append(commit + b' ' + b' '.join(parents)) else: graft_lines.append(commit) return b'\n'.join(graft_lines) class BaseRepo(object): """Base class for a git repository. :ivar object_store: Dictionary-like object for accessing the objects :ivar refs: Dictionary-like object with the refs in this repository """ def __init__(self, object_store, refs): """Open a repository. This shouldn't be called directly, but rather through one of the base classes, such as MemoryRepo or Repo. :param object_store: Object store to use :param refs: Refs container to use """ self.object_store = object_store self.refs = refs self._graftpoints = {} self.hooks = {} def _determine_file_mode(self): """Probe the file-system to determine whether permissions can be trusted. :return: True if permissions can be trusted, False otherwise. """ raise NotImplementedError(self._determine_file_mode) def _init_files(self, bare): """Initialize a default set of named files.""" from dulwich.config import ConfigFile self._put_named_file('description', b"Unnamed repository") f = BytesIO() cf = ConfigFile() cf.set(b"core", b"repositoryformatversion", b"0") if self._determine_file_mode(): - cf.set(b"core", b"filemode", b"true") + cf.set(b"core", b"filemode", True) else: - cf.set(b"core", b"filemode", b"false") + cf.set(b"core", b"filemode", False) cf.set(b"core", b"bare", bare) cf.set(b"core", b"logallrefupdates", True) cf.write_to_file(f) self._put_named_file('config', f.getvalue()) self._put_named_file(os.path.join('info', 'exclude'), b'') def get_named_file(self, path): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-based Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :return: An open file object, or None if the file does not exist. """ raise NotImplementedError(self.get_named_file) def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ raise NotImplementedError(self._put_named_file) def open_index(self): """Open the index for this repository. :raise NoIndexPresent: If no index is present :return: The matching `Index` """ raise NotImplementedError(self.open_index) def fetch(self, target, determine_wants=None, progress=None): """Fetch objects into another repository. :param target: The target repository :param determine_wants: Optional function to determine what refs to fetch. :param progress: Optional progress function :return: The local refs """ if determine_wants is None: determine_wants = target.object_store.determine_wants_all target.object_store.add_objects( self.fetch_objects(determine_wants, target.get_graph_walker(), progress)) return self.get_refs() def fetch_objects(self, determine_wants, graph_walker, progress, get_tagged=None): """Fetch the missing objects required for a set of revisions. :param determine_wants: Function that takes a dictionary with heads and returns the list of heads to fetch. :param graph_walker: Object that can iterate over the list of revisions to fetch and has an "ack" method that will be called to acknowledge that a revision is present. :param progress: Simple progress function that will be called with updated progress strings. :param get_tagged: Function that returns a dict of pointed-to sha -> tag sha for including tags. :return: iterator over objects, with __len__ implemented """ wants = determine_wants(self.get_refs()) if not isinstance(wants, list): raise TypeError("determine_wants() did not return a list") shallows = getattr(graph_walker, 'shallow', frozenset()) unshallows = getattr(graph_walker, 'unshallow', frozenset()) if wants == []: # TODO(dborowitz): find a way to short-circuit that doesn't change # this interface. if shallows or unshallows: # Do not send a pack in shallow short-circuit path return None return [] # If the graph walker is set up with an implementation that can # ACK/NAK to the wire, it will write data to the client through # this call as a side-effect. haves = self.object_store.find_common_revisions(graph_walker) # Deal with shallow requests separately because the haves do # not reflect what objects are missing if shallows or unshallows: haves = [] # TODO: filter the haves commits from iter_shas. # the specific commits aren't missing. def get_parents(commit): if commit.id in shallows: return [] return self.get_parents(commit.id, commit) return self.object_store.iter_shas( self.object_store.find_missing_objects( haves, wants, progress, get_tagged, get_parents=get_parents)) def get_graph_walker(self, heads=None): """Retrieve a graph walker. A graph walker is used by a remote repository (or proxy) to find out which objects are present in this repository. :param heads: Repository heads to use (optional) :return: A graph walker object """ if heads is None: heads = self.refs.as_dict(b'refs/heads').values() return ObjectStoreGraphWalker(heads, self.get_parents) def get_refs(self): """Get dictionary with all refs. :return: A ``dict`` mapping ref names to SHA1s """ return self.refs.as_dict() def head(self): """Return the SHA1 pointed at by HEAD.""" return self.refs[b'HEAD'] def _get_object(self, sha, cls): assert len(sha) in (20, 40) ret = self.get_object(sha) if not isinstance(ret, cls): if cls is Commit: raise NotCommitError(ret) elif cls is Blob: raise NotBlobError(ret) elif cls is Tree: raise NotTreeError(ret) elif cls is Tag: raise NotTagError(ret) else: raise Exception("Type invalid: %r != %r" % ( ret.type_name, cls.type_name)) return ret def get_object(self, sha): """Retrieve the object with the specified SHA. :param sha: SHA to retrieve :return: A ShaFile object :raise KeyError: when the object can not be found """ return self.object_store[sha] def get_parents(self, sha, commit=None): """Retrieve the parents of a specific commit. If the specific commit is a graftpoint, the graft parents will be returned instead. :param sha: SHA of the commit for which to retrieve the parents :param commit: Optional commit matching the sha :return: List of parents """ try: return self._graftpoints[sha] except KeyError: if commit is None: commit = self[sha] return commit.parents def get_config(self): """Retrieve the config object. :return: `ConfigFile` object for the ``.git/config`` file. """ raise NotImplementedError(self.get_config) def get_description(self): """Retrieve the description for this repository. :return: String with the description of the repository as set by the user. """ raise NotImplementedError(self.get_description) def set_description(self, description): """Set the description for this repository. :param description: Text to set as description for this repository. """ raise NotImplementedError(self.set_description) def get_config_stack(self): """Return a config stack for this repository. This stack accesses the configuration for both this repository itself (.git/config) and the global configuration, which usually lives in ~/.gitconfig. :return: `Config` instance for this repository """ from dulwich.config import StackedConfig backends = [self.get_config()] + StackedConfig.default_backends() return StackedConfig(backends, writable=backends[0]) def get_peeled(self, ref): """Get the peeled value of a ref. :param ref: The refname to peel. :return: The fully-peeled SHA1 of a tag object, after peeling all intermediate tags; if the original ref does not point to a tag, this will equal the original SHA1. """ cached = self.refs.get_peeled(ref) if cached is not None: return cached return self.object_store.peel_sha(self.refs[ref]).id def get_walker(self, include=None, *args, **kwargs): """Obtain a walker for this repository. :param include: Iterable of SHAs of commits to include along with their ancestors. Defaults to [HEAD] :param exclude: Iterable of SHAs of commits to exclude along with their ancestors, overriding includes. :param order: ORDER_* constant specifying the order of results. Anything other than ORDER_DATE may result in O(n) memory usage. :param reverse: If True, reverse the order of output, requiring O(n) memory. :param max_entries: The maximum number of entries to yield, or None for no limit. :param paths: Iterable of file or subtree paths to show entries for. :param rename_detector: diff.RenameDetector object for detecting renames. :param follow: If True, follow path across renames/copies. Forces a default rename_detector. :param since: Timestamp to list commits after. :param until: Timestamp to list commits before. :param queue_cls: A class to use for a queue of commits, supporting the iterator protocol. The constructor takes a single argument, the Walker. :return: A `Walker` object """ from dulwich.walk import Walker if include is None: include = [self.head()] if isinstance(include, str): include = [include] kwargs['get_parents'] = lambda commit: self.get_parents(commit.id, commit) return Walker(self.object_store, include, *args, **kwargs) def __getitem__(self, name): """Retrieve a Git object by SHA1 or ref. :param name: A Git object SHA1 or a ref name :return: A `ShaFile` object, such as a Commit or Blob :raise KeyError: when the specified ref or object does not exist """ if not isinstance(name, bytes): raise TypeError("'name' must be bytestring, not %.80s" % type(name).__name__) if len(name) in (20, 40): try: return self.object_store[name] except (KeyError, ValueError): pass try: return self.object_store[self.refs[name]] except RefFormatError: raise KeyError(name) def __contains__(self, name): """Check if a specific Git object or ref is present. :param name: Git object SHA1 or ref name """ if len(name) in (20, 40): return name in self.object_store or name in self.refs else: return name in self.refs def __setitem__(self, name, value): """Set a ref. :param name: ref name :param value: Ref value - either a ShaFile object, or a hex sha """ if name.startswith(b"refs/") or name == b'HEAD': if isinstance(value, ShaFile): self.refs[name] = value.id elif isinstance(value, bytes): self.refs[name] = value else: raise TypeError(value) else: raise ValueError(name) def __delitem__(self, name): """Remove a ref. :param name: Name of the ref to remove """ if name.startswith(b"refs/") or name == b"HEAD": del self.refs[name] else: raise ValueError(name) def _get_user_identity(self): """Determine the identity to use for new commits. """ config = self.get_config_stack() return (config.get((b"user", ), b"name") + b" <" + config.get((b"user", ), b"email") + b">") def _add_graftpoints(self, updated_graftpoints): """Add or modify graftpoints :param updated_graftpoints: Dict of commit shas to list of parent shas """ # Simple validation for commit, parents in updated_graftpoints.items(): for sha in [commit] + parents: check_hexsha(sha, 'Invalid graftpoint') self._graftpoints.update(updated_graftpoints) def _remove_graftpoints(self, to_remove=[]): """Remove graftpoints :param to_remove: List of commit shas """ for sha in to_remove: del self._graftpoints[sha] def do_commit(self, message=None, committer=None, author=None, commit_timestamp=None, commit_timezone=None, author_timestamp=None, author_timezone=None, tree=None, encoding=None, ref=b'HEAD', merge_heads=None): """Create a new commit. :param message: Commit message :param committer: Committer fullname :param author: Author fullname (defaults to committer) :param commit_timestamp: Commit timestamp (defaults to now) :param commit_timezone: Commit timestamp timezone (defaults to GMT) :param author_timestamp: Author timestamp (defaults to commit timestamp) :param author_timezone: Author timestamp timezone (defaults to commit timestamp timezone) :param tree: SHA1 of the tree root to use (if not specified the current index will be committed). :param encoding: Encoding :param ref: Optional ref to commit to (defaults to current branch) :param merge_heads: Merge heads (defaults to .git/MERGE_HEADS) :return: New commit SHA1 """ import time c = Commit() if tree is None: index = self.open_index() c.tree = index.commit(self.object_store) else: if len(tree) != 40: raise ValueError("tree must be a 40-byte hex sha string") c.tree = tree try: self.hooks['pre-commit'].execute() except HookError as e: raise CommitError(e) except KeyError: # no hook defined, silent fallthrough pass if merge_heads is None: # FIXME: Read merge heads from .git/MERGE_HEADS merge_heads = [] if committer is None: # FIXME: Support GIT_COMMITTER_NAME/GIT_COMMITTER_EMAIL environment # variables committer = self._get_user_identity() c.committer = committer if commit_timestamp is None: # FIXME: Support GIT_COMMITTER_DATE environment variable commit_timestamp = time.time() c.commit_time = int(commit_timestamp) if commit_timezone is None: # FIXME: Use current user timezone rather than UTC commit_timezone = 0 c.commit_timezone = commit_timezone if author is None: # FIXME: Support GIT_AUTHOR_NAME/GIT_AUTHOR_EMAIL environment # variables author = committer c.author = author if author_timestamp is None: # FIXME: Support GIT_AUTHOR_DATE environment variable author_timestamp = commit_timestamp c.author_time = int(author_timestamp) if author_timezone is None: author_timezone = commit_timezone c.author_timezone = author_timezone if encoding is not None: c.encoding = encoding if message is None: # FIXME: Try to read commit message from .git/MERGE_MSG raise ValueError("No commit message specified") try: c.message = self.hooks['commit-msg'].execute(message) if c.message is None: c.message = message except HookError as e: raise CommitError(e) except KeyError: # no hook defined, message not modified c.message = message if ref is None: # Create a dangling commit c.parents = merge_heads self.object_store.add_object(c) else: try: old_head = self.refs[ref] c.parents = [old_head] + merge_heads self.object_store.add_object(c) ok = self.refs.set_if_equals(ref, old_head, c.id) except KeyError: c.parents = merge_heads self.object_store.add_object(c) ok = self.refs.add_if_new(ref, c.id) if not ok: # Fail if the atomic compare-and-swap failed, leaving the commit and # all its objects as garbage. raise CommitError("%s changed during commit" % (ref,)) try: self.hooks['post-commit'].execute() except HookError as e: # silent failure warnings.warn("post-commit hook failed: %s" % e, UserWarning) except KeyError: # no hook defined, silent fallthrough pass return c.id def read_gitfile(f): """Read a ``.git`` file. The first line of the file should start with "gitdir: " :param f: File-like object to read from :return: A path """ cs = f.read() if not cs.startswith("gitdir: "): raise ValueError("Expected file to start with 'gitdir: '") return cs[len("gitdir: "):].rstrip("\n") class Repo(BaseRepo): """A git repository backed by local disk. To open an existing repository, call the contructor with the path of the repository. To create a new repository, use the Repo.init class method. """ def __init__(self, root): hidden_path = os.path.join(root, CONTROLDIR) if os.path.isdir(os.path.join(hidden_path, OBJECTDIR)): self.bare = False self._controldir = hidden_path elif (os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(os.path.join(root, REFSDIR))): self.bare = True self._controldir = root elif os.path.isfile(hidden_path): self.bare = False with open(hidden_path, 'r') as f: path = read_gitfile(f) self.bare = False self._controldir = os.path.join(root, path) else: raise NotGitRepository( "No git repository was found at %(path)s" % dict(path=root) ) commondir = self.get_named_file(COMMONDIR) if commondir is not None: with commondir: self._commondir = os.path.join( self.controldir(), commondir.read().rstrip(b"\r\n").decode(sys.getfilesystemencoding())) else: self._commondir = self._controldir self.path = root object_store = DiskObjectStore( os.path.join(self.commondir(), OBJECTDIR)) refs = DiskRefsContainer(self.commondir(), self._controldir) BaseRepo.__init__(self, object_store, refs) self._graftpoints = {} graft_file = self.get_named_file(os.path.join("info", "grafts"), basedir=self.commondir()) if graft_file: with graft_file: self._graftpoints.update(parse_graftpoints(graft_file)) graft_file = self.get_named_file("shallow", basedir=self.commondir()) if graft_file: with graft_file: self._graftpoints.update(parse_graftpoints(graft_file)) self.hooks['pre-commit'] = PreCommitShellHook(self.controldir()) self.hooks['commit-msg'] = CommitMsgShellHook(self.controldir()) self.hooks['post-commit'] = PostCommitShellHook(self.controldir()) @classmethod def discover(cls, start='.'): """Iterate parent directories to discover a repository Return a Repo object for the first parent directory that looks like a Git repository. :param start: The directory to start discovery from (defaults to '.') """ remaining = True path = os.path.abspath(start) while remaining: try: return cls(path) except NotGitRepository: path, remaining = os.path.split(path) raise NotGitRepository( "No git repository was found at %(path)s" % dict(path=start) ) def controldir(self): """Return the path of the control directory.""" return self._controldir def commondir(self): """Return the path of the common directory. For a main working tree, it is identical to `controldir()`. For a linked working tree, it is the control directory of the main working tree.""" return self._commondir def _determine_file_mode(self): """Probe the file-system to determine whether permissions can be trusted. :return: True if permissions can be trusted, False otherwise. """ fname = os.path.join(self.path, '.probe-permissions') with open(fname, 'w') as f: f.write('') st1 = os.lstat(fname) os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) st2 = os.lstat(fname) os.unlink(fname) mode_differs = st1.st_mode != st2.st_mode st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 return mode_differs and st2_has_exec def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ path = path.lstrip(os.path.sep) with GitFile(os.path.join(self.controldir(), path), 'wb') as f: f.write(contents) def get_named_file(self, path, basedir=None): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-based Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :param basedir: Optional argument that specifies an alternative to the control dir. :return: An open file object, or None if the file does not exist. """ # TODO(dborowitz): sanitize filenames, since this is used directly by # the dumb web serving code. if basedir is None: basedir = self.controldir() path = path.lstrip(os.path.sep) try: return open(os.path.join(basedir, path), 'rb') except (IOError, OSError) as e: if e.errno == errno.ENOENT: return None raise def index_path(self): """Return path to the index file.""" return os.path.join(self.controldir(), INDEX_FILENAME) def open_index(self): """Open the index for this repository. :raise NoIndexPresent: If no index is present :return: The matching `Index` """ from dulwich.index import Index if not self.has_index(): raise NoIndexPresent() return Index(self.index_path()) def has_index(self): """Check if an index is present.""" # Bare repos must never have index files; non-bare repos may have a # missing index file, which is treated as empty. return not self.bare def stage(self, fs_paths): """Stage a set of paths. :param fs_paths: List of paths, relative to the repository path """ root_path_bytes = self.path.encode(sys.getfilesystemencoding()) if not isinstance(fs_paths, list): fs_paths = [fs_paths] from dulwich.index import ( blob_from_path_and_stat, index_entry_from_stat, _fs_to_tree_path, ) index = self.open_index() for fs_path in fs_paths: if not isinstance(fs_path, bytes): fs_path = fs_path.encode(sys.getfilesystemencoding()) tree_path = _fs_to_tree_path(fs_path) full_path = os.path.join(root_path_bytes, fs_path) try: st = os.lstat(full_path) except OSError: # File no longer exists try: del index[tree_path] except KeyError: pass # already removed else: blob = blob_from_path_and_stat(full_path, st) self.object_store.add_object(blob) index[tree_path] = index_entry_from_stat(st, blob.id, 0) index.write() def clone(self, target_path, mkdir=True, bare=False, origin=b"origin"): """Clone this repository. :param target_path: Target path :param mkdir: Create the target directory :param bare: Whether to create a bare repository :param origin: Base name for refs in target repository cloned from this repository :return: Created repository as `Repo` """ if not bare: target = self.init(target_path, mkdir=mkdir) else: target = self.init_bare(target_path) self.fetch(target) target.refs.import_refs( b'refs/remotes/' + origin, self.refs.as_dict(b'refs/heads')) target.refs.import_refs( b'refs/tags', self.refs.as_dict(b'refs/tags')) try: target.refs.add_if_new(DEFAULT_REF, self.refs[DEFAULT_REF]) except KeyError: pass target_config = target.get_config() encoded_path = self.path if not isinstance(encoded_path, bytes): encoded_path = encoded_path.encode(sys.getfilesystemencoding()) target_config.set((b'remote', b'origin'), b'url', encoded_path) target_config.set((b'remote', b'origin'), b'fetch', b'+refs/heads/*:refs/remotes/origin/*') target_config.write_to_path() # Update target head head_chain, head_sha = self.refs.follow(b'HEAD') if head_chain and head_sha is not None: target.refs.set_symbolic_ref(b'HEAD', head_chain[-1]) target[b'HEAD'] = head_sha if not bare: # Checkout HEAD to target dir target.reset_index() return target def reset_index(self, tree=None): """Reset the index back to a specific tree. :param tree: Tree SHA to reset to, None for current HEAD tree. """ from dulwich.index import ( build_index_from_tree, validate_path_element_default, validate_path_element_ntfs, ) if tree is None: tree = self[b'HEAD'].tree config = self.get_config() honor_filemode = config.get_boolean('core', 'filemode', os.name != "nt") if config.get_boolean('core', 'core.protectNTFS', os.name == "nt"): validate_path_element = validate_path_element_ntfs else: validate_path_element = validate_path_element_default return build_index_from_tree(self.path, self.index_path(), self.object_store, tree, honor_filemode=honor_filemode, validate_path_element=validate_path_element) def get_config(self): """Retrieve the config object. :return: `ConfigFile` object for the ``.git/config`` file. """ from dulwich.config import ConfigFile path = os.path.join(self._controldir, 'config') try: return ConfigFile.from_path(path) except (IOError, OSError) as e: if e.errno != errno.ENOENT: raise ret = ConfigFile() ret.path = path return ret def get_description(self): """Retrieve the description of this repository. :return: A string describing the repository or None. """ path = os.path.join(self._controldir, 'description') try: with GitFile(path, 'rb') as f: return f.read() except (IOError, OSError) as e: if e.errno != errno.ENOENT: raise return None def __repr__(self): return "" % self.path def set_description(self, description): """Set the description for this repository. :param description: Text to set as description for this repository. """ self._put_named_file('description', description) @classmethod def _init_maybe_bare(cls, path, bare): for d in BASE_DIRECTORIES: os.mkdir(os.path.join(path, *d)) DiskObjectStore.init(os.path.join(path, OBJECTDIR)) ret = cls(path) ret.refs.set_symbolic_ref(b'HEAD', DEFAULT_REF) ret._init_files(bare) return ret @classmethod def init(cls, path, mkdir=False): """Create a new repository. :param path: Path in which to create the repository :param mkdir: Whether to create the directory :return: `Repo` instance """ if mkdir: os.mkdir(path) controldir = os.path.join(path, CONTROLDIR) os.mkdir(controldir) cls._init_maybe_bare(controldir, False) return cls(path) @classmethod def _init_new_working_directory(cls, path, main_repo, identifier=None, mkdir=False): """Create a new working directory linked to a repository. :param path: Path in which to create the working tree. :param main_repo: Main repository to reference :param identifier: Worktree identifier :param mkdir: Whether to create the directory :return: `Repo` instance """ if mkdir: os.mkdir(path) if identifier is None: identifier = os.path.basename(path) main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES) worktree_controldir = os.path.join(main_worktreesdir, identifier) gitdirfile = os.path.join(path, CONTROLDIR) with open(gitdirfile, 'wb') as f: f.write(b'gitdir: ' + worktree_controldir.encode(sys.getfilesystemencoding()) + b'\n') try: os.mkdir(main_worktreesdir) except OSError as e: if e.errno != errno.EEXIST: raise try: os.mkdir(worktree_controldir) except OSError as e: if e.errno != errno.EEXIST: raise with open(os.path.join(worktree_controldir, GITDIR), 'wb') as f: f.write(gitdirfile.encode(sys.getfilesystemencoding()) + b'\n') with open(os.path.join(worktree_controldir, COMMONDIR), 'wb') as f: f.write(b'../..\n') with open(os.path.join(worktree_controldir, 'HEAD'), 'wb') as f: f.write(main_repo.head() + b'\n') r = cls(path) r.reset_index() return r @classmethod def init_bare(cls, path): """Create a new bare repository. ``path`` should already exist and be an emty directory. :param path: Path to create bare repository in :return: a `Repo` instance """ return cls._init_maybe_bare(path, True) create = init_bare def close(self): """Close any files opened by this repository.""" self.object_store.close() class MemoryRepo(BaseRepo): """Repo that stores refs, objects, and named files in memory. MemoryRepos are always bare: they have no working tree and no index, since those have a stronger dependency on the filesystem. """ def __init__(self): from dulwich.config import ConfigFile BaseRepo.__init__(self, MemoryObjectStore(), DictRefsContainer({})) self._named_files = {} self.bare = True self._config = ConfigFile() def _determine_file_mode(self): """Probe the file-system to determine whether permissions can be trusted. :return: True if permissions can be trusted, False otherwise. """ return sys.platform != 'win32' def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ self._named_files[path] = contents def get_named_file(self, path): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-baked Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :return: An open file object, or None if the file does not exist. """ contents = self._named_files.get(path, None) if contents is None: return None return BytesIO(contents) def open_index(self): """Fail to open index for this repo, since it is bare. :raise NoIndexPresent: Raised when no index is present """ raise NoIndexPresent() def get_config(self): """Retrieve the config object. :return: `ConfigFile` object. """ return self._config def get_description(self): """Retrieve the repository description. This defaults to None, for no description. """ return None @classmethod def init_bare(cls, objects, refs): """Create a new bare repository in memory. :param objects: Objects for the new repository, as iterable :param refs: Refs as dictionary, mapping names to object SHA1s """ ret = cls() for obj in objects: ret.object_store.add_object(obj) for refname, sha in refs.items(): ret.refs[refname] = sha ret._init_files(bare=True) return ret diff --git a/dulwich/tests/compat/test_repository.py b/dulwich/tests/compat/test_repository.py index f9fee631..81a321a9 100644 --- a/dulwich/tests/compat/test_repository.py +++ b/dulwich/tests/compat/test_repository.py @@ -1,216 +1,217 @@ # test_repo.py -- Git repo compatibility tests # Copyright (C) 2010 Google, Inc. # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Compatibility tests for dulwich repositories.""" from io import BytesIO from itertools import chain import os import tempfile import sys from dulwich.objects import ( hex_to_sha, ) from dulwich.repo import ( check_ref_format, Repo, ) from dulwich.tests.compat.utils import ( require_git_version, rmtree_ro, run_git_or_fail, CompatTestCase, ) class ObjectStoreTestCase(CompatTestCase): """Tests for git repository compatibility.""" def setUp(self): super(ObjectStoreTestCase, self).setUp() self._repo = self.import_repo('server_new.export') def _run_git(self, args): return run_git_or_fail(args, cwd=self._repo.path) def _parse_refs(self, output): refs = {} for line in BytesIO(output): fields = line.rstrip(b'\n').split(b' ') self.assertEqual(3, len(fields)) refname, type_name, sha = fields check_ref_format(refname[5:]) hex_to_sha(sha) refs[refname] = (type_name, sha) return refs def _parse_objects(self, output): return set(s.rstrip(b'\n').split(b' ')[0] for s in BytesIO(output)) def test_bare(self): self.assertTrue(self._repo.bare) self.assertFalse(os.path.exists(os.path.join(self._repo.path, '.git'))) def test_head(self): output = self._run_git(['rev-parse', 'HEAD']) head_sha = output.rstrip(b'\n') hex_to_sha(head_sha) self.assertEqual(head_sha, self._repo.refs[b'HEAD']) def test_refs(self): output = self._run_git( ['for-each-ref', '--format=%(refname) %(objecttype) %(objectname)']) expected_refs = self._parse_refs(output) actual_refs = {} for refname, sha in self._repo.refs.as_dict().items(): if refname == b'HEAD': continue # handled in test_head obj = self._repo[sha] self.assertEqual(sha, obj.id) actual_refs[refname] = (obj.type_name, obj.id) self.assertEqual(expected_refs, actual_refs) # TODO(dborowitz): peeled ref tests def _get_loose_shas(self): output = self._run_git(['rev-list', '--all', '--objects', '--unpacked']) return self._parse_objects(output) def _get_all_shas(self): output = self._run_git(['rev-list', '--all', '--objects']) return self._parse_objects(output) def assertShasMatch(self, expected_shas, actual_shas_iter): actual_shas = set() for sha in actual_shas_iter: obj = self._repo[sha] self.assertEqual(sha, obj.id) actual_shas.add(sha) self.assertEqual(expected_shas, actual_shas) def test_loose_objects(self): # TODO(dborowitz): This is currently not very useful since fast-imported # repos only contained packed objects. expected_shas = self._get_loose_shas() self.assertShasMatch(expected_shas, self._repo.object_store._iter_loose_objects()) def test_packed_objects(self): expected_shas = self._get_all_shas() - self._get_loose_shas() self.assertShasMatch(expected_shas, chain(*self._repo.object_store.packs)) def test_all_objects(self): expected_shas = self._get_all_shas() self.assertShasMatch(expected_shas, iter(self._repo.object_store)) class WorkingTreeTestCase(ObjectStoreTestCase): """Test for compatibility with git-worktree.""" min_git_version = (2, 5, 0) def create_new_worktree(self, repo_dir, branch): """Create a new worktree using git-worktree. :param repo_dir: The directory of the main working tree. :param branch: The branch or commit to checkout in the new worktree. :returns: The path to the new working tree. """ temp_dir = tempfile.mkdtemp() run_git_or_fail(['worktree', 'add', temp_dir, branch], cwd=repo_dir) self.addCleanup(rmtree_ro, temp_dir) return temp_dir def setUp(self): super(WorkingTreeTestCase, self).setUp() self._worktree_path = self.create_new_worktree(self._repo.path, 'branch') self._worktree_repo = Repo(self._worktree_path) self.addCleanup(self._worktree_repo.close) self._mainworktree_repo = self._repo self._number_of_working_tree = 2 self._repo = self._worktree_repo def test_refs(self): super(WorkingTreeTestCase, self).test_refs() self.assertEqual(self._mainworktree_repo.refs.allkeys(), self._repo.refs.allkeys()) def test_head_equality(self): self.assertNotEqual(self._repo.refs[b'HEAD'], self._mainworktree_repo.refs[b'HEAD']) def test_bare(self): self.assertFalse(self._repo.bare) self.assertTrue(os.path.isfile(os.path.join(self._repo.path, '.git'))) def _parse_worktree_list(self, output): worktrees = [] for line in BytesIO(output): fields = line.rstrip(b'\n').split() worktrees.append(tuple(f.decode() for f in fields)) return worktrees def test_git_worktree_list(self): # 'git worktree list' was introduced in 2.7.0 require_git_version((2, 7, 0)) output = run_git_or_fail(['worktree', 'list'], cwd=self._repo.path) worktrees = self._parse_worktree_list(output) self.assertEqual(len(worktrees), self._number_of_working_tree) self.assertEqual(worktrees[0][1], '(bare)') - self.assertEqual(os.path.normcase(worktrees[0][0]).lower(), - self._mainworktree_repo.path.lower()) + self.assertEqual(os.path.normcase(worktrees[0][0]), + os.path.normcase(self._mainworktree_repo.path)) - output = run_git_or_fail(['worktree', 'list'], cwd=self._mainworktree_repo.path) + output = run_git_or_fail(['worktree', 'list'], + cwd=self._mainworktree_repo.path) worktrees = self._parse_worktree_list(output) self.assertEqual(len(worktrees), self._number_of_working_tree) self.assertEqual(worktrees[0][1], '(bare)') - self.assertEqual(os.path.normcase(worktrees[0][0]).lower(), - self._mainworktree_repo.path.lower()) + self.assertEqual(os.path.normcase(worktrees[0][0]), + os.path.normcase(self._mainworktree_repo.path)) class InitNewWorkingDirectoryTestCase(WorkingTreeTestCase): """Test compatibility of Repo.init_new_working_directory.""" min_git_version = (2, 5, 0) def setUp(self): super(InitNewWorkingDirectoryTestCase, self).setUp() self._other_worktree = self._repo worktree_repo_path = tempfile.mkdtemp() self.addCleanup(rmtree_ro, worktree_repo_path) self._repo = Repo._init_new_working_directory( worktree_repo_path, self._mainworktree_repo) self.addCleanup(self._repo.close) self._number_of_working_tree = 3 def test_head_equality(self): self.assertEqual(self._repo.refs[b'HEAD'], self._mainworktree_repo.refs[b'HEAD']) def test_bare(self): self.assertFalse(self._repo.bare) self.assertTrue(os.path.isfile(os.path.join(self._repo.path, '.git')))