diff --git a/dulwich/refs.py b/dulwich/refs.py index 5d3e132d..84187495 100644 --- a/dulwich/refs.py +++ b/dulwich/refs.py @@ -1,788 +1,788 @@ # refs.py -- For dealing with git refs # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Ref handling. """ import errno import os import sys from dulwich.errors import ( PackedRefsException, RefFormatError, ) from dulwich.objects import ( git_line, valid_hexsha, ZERO_SHA, ) from dulwich.file import ( GitFile, ensure_dir_exists, ) SYMREF = b'ref: ' LOCAL_BRANCH_PREFIX = b'refs/heads/' BAD_REF_CHARS = set(b'\177 ~^:?*[') def check_ref_format(refname): """Check if a refname is correctly formatted. Implements all the same rules as git-check-ref-format[1]. [1] http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html :param refname: The refname to check :return: True if refname is valid, False otherwise """ # These could be combined into one big expression, but are listed separately # to parallel [1]. if b'/.' in refname or refname.startswith(b'.'): return False if b'/' not in refname: return False if b'..' in refname: return False for i, c in enumerate(refname): if ord(refname[i:i+1]) < 0o40 or c in BAD_REF_CHARS: return False if refname[-1] in b'/.': return False if refname.endswith(b'.lock'): return False if b'@{' in refname: return False if b'\\' in refname: return False return True class RefsContainer(object): """A container for refs.""" def set_symbolic_ref(self, name, other): """Make a ref point at another ref. :param name: Name of the ref to set :param other: Name of the ref to point at """ raise NotImplementedError(self.set_symbolic_ref) def get_packed_refs(self): """Get contents of the packed-refs file. :return: Dictionary mapping ref names to SHA1s :note: Will return an empty dictionary when no packed-refs file is present. """ raise NotImplementedError(self.get_packed_refs) def get_peeled(self, name): """Return the cached peeled value of a ref, if available. :param name: Name of the ref to peel :return: The peeled value of the ref. If the ref is known not point to a tag, this will be the SHA the ref refers to. If the ref may point to a tag, but no cached information is available, None is returned. """ return None def import_refs(self, base, other): for name, value in other.items(): self[b'/'.join((base, name))] = value def allkeys(self): """All refs present in this container.""" raise NotImplementedError(self.allkeys) def keys(self, base=None): """Refs present in this container. :param base: An optional base to return refs under. :return: An unsorted set of valid refs in this container, including packed refs. """ if base is not None: return self.subkeys(base) else: return self.allkeys() def subkeys(self, base): """Refs present in this container under a base. :param base: The base to return refs under. :return: A set of valid refs in this container under the base; the base prefix is stripped from the ref names returned. """ keys = set() base_len = len(base) + 1 for refname in self.allkeys(): if refname.startswith(base): keys.add(refname[base_len:]) return keys def as_dict(self, base=None): """Return the contents of this container as a dictionary. """ ret = {} keys = self.keys(base) if base is None: base = b'' else: base = base.rstrip(b'/') for key in keys: try: ret[key] = self[(base + b'/' + key).strip(b'/')] except KeyError: continue # Unable to resolve return ret def _check_refname(self, name): """Ensure a refname is valid and lives in refs or is HEAD. HEAD is not a valid refname according to git-check-ref-format, but this class needs to be able to touch HEAD. Also, check_ref_format expects refnames without the leading 'refs/', but this class requires that so it cannot touch anything outside the refs dir (or HEAD). :param name: The name of the reference. :raises KeyError: if a refname is not HEAD or is otherwise not valid. """ if name in (b'HEAD', b'refs/stash'): return if not name.startswith(b'refs/') or not check_ref_format(name[5:]): raise RefFormatError(name) def read_ref(self, refname): """Read a reference without following any references. :param refname: The name of the reference :return: The contents of the ref file, or None if it does not exist. """ contents = self.read_loose_ref(refname) if not contents: contents = self.get_packed_refs().get(refname, None) return contents def read_loose_ref(self, name): """Read a loose reference and return its contents. :param name: the refname to read :return: The contents of the ref file, or None if it does not exist. """ raise NotImplementedError(self.read_loose_ref) def follow(self, name): """Follow a reference name. :return: a tuple of (refnames, sha), wheres refnames are the names of references in the chain """ contents = SYMREF + name depth = 0 refnames = [] while contents.startswith(SYMREF): refname = contents[len(SYMREF):] refnames.append(refname) contents = self.read_ref(refname) if not contents: break depth += 1 if depth > 5: raise KeyError(name) return refnames, contents def _follow(self, name): import warnings warnings.warn( "RefsContainer._follow is deprecated. Use RefsContainer.follow instead.", DeprecationWarning) refnames, contents = self.follow(name) if not refnames: return (None, contents) return (refnames[-1], contents) def __contains__(self, refname): if self.read_ref(refname): return True return False def __getitem__(self, name): """Get the SHA1 for a reference name. This method follows all symbolic references. """ _, sha = self.follow(name) if sha is None: raise KeyError(name) return sha def set_if_equals(self, name, old_ref, new_ref): """Set a refname to new_ref only if it currently equals old_ref. This method follows all symbolic references if applicable for the subclass, and can be used to perform an atomic compare-and-swap operation. :param name: The refname to set. :param old_ref: The old sha the refname must refer to, or None to set unconditionally. :param new_ref: The new sha the refname will refer to. :return: True if the set was successful, False otherwise. """ raise NotImplementedError(self.set_if_equals) def add_if_new(self, name, ref): """Add a new reference only if it does not already exist.""" raise NotImplementedError(self.add_if_new) def __setitem__(self, name, ref): """Set a reference name to point to the given SHA1. This method follows all symbolic references if applicable for the subclass. :note: This method unconditionally overwrites the contents of a reference. To update atomically only if the reference has not changed, use set_if_equals(). :param name: The refname to set. :param ref: The new sha the refname will refer to. """ self.set_if_equals(name, None, ref) def remove_if_equals(self, name, old_ref): """Remove a refname only if it currently equals old_ref. This method does not follow symbolic references, even if applicable for the subclass. It can be used to perform an atomic compare-and-delete operation. :param name: The refname to delete. :param old_ref: The old sha the refname must refer to, or None to delete unconditionally. :return: True if the delete was successful, False otherwise. """ raise NotImplementedError(self.remove_if_equals) def __delitem__(self, name): """Remove a refname. This method does not follow symbolic references, even if applicable for the subclass. :note: This method unconditionally deletes the contents of a reference. To delete atomically only if the reference has not changed, use remove_if_equals(). :param name: The refname to delete. """ self.remove_if_equals(name, None) class DictRefsContainer(RefsContainer): """RefsContainer backed by a simple dict. This container does not support symbolic or packed references and is not threadsafe. """ def __init__(self, refs): self._refs = refs self._peeled = {} def allkeys(self): return self._refs.keys() def read_loose_ref(self, name): return self._refs.get(name, None) def get_packed_refs(self): return {} def set_symbolic_ref(self, name, other): self._refs[name] = SYMREF + other def set_if_equals(self, name, old_ref, new_ref): if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref: return False realnames, _ = self.follow(name) for realname in realnames: self._check_refname(realname) self._refs[realname] = new_ref return True def add_if_new(self, name, ref): if name in self._refs: return False self._refs[name] = ref return True def remove_if_equals(self, name, old_ref): if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref: return False try: del self._refs[name] except KeyError: pass return True def get_peeled(self, name): return self._peeled.get(name) def _update(self, refs): """Update multiple refs; intended only for testing.""" # TODO(dborowitz): replace this with a public function that uses # set_if_equal. self._refs.update(refs) def _update_peeled(self, peeled): """Update cached peeled refs; intended only for testing.""" self._peeled.update(peeled) class InfoRefsContainer(RefsContainer): """Refs container that reads refs from a info/refs file.""" def __init__(self, f): self._refs = {} self._peeled = {} for l in f.readlines(): sha, name = l.rstrip(b'\n').split(b'\t') if name.endswith(b'^{}'): name = name[:-3] if not check_ref_format(name): raise ValueError("invalid ref name %r" % name) self._peeled[name] = sha else: if not check_ref_format(name): raise ValueError("invalid ref name %r" % name) self._refs[name] = sha def allkeys(self): return self._refs.keys() def read_loose_ref(self, name): return self._refs.get(name, None) def get_packed_refs(self): return {} def get_peeled(self, name): try: return self._peeled[name] except KeyError: return self._refs[name] class DiskRefsContainer(RefsContainer): """Refs container that reads refs from disk.""" def __init__(self, path, worktree_path=None): self.path = path - self.worktree_path = worktree_path or path; + self.worktree_path = worktree_path or path self._packed_refs = None self._peeled_refs = None def __repr__(self): return "%s(%r)" % (self.__class__.__name__, self.path) def subkeys(self, base): subkeys = set() path = self.refpath(base) for root, dirs, files in os.walk(path): dir = root[len(path):].strip(os.path.sep).replace(os.path.sep, "/") for filename in files: refname = (("%s/%s" % (dir, filename)) .strip("/").encode(sys.getfilesystemencoding())) # check_ref_format requires at least one /, so we prepend the # base before calling it. if check_ref_format(base + b'/' + refname): subkeys.add(refname) for key in self.get_packed_refs(): if key.startswith(base): subkeys.add(key[len(base):].strip(b'/')) return subkeys def allkeys(self): allkeys = set() if os.path.exists(self.refpath(b'HEAD')): allkeys.add(b'HEAD') path = self.refpath(b'') for root, dirs, files in os.walk(self.refpath(b'refs')): dir = root[len(path):].strip(os.path.sep).replace(os.path.sep, "/") for filename in files: refname = ("%s/%s" % (dir, filename)).encode(sys.getfilesystemencoding()) if check_ref_format(refname): allkeys.add(refname) allkeys.update(self.get_packed_refs()) return allkeys def refpath(self, name): """Return the disk path of a ref. """ if getattr(self.path, "encode", None) and getattr(name, "decode", None): name = name.decode(sys.getfilesystemencoding()) if os.path.sep != "/": name = name.replace("/", os.path.sep) - #TODO: as the 'HEAD' reference is working tree specific, it + # TODO: as the 'HEAD' reference is working tree specific, it # should actually not be a part of RefsContainer if name == 'HEAD': return os.path.join(self.worktree_path, name) else: return os.path.join(self.path, name) def get_packed_refs(self): """Get contents of the packed-refs file. :return: Dictionary mapping ref names to SHA1s :note: Will return an empty dictionary when no packed-refs file is present. """ # TODO: invalidate the cache on repacking if self._packed_refs is None: # set both to empty because we want _peeled_refs to be # None if and only if _packed_refs is also None. self._packed_refs = {} self._peeled_refs = {} path = os.path.join(self.path, 'packed-refs') try: f = GitFile(path, 'rb') except IOError as e: if e.errno == errno.ENOENT: return {} raise with f: first_line = next(iter(f)).rstrip() if (first_line.startswith(b'# pack-refs') and b' peeled' in first_line): for sha, name, peeled in read_packed_refs_with_peeled(f): self._packed_refs[name] = sha if peeled: self._peeled_refs[name] = peeled else: f.seek(0) for sha, name in read_packed_refs(f): self._packed_refs[name] = sha return self._packed_refs def get_peeled(self, name): """Return the cached peeled value of a ref, if available. :param name: Name of the ref to peel :return: The peeled value of the ref. If the ref is known not point to a tag, this will be the SHA the ref refers to. If the ref may point to a tag, but no cached information is available, None is returned. """ self.get_packed_refs() if self._peeled_refs is None or name not in self._packed_refs: # No cache: no peeled refs were read, or this ref is loose return None if name in self._peeled_refs: return self._peeled_refs[name] else: # Known not peelable return self[name] def read_loose_ref(self, name): """Read a reference file and return its contents. If the reference file a symbolic reference, only read the first line of the file. Otherwise, only read the first 40 bytes. :param name: the refname to read, relative to refpath :return: The contents of the ref file, or None if the file does not exist. :raises IOError: if any other error occurs """ filename = self.refpath(name) try: with GitFile(filename, 'rb') as f: header = f.read(len(SYMREF)) if header == SYMREF: # Read only the first line return header + next(iter(f)).rstrip(b'\r\n') else: # Read only the first 40 bytes return header + f.read(40 - len(SYMREF)) except IOError as e: if e.errno == errno.ENOENT: return None raise def _remove_packed_ref(self, name): if self._packed_refs is None: return filename = os.path.join(self.path, 'packed-refs') # reread cached refs from disk, while holding the lock f = GitFile(filename, 'wb') try: self._packed_refs = None self.get_packed_refs() if name not in self._packed_refs: return del self._packed_refs[name] if name in self._peeled_refs: del self._peeled_refs[name] write_packed_refs(f, self._packed_refs, self._peeled_refs) f.close() finally: f.abort() def set_symbolic_ref(self, name, other): """Make a ref point at another ref. :param name: Name of the ref to set :param other: Name of the ref to point at """ self._check_refname(name) self._check_refname(other) filename = self.refpath(name) try: f = GitFile(filename, 'wb') try: f.write(SYMREF + other + b'\n') except (IOError, OSError): f.abort() raise finally: f.close() def set_if_equals(self, name, old_ref, new_ref): """Set a refname to new_ref only if it currently equals old_ref. This method follows all symbolic references, and can be used to perform an atomic compare-and-swap operation. :param name: The refname to set. :param old_ref: The old sha the refname must refer to, or None to set unconditionally. :param new_ref: The new sha the refname will refer to. :return: True if the set was successful, False otherwise. """ self._check_refname(name) try: realnames, _ = self.follow(name) realname = realnames[-1] except (KeyError, IndexError): realname = name filename = self.refpath(realname) ensure_dir_exists(os.path.dirname(filename)) with GitFile(filename, 'wb') as f: if old_ref is not None: try: # read again while holding the lock orig_ref = self.read_loose_ref(realname) if orig_ref is None: orig_ref = self.get_packed_refs().get(realname, ZERO_SHA) if orig_ref != old_ref: f.abort() return False except (OSError, IOError): f.abort() raise try: f.write(new_ref + b'\n') except (OSError, IOError): f.abort() raise return True def add_if_new(self, name, ref): """Add a new reference only if it does not already exist. This method follows symrefs, and only ensures that the last ref in the chain does not exist. :param name: The refname to set. :param ref: The new sha the refname will refer to. :return: True if the add was successful, False otherwise. """ try: realnames, contents = self.follow(name) if contents is not None: return False realname = realnames[-1] except (KeyError, IndexError): realname = name self._check_refname(realname) filename = self.refpath(realname) ensure_dir_exists(os.path.dirname(filename)) with GitFile(filename, 'wb') as f: if os.path.exists(filename) or name in self.get_packed_refs(): f.abort() return False try: f.write(ref + b'\n') except (OSError, IOError): f.abort() raise return True def remove_if_equals(self, name, old_ref): """Remove a refname only if it currently equals old_ref. This method does not follow symbolic references. It can be used to perform an atomic compare-and-delete operation. :param name: The refname to delete. :param old_ref: The old sha the refname must refer to, or None to delete unconditionally. :return: True if the delete was successful, False otherwise. """ self._check_refname(name) filename = self.refpath(name) ensure_dir_exists(os.path.dirname(filename)) f = GitFile(filename, 'wb') try: if old_ref is not None: orig_ref = self.read_loose_ref(name) if orig_ref is None: orig_ref = self.get_packed_refs().get(name, ZERO_SHA) if orig_ref != old_ref: return False # may only be packed try: os.remove(filename) except OSError as e: if e.errno != errno.ENOENT: raise self._remove_packed_ref(name) finally: # never write, we just wanted the lock f.abort() return True def _split_ref_line(line): """Split a single ref line into a tuple of SHA1 and name.""" fields = line.rstrip(b'\n').split(b' ') if len(fields) != 2: raise PackedRefsException("invalid ref line %r" % line) sha, name = fields if not valid_hexsha(sha): raise PackedRefsException("Invalid hex sha %r" % sha) if not check_ref_format(name): raise PackedRefsException("invalid ref name %r" % name) return (sha, name) def read_packed_refs(f): """Read a packed refs file. :param f: file-like object to read from :return: Iterator over tuples with SHA1s and ref names. """ for l in f: if l.startswith(b'#'): # Comment continue if l.startswith(b'^'): raise PackedRefsException( "found peeled ref in packed-refs without peeled") yield _split_ref_line(l) def read_packed_refs_with_peeled(f): """Read a packed refs file including peeled refs. Assumes the "# pack-refs with: peeled" line was already read. Yields tuples with ref names, SHA1s, and peeled SHA1s (or None). :param f: file-like object to read from, seek'ed to the second line """ last = None for l in f: if l[0] == b'#': continue l = l.rstrip(b'\r\n') if l.startswith(b'^'): if not last: raise PackedRefsException("unexpected peeled ref line") if not valid_hexsha(l[1:]): raise PackedRefsException("Invalid hex sha %r" % l[1:]) sha, name = _split_ref_line(last) last = None yield (sha, name, l[1:]) else: if last: sha, name = _split_ref_line(last) yield (sha, name, None) last = l if last: sha, name = _split_ref_line(last) yield (sha, name, None) def write_packed_refs(f, packed_refs, peeled_refs=None): """Write a packed refs file. :param f: empty file-like object to write to :param packed_refs: dict of refname to sha of packed refs to write :param peeled_refs: dict of refname to peeled value of sha """ if peeled_refs is None: peeled_refs = {} else: f.write(b'# pack-refs with: peeled\n') for refname in sorted(packed_refs.keys()): f.write(git_line(packed_refs[refname], refname)) if refname in peeled_refs: f.write(b'^' + peeled_refs[refname] + b'\n') def read_info_refs(f): ret = {} for l in f.readlines(): (sha, name) = l.rstrip(b"\r\n").split(b"\t", 1) ret[name] = sha return ret def write_info_refs(refs, store): """Generate info refs.""" for name, sha in sorted(refs.items()): # get_refs() includes HEAD as a special case, but we don't want to # advertise it if name == b'HEAD': continue try: o = store[sha] except KeyError: continue peeled = store.peel_sha(sha) yield o.id + b'\t' + name + b'\n' if o.id != peeled.id: yield peeled.id + b'\t' + name + b'^{}\n' is_local_branch = lambda x: x.startswith(b'refs/heads/') diff --git a/dulwich/repo.py b/dulwich/repo.py index bcf6c0ee..611b0593 100644 --- a/dulwich/repo.py +++ b/dulwich/repo.py @@ -1,1096 +1,1092 @@ # repo.py -- For dealing with git repositories. # Copyright (C) 2007 James Westby # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Repository access. This module contains the base class for git repositories (BaseRepo) and an implementation which uses a repository on local disk (Repo). """ from io import BytesIO import errno import os import sys from dulwich.errors import ( NoIndexPresent, NotBlobError, NotCommitError, NotGitRepository, NotTreeError, NotTagError, CommitError, RefFormatError, HookError, ) from dulwich.file import ( GitFile, ) from dulwich.object_store import ( DiskObjectStore, MemoryObjectStore, ObjectStoreGraphWalker, ) from dulwich.objects import ( check_hexsha, Blob, Commit, ShaFile, Tag, Tree, ) from dulwich.hooks import ( PreCommitShellHook, PostCommitShellHook, CommitMsgShellHook, ) from dulwich.refs import ( check_ref_format, RefsContainer, DictRefsContainer, InfoRefsContainer, DiskRefsContainer, read_packed_refs, read_packed_refs_with_peeled, write_packed_refs, SYMREF, ) import warnings CONTROLDIR = '.git' OBJECTDIR = 'objects' REFSDIR = 'refs' REFSDIR_TAGS = 'tags' REFSDIR_HEADS = 'heads' INDEX_FILENAME = "index" COMMONDIR = 'commondir' GITDIR = 'gitdir' WORKTREES = 'worktrees' BASE_DIRECTORIES = [ ["branches"], [REFSDIR], [REFSDIR, REFSDIR_TAGS], [REFSDIR, REFSDIR_HEADS], ["hooks"], ["info"] ] def parse_graftpoints(graftpoints): """Convert a list of graftpoints into a dict :param graftpoints: Iterator of graftpoint lines Each line is formatted as: []* Resulting dictionary is: : [*] https://git.wiki.kernel.org/index.php/GraftPoint """ grafts = {} for l in graftpoints: raw_graft = l.split(None, 1) commit = raw_graft[0] if len(raw_graft) == 2: parents = raw_graft[1].split() else: parents = [] for sha in [commit] + parents: check_hexsha(sha, 'Invalid graftpoint') grafts[commit] = parents return grafts def serialize_graftpoints(graftpoints): """Convert a dictionary of grafts into string The graft dictionary is: : [*] Each line is formatted as: []* https://git.wiki.kernel.org/index.php/GraftPoint """ graft_lines = [] for commit, parents in graftpoints.items(): if parents: graft_lines.append(commit + b' ' + b' '.join(parents)) else: graft_lines.append(commit) return b'\n'.join(graft_lines) class BaseRepo(object): """Base class for a git repository. :ivar object_store: Dictionary-like object for accessing the objects :ivar refs: Dictionary-like object with the refs in this repository """ def __init__(self, object_store, refs): """Open a repository. This shouldn't be called directly, but rather through one of the base classes, such as MemoryRepo or Repo. :param object_store: Object store to use :param refs: Refs container to use """ self.object_store = object_store self.refs = refs self._graftpoints = {} self.hooks = {} def _init_files(self, bare): """Initialize a default set of named files.""" from dulwich.config import ConfigFile self._put_named_file('description', b"Unnamed repository") f = BytesIO() cf = ConfigFile() cf.set(b"core", b"repositoryformatversion", b"0") cf.set(b"core", b"filemode", b"true") cf.set(b"core", b"bare", bare) cf.set(b"core", b"logallrefupdates", True) cf.write_to_file(f) self._put_named_file('config', f.getvalue()) self._put_named_file(os.path.join('info', 'exclude'), b'') def get_named_file(self, path): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-based Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :return: An open file object, or None if the file does not exist. """ raise NotImplementedError(self.get_named_file) def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ raise NotImplementedError(self._put_named_file) def open_index(self): """Open the index for this repository. :raise NoIndexPresent: If no index is present :return: The matching `Index` """ raise NotImplementedError(self.open_index) def fetch(self, target, determine_wants=None, progress=None): """Fetch objects into another repository. :param target: The target repository :param determine_wants: Optional function to determine what refs to fetch. :param progress: Optional progress function :return: The local refs """ if determine_wants is None: determine_wants = target.object_store.determine_wants_all target.object_store.add_objects( self.fetch_objects(determine_wants, target.get_graph_walker(), progress)) return self.get_refs() def fetch_objects(self, determine_wants, graph_walker, progress, get_tagged=None): """Fetch the missing objects required for a set of revisions. :param determine_wants: Function that takes a dictionary with heads and returns the list of heads to fetch. :param graph_walker: Object that can iterate over the list of revisions to fetch and has an "ack" method that will be called to acknowledge that a revision is present. :param progress: Simple progress function that will be called with updated progress strings. :param get_tagged: Function that returns a dict of pointed-to sha -> tag sha for including tags. :return: iterator over objects, with __len__ implemented """ wants = determine_wants(self.get_refs()) if not isinstance(wants, list): raise TypeError("determine_wants() did not return a list") shallows = getattr(graph_walker, 'shallow', frozenset()) unshallows = getattr(graph_walker, 'unshallow', frozenset()) if wants == []: # TODO(dborowitz): find a way to short-circuit that doesn't change # this interface. if shallows or unshallows: # Do not send a pack in shallow short-circuit path return None return [] # If the graph walker is set up with an implementation that can # ACK/NAK to the wire, it will write data to the client through # this call as a side-effect. haves = self.object_store.find_common_revisions(graph_walker) # Deal with shallow requests separately because the haves do # not reflect what objects are missing if shallows or unshallows: haves = [] # TODO: filter the haves commits from iter_shas. # the specific commits aren't missing. def get_parents(commit): if commit.id in shallows: return [] return self.get_parents(commit.id, commit) return self.object_store.iter_shas( self.object_store.find_missing_objects( haves, wants, progress, get_tagged, get_parents=get_parents)) def get_graph_walker(self, heads=None): """Retrieve a graph walker. A graph walker is used by a remote repository (or proxy) to find out which objects are present in this repository. :param heads: Repository heads to use (optional) :return: A graph walker object """ if heads is None: heads = self.refs.as_dict(b'refs/heads').values() return ObjectStoreGraphWalker(heads, self.get_parents) def get_refs(self): """Get dictionary with all refs. :return: A ``dict`` mapping ref names to SHA1s """ return self.refs.as_dict() def head(self): """Return the SHA1 pointed at by HEAD.""" return self.refs[b'HEAD'] def _get_object(self, sha, cls): assert len(sha) in (20, 40) ret = self.get_object(sha) if not isinstance(ret, cls): if cls is Commit: raise NotCommitError(ret) elif cls is Blob: raise NotBlobError(ret) elif cls is Tree: raise NotTreeError(ret) elif cls is Tag: raise NotTagError(ret) else: raise Exception("Type invalid: %r != %r" % ( ret.type_name, cls.type_name)) return ret def get_object(self, sha): """Retrieve the object with the specified SHA. :param sha: SHA to retrieve :return: A ShaFile object :raise KeyError: when the object can not be found """ return self.object_store[sha] def get_parents(self, sha, commit=None): """Retrieve the parents of a specific commit. If the specific commit is a graftpoint, the graft parents will be returned instead. :param sha: SHA of the commit for which to retrieve the parents :param commit: Optional commit matching the sha :return: List of parents """ try: return self._graftpoints[sha] except KeyError: if commit is None: commit = self[sha] return commit.parents def get_config(self): """Retrieve the config object. :return: `ConfigFile` object for the ``.git/config`` file. """ raise NotImplementedError(self.get_config) def get_description(self): """Retrieve the description for this repository. :return: String with the description of the repository as set by the user. """ raise NotImplementedError(self.get_description) def set_description(self, description): """Set the description for this repository. :param description: Text to set as description for this repository. """ raise NotImplementedError(self.set_description) def get_config_stack(self): """Return a config stack for this repository. This stack accesses the configuration for both this repository itself (.git/config) and the global configuration, which usually lives in ~/.gitconfig. :return: `Config` instance for this repository """ from dulwich.config import StackedConfig backends = [self.get_config()] + StackedConfig.default_backends() return StackedConfig(backends, writable=backends[0]) def get_peeled(self, ref): """Get the peeled value of a ref. :param ref: The refname to peel. :return: The fully-peeled SHA1 of a tag object, after peeling all intermediate tags; if the original ref does not point to a tag, this will equal the original SHA1. """ cached = self.refs.get_peeled(ref) if cached is not None: return cached return self.object_store.peel_sha(self.refs[ref]).id def get_walker(self, include=None, *args, **kwargs): """Obtain a walker for this repository. :param include: Iterable of SHAs of commits to include along with their ancestors. Defaults to [HEAD] :param exclude: Iterable of SHAs of commits to exclude along with their ancestors, overriding includes. :param order: ORDER_* constant specifying the order of results. Anything other than ORDER_DATE may result in O(n) memory usage. :param reverse: If True, reverse the order of output, requiring O(n) memory. :param max_entries: The maximum number of entries to yield, or None for no limit. :param paths: Iterable of file or subtree paths to show entries for. :param rename_detector: diff.RenameDetector object for detecting renames. :param follow: If True, follow path across renames/copies. Forces a default rename_detector. :param since: Timestamp to list commits after. :param until: Timestamp to list commits before. :param queue_cls: A class to use for a queue of commits, supporting the iterator protocol. The constructor takes a single argument, the Walker. :return: A `Walker` object """ from dulwich.walk import Walker if include is None: include = [self.head()] if isinstance(include, str): include = [include] kwargs['get_parents'] = lambda commit: self.get_parents(commit.id, commit) return Walker(self.object_store, include, *args, **kwargs) def __getitem__(self, name): """Retrieve a Git object by SHA1 or ref. :param name: A Git object SHA1 or a ref name :return: A `ShaFile` object, such as a Commit or Blob :raise KeyError: when the specified ref or object does not exist """ if not isinstance(name, bytes): raise TypeError("'name' must be bytestring, not %.80s" % type(name).__name__) if len(name) in (20, 40): try: return self.object_store[name] except (KeyError, ValueError): pass try: return self.object_store[self.refs[name]] except RefFormatError: raise KeyError(name) def __contains__(self, name): """Check if a specific Git object or ref is present. :param name: Git object SHA1 or ref name """ if len(name) in (20, 40): return name in self.object_store or name in self.refs else: return name in self.refs def __setitem__(self, name, value): """Set a ref. :param name: ref name :param value: Ref value - either a ShaFile object, or a hex sha """ if name.startswith(b"refs/") or name == b'HEAD': if isinstance(value, ShaFile): self.refs[name] = value.id elif isinstance(value, bytes): self.refs[name] = value else: raise TypeError(value) else: raise ValueError(name) def __delitem__(self, name): """Remove a ref. :param name: Name of the ref to remove """ if name.startswith(b"refs/") or name == b"HEAD": del self.refs[name] else: raise ValueError(name) def _get_user_identity(self): """Determine the identity to use for new commits. """ config = self.get_config_stack() return (config.get((b"user", ), b"name") + b" <" + config.get((b"user", ), b"email") + b">") def _add_graftpoints(self, updated_graftpoints): """Add or modify graftpoints :param updated_graftpoints: Dict of commit shas to list of parent shas """ # Simple validation for commit, parents in updated_graftpoints.items(): for sha in [commit] + parents: check_hexsha(sha, 'Invalid graftpoint') self._graftpoints.update(updated_graftpoints) def _remove_graftpoints(self, to_remove=[]): """Remove graftpoints :param to_remove: List of commit shas """ for sha in to_remove: del self._graftpoints[sha] def do_commit(self, message=None, committer=None, author=None, commit_timestamp=None, commit_timezone=None, author_timestamp=None, author_timezone=None, tree=None, encoding=None, ref=b'HEAD', merge_heads=None): """Create a new commit. :param message: Commit message :param committer: Committer fullname :param author: Author fullname (defaults to committer) :param commit_timestamp: Commit timestamp (defaults to now) :param commit_timezone: Commit timestamp timezone (defaults to GMT) :param author_timestamp: Author timestamp (defaults to commit timestamp) :param author_timezone: Author timestamp timezone (defaults to commit timestamp timezone) :param tree: SHA1 of the tree root to use (if not specified the current index will be committed). :param encoding: Encoding :param ref: Optional ref to commit to (defaults to current branch) :param merge_heads: Merge heads (defaults to .git/MERGE_HEADS) :return: New commit SHA1 """ import time c = Commit() if tree is None: index = self.open_index() c.tree = index.commit(self.object_store) else: if len(tree) != 40: raise ValueError("tree must be a 40-byte hex sha string") c.tree = tree try: self.hooks['pre-commit'].execute() except HookError as e: raise CommitError(e) except KeyError: # no hook defined, silent fallthrough pass if merge_heads is None: # FIXME: Read merge heads from .git/MERGE_HEADS merge_heads = [] if committer is None: # FIXME: Support GIT_COMMITTER_NAME/GIT_COMMITTER_EMAIL environment # variables committer = self._get_user_identity() c.committer = committer if commit_timestamp is None: # FIXME: Support GIT_COMMITTER_DATE environment variable commit_timestamp = time.time() c.commit_time = int(commit_timestamp) if commit_timezone is None: # FIXME: Use current user timezone rather than UTC commit_timezone = 0 c.commit_timezone = commit_timezone if author is None: # FIXME: Support GIT_AUTHOR_NAME/GIT_AUTHOR_EMAIL environment # variables author = committer c.author = author if author_timestamp is None: # FIXME: Support GIT_AUTHOR_DATE environment variable author_timestamp = commit_timestamp c.author_time = int(author_timestamp) if author_timezone is None: author_timezone = commit_timezone c.author_timezone = author_timezone if encoding is not None: c.encoding = encoding if message is None: # FIXME: Try to read commit message from .git/MERGE_MSG raise ValueError("No commit message specified") try: c.message = self.hooks['commit-msg'].execute(message) if c.message is None: c.message = message except HookError as e: raise CommitError(e) except KeyError: # no hook defined, message not modified c.message = message if ref is None: # Create a dangling commit c.parents = merge_heads self.object_store.add_object(c) else: try: old_head = self.refs[ref] c.parents = [old_head] + merge_heads self.object_store.add_object(c) ok = self.refs.set_if_equals(ref, old_head, c.id) except KeyError: c.parents = merge_heads self.object_store.add_object(c) ok = self.refs.add_if_new(ref, c.id) if not ok: # Fail if the atomic compare-and-swap failed, leaving the commit and # all its objects as garbage. raise CommitError("%s changed during commit" % (ref,)) try: self.hooks['post-commit'].execute() except HookError as e: # silent failure warnings.warn("post-commit hook failed: %s" % e, UserWarning) except KeyError: # no hook defined, silent fallthrough pass return c.id def read_gitfile(f): """Read a ``.git`` file. The first line of the file should start with "gitdir: " :param f: File-like object to read from :return: A path """ cs = f.read() if not cs.startswith("gitdir: "): raise ValueError("Expected file to start with 'gitdir: '") return cs[len("gitdir: "):].rstrip("\n") class Repo(BaseRepo): """A git repository backed by local disk. To open an existing repository, call the contructor with the path of the repository. To create a new repository, use the Repo.init class method. """ def __init__(self, root): hidden_path = os.path.join(root, CONTROLDIR) if os.path.isdir(os.path.join(hidden_path, OBJECTDIR)): self.bare = False self._controldir = hidden_path elif (os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(os.path.join(root, REFSDIR))): self.bare = True self._controldir = root elif os.path.isfile(hidden_path): self.bare = False with open(hidden_path, 'r') as f: path = read_gitfile(f) self.bare = False self._controldir = os.path.join(root, path) else: raise NotGitRepository( "No git repository was found at %(path)s" % dict(path=root) ) - commondir = self.get_named_file(COMMONDIR, mode='r') - if commondir: - with commondir: - self._commondir = os.path.join(self.controldir(), commondir.read().rstrip("\n")) + commondir = self.get_named_file(COMMONDIR) + if commondir is not None: + self._commondir = os.path.join( + self.controldir(), commondir.read().rstrip("\r\n")) else: self._commondir = self._controldir self.path = root - object_store = DiskObjectStore(os.path.join(self.commondir(), - OBJECTDIR)) + object_store = DiskObjectStore( + os.path.join(self.commondir(), OBJECTDIR)) refs = DiskRefsContainer(self.commondir(), self._controldir) BaseRepo.__init__(self, object_store, refs) self._graftpoints = {} graft_file = self.get_named_file(os.path.join("info", "grafts"), basedir=self.commondir()) if graft_file: with graft_file: self._graftpoints.update(parse_graftpoints(graft_file)) graft_file = self.get_named_file("shallow", basedir=self.commondir()) if graft_file: with graft_file: self._graftpoints.update(parse_graftpoints(graft_file)) self.hooks['pre-commit'] = PreCommitShellHook(self.controldir()) self.hooks['commit-msg'] = CommitMsgShellHook(self.controldir()) self.hooks['post-commit'] = PostCommitShellHook(self.controldir()) @classmethod def discover(cls, start='.'): """Iterate parent directories to discover a repository Return a Repo object for the first parent directory that looks like a Git repository. :param start: The directory to start discovery from (defaults to '.') """ remaining = True path = os.path.abspath(start) while remaining: try: return cls(path) except NotGitRepository: path, remaining = os.path.split(path) raise NotGitRepository( "No git repository was found at %(path)s" % dict(path=start) ) def controldir(self): """Return the path of the control directory.""" return self._controldir def commondir(self): """Return the path of the common directory. For a main working tree, it is identical to `controldir()`. For a linked working tree, it is the control directory of the main working tree.""" return self._commondir def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ path = path.lstrip(os.path.sep) with GitFile(os.path.join(self.controldir(), path), 'wb') as f: f.write(contents) - def get_named_file(self, path, **kwargs): + def get_named_file(self, path, basedir=None): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-based Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :param basedir: Optional argument that specifies an alternative to the control dir. :return: An open file object, or None if the file does not exist. """ # TODO(dborowitz): sanitize filenames, since this is used directly by # the dumb web serving code. - basedir_ = kwargs.get('basedir', self.controldir()) - mode = kwargs.get('mode', 'rb') + if basedir is None: + basedir = self.controldir() path = path.lstrip(os.path.sep) try: - return open(os.path.join(basedir_, path), mode) + return open(os.path.join(basedir, path), 'rb') except (IOError, OSError) as e: if e.errno == errno.ENOENT: return None raise def index_path(self): """Return path to the index file.""" return os.path.join(self.controldir(), INDEX_FILENAME) def open_index(self): """Open the index for this repository. :raise NoIndexPresent: If no index is present :return: The matching `Index` """ from dulwich.index import Index if not self.has_index(): raise NoIndexPresent() return Index(self.index_path()) def has_index(self): """Check if an index is present.""" # Bare repos must never have index files; non-bare repos may have a # missing index file, which is treated as empty. return not self.bare def stage(self, fs_paths): """Stage a set of paths. :param fs_paths: List of paths, relative to the repository path """ root_path_bytes = self.path.encode(sys.getfilesystemencoding()) if not isinstance(fs_paths, list): fs_paths = [fs_paths] from dulwich.index import ( blob_from_path_and_stat, index_entry_from_stat, _fs_to_tree_path, ) index = self.open_index() for fs_path in fs_paths: if not isinstance(fs_path, bytes): fs_path = fs_path.encode(sys.getfilesystemencoding()) tree_path = _fs_to_tree_path(fs_path) full_path = os.path.join(root_path_bytes, fs_path) try: st = os.lstat(full_path) except OSError: # File no longer exists try: del index[tree_path] except KeyError: pass # already removed else: blob = blob_from_path_and_stat(full_path, st) self.object_store.add_object(blob) index[tree_path] = index_entry_from_stat(st, blob.id, 0) index.write() def clone(self, target_path, mkdir=True, bare=False, origin=b"origin"): """Clone this repository. :param target_path: Target path :param mkdir: Create the target directory :param bare: Whether to create a bare repository :param origin: Base name for refs in target repository cloned from this repository :return: Created repository as `Repo` """ if not bare: target = self.init(target_path, mkdir=mkdir) else: target = self.init_bare(target_path) self.fetch(target) target.refs.import_refs( b'refs/remotes/' + origin, self.refs.as_dict(b'refs/heads')) target.refs.import_refs( b'refs/tags', self.refs.as_dict(b'refs/tags')) try: target.refs.add_if_new( b'refs/heads/master', self.refs[b'refs/heads/master']) except KeyError: pass # Update target head head_chain, head_sha = self.refs.follow(b'HEAD') if head_chain and head_sha is not None: target.refs.set_symbolic_ref(b'HEAD', head_chain[-1]) target[b'HEAD'] = head_sha if not bare: # Checkout HEAD to target dir target.reset_index() return target def reset_index(self, tree=None): """Reset the index back to a specific tree. :param tree: Tree SHA to reset to, None for current HEAD tree. """ from dulwich.index import ( build_index_from_tree, validate_path_element_default, validate_path_element_ntfs, ) if tree is None: tree = self[b'HEAD'].tree config = self.get_config() honor_filemode = config.get_boolean('core', 'filemode', os.name != "nt") if config.get_boolean('core', 'core.protectNTFS', os.name == "nt"): validate_path_element = validate_path_element_ntfs else: validate_path_element = validate_path_element_default return build_index_from_tree(self.path, self.index_path(), self.object_store, tree, honor_filemode=honor_filemode, validate_path_element=validate_path_element) def get_config(self): """Retrieve the config object. :return: `ConfigFile` object for the ``.git/config`` file. """ from dulwich.config import ConfigFile path = os.path.join(self._controldir, 'config') try: return ConfigFile.from_path(path) except (IOError, OSError) as e: if e.errno != errno.ENOENT: raise ret = ConfigFile() ret.path = path return ret def get_description(self): """Retrieve the description of this repository. :return: A string describing the repository or None. """ path = os.path.join(self._controldir, 'description') try: with GitFile(path, 'rb') as f: return f.read() except (IOError, OSError) as e: if e.errno != errno.ENOENT: raise return None def __repr__(self): return "" % self.path def set_description(self, description): """Set the description for this repository. :param description: Text to set as description for this repository. """ self._put_named_file('description', description) @classmethod def _init_maybe_bare(cls, path, bare): for d in BASE_DIRECTORIES: os.mkdir(os.path.join(path, *d)) DiskObjectStore.init(os.path.join(path, OBJECTDIR)) ret = cls(path) ret.refs.set_symbolic_ref(b'HEAD', b"refs/heads/master") ret._init_files(bare) return ret @classmethod def init(cls, path, mkdir=False): """Create a new repository. :param path: Path in which to create the repository :param mkdir: Whether to create the directory :return: `Repo` instance """ if mkdir: os.mkdir(path) controldir = os.path.join(path, CONTROLDIR) os.mkdir(controldir) cls._init_maybe_bare(controldir, False) return cls(path) @classmethod - def init_new_working_directory(cls, path, main_path, mkdir=False): + def _init_new_working_directory(cls, path, main_repo, identifier=None, mkdir=False): """Create a new working directory linked to a repository. :param path: Path in which to create the working tree. - :param main_path: Path to the main repository (that can be bare or not). + :param main_repo: Main repository to reference + :param identifier: Worktree identifier :param mkdir: Whether to create the directory :return: `Repo` instance """ if mkdir: os.mkdir(path) - worktree_id = os.path.basename(path) + if identifier is None: + identifier = os.path.basename(path) + main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES) + worktree_controldir = os.path.join(main_worktreesdir, identifier) gitdirfile = os.path.join(path, CONTROLDIR) - main = Repo(main_path) - main_controldir = main.controldir() - main_worktreesdir = os.path.join(main_controldir, WORKTREES) - worktree_controldir = os.path.join(main_worktreesdir, worktree_id) - with open(gitdirfile, 'w') as f: - f.write('gitdir: {}\n'.format(worktree_controldir)) + with open(gitdirfile, 'wb') as f: + f.write('gitdir: ' + + worktree_controldir.encode(sys.getfilesystemencoding()) + + b'\n') try: os.mkdir(main_worktreesdir) except OSError as e: if e.errno != errno.EEXIST: raise try: os.mkdir(worktree_controldir) except OSError as e: if e.errno != errno.EEXIST: raise - with open(os.path.join(worktree_controldir, GITDIR), 'w') as f: - f.write(gitdirfile + '\n') - with open(os.path.join(worktree_controldir, COMMONDIR), 'w') as f: - f.write('../..\n') - with open(os.path.join(worktree_controldir, 'HEAD'), 'w') as f: - f.write('{}\n'.format(main.refs[b'HEAD'].decode('ascii'))) - main.close() + with open(os.path.join(worktree_controldir, GITDIR), 'wb') as f: + f.write(gitdirfile + b'\n') + with open(os.path.join(worktree_controldir, COMMONDIR), 'wb') as f: + f.write(b'../..\n') + with open(os.path.join(worktree_controldir, 'HEAD'), 'wb') as f: + f.write(main_repo.head() + b'\n') r = cls(path) - from dulwich.index import ( - build_index_from_tree, - ) - indexfile = r.index_path() - tree = r[b'HEAD'].tree - build_index_from_tree(r.path, indexfile, r.object_store, tree) + r.reset_index() return r @classmethod def init_bare(cls, path): """Create a new bare repository. ``path`` should already exist and be an emty directory. :param path: Path to create bare repository in :return: a `Repo` instance """ return cls._init_maybe_bare(path, True) create = init_bare def close(self): """Close any files opened by this repository.""" self.object_store.close() class MemoryRepo(BaseRepo): """Repo that stores refs, objects, and named files in memory. MemoryRepos are always bare: they have no working tree and no index, since those have a stronger dependency on the filesystem. """ def __init__(self): from dulwich.config import ConfigFile BaseRepo.__init__(self, MemoryObjectStore(), DictRefsContainer({})) self._named_files = {} self.bare = True self._config = ConfigFile() def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ self._named_files[path] = contents def get_named_file(self, path): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-baked Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :return: An open file object, or None if the file does not exist. """ contents = self._named_files.get(path, None) if contents is None: return None return BytesIO(contents) def open_index(self): """Fail to open index for this repo, since it is bare. :raise NoIndexPresent: Raised when no index is present """ raise NoIndexPresent() def get_config(self): """Retrieve the config object. :return: `ConfigFile` object. """ return self._config def get_description(self): """Retrieve the repository description. This defaults to None, for no description. """ return None @classmethod def init_bare(cls, objects, refs): """Create a new bare repository in memory. :param objects: Objects for the new repository, as iterable :param refs: Refs as dictionary, mapping names to object SHA1s """ ret = cls() for obj in objects: ret.object_store.add_object(obj) for refname, sha in refs.items(): ret.refs[refname] = sha ret._init_files(bare=True) return ret diff --git a/dulwich/tests/compat/test_client.py b/dulwich/tests/compat/test_client.py index 00e1dca3..85c22a23 100644 --- a/dulwich/tests/compat/test_client.py +++ b/dulwich/tests/compat/test_client.py @@ -1,535 +1,533 @@ # test_client.py -- Compatibilty tests for git client. # Copyright (C) 2010 Google, Inc. # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Compatibilty tests between the Dulwich client and the cgit server.""" from contextlib import closing import copy from io import BytesIO import os import select import signal import subprocess import sys import tarfile import tempfile import threading try: from urlparse import unquote except ImportError: from urllib.parse import unquote try: import BaseHTTPServer import SimpleHTTPServer except ImportError: import http.server BaseHTTPServer = http.server SimpleHTTPServer = http.server if sys.platform == 'win32': import ctypes from dulwich import ( client, errors, file, index, protocol, objects, repo, ) from dulwich.tests import ( SkipTest, expectedFailure, ) -from dulwich.tests.utils import ( - rmtree_ro, -) from dulwich.tests.compat.utils import ( CompatTestCase, check_for_daemon, import_repo_to_dir, + rmtree_ro, run_git_or_fail, _DEFAULT_GIT, ) class DulwichClientTestBase(object): """Tests for client/server compatibility.""" def setUp(self): self.gitroot = os.path.dirname(import_repo_to_dir('server_new.export').rstrip(os.sep)) self.dest = os.path.join(self.gitroot, 'dest') file.ensure_dir_exists(self.dest) run_git_or_fail(['init', '--quiet', '--bare'], cwd=self.dest) def tearDown(self): rmtree_ro(self.gitroot) def assertDestEqualsSrc(self): repo_dir = os.path.join(self.gitroot, 'server_new.export') dest_repo_dir = os.path.join(self.gitroot, 'dest') with closing(repo.Repo(repo_dir)) as src: with closing(repo.Repo(dest_repo_dir)) as dest: self.assertReposEqual(src, dest) def _client(self): raise NotImplementedError() def _build_path(self): raise NotImplementedError() def _do_send_pack(self): c = self._client() srcpath = os.path.join(self.gitroot, 'server_new.export') with closing(repo.Repo(srcpath)) as src: sendrefs = dict(src.get_refs()) del sendrefs[b'HEAD'] c.send_pack(self._build_path(b'/dest'), lambda _: sendrefs, src.object_store.generate_pack_contents) def test_send_pack(self): self._do_send_pack() self.assertDestEqualsSrc() def test_send_pack_nothing_to_send(self): self._do_send_pack() self.assertDestEqualsSrc() # nothing to send, but shouldn't raise either. self._do_send_pack() def test_send_without_report_status(self): c = self._client() c._send_capabilities.remove(b'report-status') srcpath = os.path.join(self.gitroot, 'server_new.export') with closing(repo.Repo(srcpath)) as src: sendrefs = dict(src.get_refs()) del sendrefs[b'HEAD'] c.send_pack(self._build_path(b'/dest'), lambda _: sendrefs, src.object_store.generate_pack_contents) self.assertDestEqualsSrc() def make_dummy_commit(self, dest): b = objects.Blob.from_string(b'hi') dest.object_store.add_object(b) t = index.commit_tree(dest.object_store, [(b'hi', b.id, 0o100644)]) c = objects.Commit() c.author = c.committer = b'Foo Bar ' c.author_time = c.commit_time = 0 c.author_timezone = c.commit_timezone = 0 c.message = b'hi' c.tree = t dest.object_store.add_object(c) return c.id def disable_ff_and_make_dummy_commit(self): # disable non-fast-forward pushes to the server dest = repo.Repo(os.path.join(self.gitroot, 'dest')) run_git_or_fail(['config', 'receive.denyNonFastForwards', 'true'], cwd=dest.path) commit_id = self.make_dummy_commit(dest) return dest, commit_id def compute_send(self, src): sendrefs = dict(src.get_refs()) del sendrefs[b'HEAD'] return sendrefs, src.object_store.generate_pack_contents def test_send_pack_one_error(self): dest, dummy_commit = self.disable_ff_and_make_dummy_commit() dest.refs[b'refs/heads/master'] = dummy_commit repo_dir = os.path.join(self.gitroot, 'server_new.export') with closing(repo.Repo(repo_dir)) as src: sendrefs, gen_pack = self.compute_send(src) c = self._client() try: c.send_pack(self._build_path(b'/dest'), lambda _: sendrefs, gen_pack) except errors.UpdateRefsError as e: self.assertEqual('refs/heads/master failed to update', e.args[0]) self.assertEqual({b'refs/heads/branch': b'ok', b'refs/heads/master': b'non-fast-forward'}, e.ref_status) def test_send_pack_multiple_errors(self): dest, dummy = self.disable_ff_and_make_dummy_commit() # set up for two non-ff errors branch, master = b'refs/heads/branch', b'refs/heads/master' dest.refs[branch] = dest.refs[master] = dummy repo_dir = os.path.join(self.gitroot, 'server_new.export') with closing(repo.Repo(repo_dir)) as src: sendrefs, gen_pack = self.compute_send(src) c = self._client() try: c.send_pack(self._build_path(b'/dest'), lambda _: sendrefs, gen_pack) except errors.UpdateRefsError as e: self.assertIn(str(e), ['{0}, {1} failed to update'.format( branch.decode('ascii'), master.decode('ascii')), '{1}, {0} failed to update'.format( branch.decode('ascii'), master.decode('ascii'))]) self.assertEqual({branch: b'non-fast-forward', master: b'non-fast-forward'}, e.ref_status) def test_archive(self): c = self._client() f = BytesIO() c.archive(self._build_path(b'/server_new.export'), b'HEAD', f.write) f.seek(0) tf = tarfile.open(fileobj=f) self.assertEqual(['baz', 'foo'], tf.getnames()) def test_fetch_pack(self): c = self._client() with closing(repo.Repo(os.path.join(self.gitroot, 'dest'))) as dest: refs = c.fetch(self._build_path(b'/server_new.export'), dest) for r in refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) self.assertDestEqualsSrc() def test_incremental_fetch_pack(self): self.test_fetch_pack() dest, dummy = self.disable_ff_and_make_dummy_commit() dest.refs[b'refs/heads/master'] = dummy c = self._client() repo_dir = os.path.join(self.gitroot, 'server_new.export') with closing(repo.Repo(repo_dir)) as dest: refs = c.fetch(self._build_path(b'/dest'), dest) for r in refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) self.assertDestEqualsSrc() def test_fetch_pack_no_side_band_64k(self): c = self._client() c._fetch_capabilities.remove(b'side-band-64k') with closing(repo.Repo(os.path.join(self.gitroot, 'dest'))) as dest: refs = c.fetch(self._build_path(b'/server_new.export'), dest) for r in refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) self.assertDestEqualsSrc() def test_fetch_pack_zero_sha(self): # zero sha1s are already present on the client, and should # be ignored c = self._client() with closing(repo.Repo(os.path.join(self.gitroot, 'dest'))) as dest: refs = c.fetch(self._build_path(b'/server_new.export'), dest, lambda refs: [protocol.ZERO_SHA]) for r in refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) def test_send_remove_branch(self): with closing(repo.Repo(os.path.join(self.gitroot, 'dest'))) as dest: dummy_commit = self.make_dummy_commit(dest) dest.refs[b'refs/heads/master'] = dummy_commit dest.refs[b'refs/heads/abranch'] = dummy_commit sendrefs = dict(dest.refs) sendrefs[b'refs/heads/abranch'] = b"00" * 20 del sendrefs[b'HEAD'] gen_pack = lambda have, want: [] c = self._client() self.assertEqual(dest.refs[b"refs/heads/abranch"], dummy_commit) c.send_pack(self._build_path(b'/dest'), lambda _: sendrefs, gen_pack) self.assertFalse(b"refs/heads/abranch" in dest.refs) def test_get_refs(self): c = self._client() refs = c.get_refs(self._build_path(b'/server_new.export')) repo_dir = os.path.join(self.gitroot, 'server_new.export') with closing(repo.Repo(repo_dir)) as dest: self.assertDictEqual(dest.refs.as_dict(), refs) class DulwichTCPClientTest(CompatTestCase, DulwichClientTestBase): def setUp(self): CompatTestCase.setUp(self) DulwichClientTestBase.setUp(self) if check_for_daemon(limit=1): raise SkipTest('git-daemon was already running on port %s' % protocol.TCP_GIT_PORT) fd, self.pidfile = tempfile.mkstemp(prefix='dulwich-test-git-client', suffix=".pid") os.fdopen(fd).close() args = [_DEFAULT_GIT, 'daemon', '--verbose', '--export-all', '--pid-file=%s' % self.pidfile, '--base-path=%s' % self.gitroot, '--enable=receive-pack', '--enable=upload-archive', '--listen=localhost', '--reuseaddr', self.gitroot] self.process = subprocess.Popen( args, cwd=self.gitroot, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if not check_for_daemon(): raise SkipTest('git-daemon failed to start') def tearDown(self): with open(self.pidfile) as f: pid = int(f.read().strip()) if sys.platform == 'win32': PROCESS_TERMINATE = 1 handle = ctypes.windll.kernel32.OpenProcess( PROCESS_TERMINATE, False, pid) ctypes.windll.kernel32.TerminateProcess(handle, -1) ctypes.windll.kernel32.CloseHandle(handle) else: try: os.kill(pid, signal.SIGKILL) os.unlink(self.pidfile) except (OSError, IOError): pass self.process.wait() self.process.stdout.close() self.process.stderr.close() DulwichClientTestBase.tearDown(self) CompatTestCase.tearDown(self) def _client(self): return client.TCPGitClient('localhost') def _build_path(self, path): return path if sys.platform == 'win32': @expectedFailure def test_fetch_pack_no_side_band_64k(self): DulwichClientTestBase.test_fetch_pack_no_side_band_64k(self) class TestSSHVendor(object): @staticmethod def run_command(host, command, username=None, port=None): cmd, path = command.split(b' ') cmd = cmd.split(b'-', 1) path = path.replace(b"'", b"") p = subprocess.Popen(cmd + [path], bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return client.SubprocessWrapper(p) class DulwichMockSSHClientTest(CompatTestCase, DulwichClientTestBase): def setUp(self): CompatTestCase.setUp(self) DulwichClientTestBase.setUp(self) self.real_vendor = client.get_ssh_vendor client.get_ssh_vendor = TestSSHVendor def tearDown(self): DulwichClientTestBase.tearDown(self) CompatTestCase.tearDown(self) client.get_ssh_vendor = self.real_vendor def _client(self): return client.SSHGitClient('localhost') def _build_path(self, path): return self.gitroot.encode(sys.getfilesystemencoding()) + path class DulwichSubprocessClientTest(CompatTestCase, DulwichClientTestBase): def setUp(self): CompatTestCase.setUp(self) DulwichClientTestBase.setUp(self) def tearDown(self): DulwichClientTestBase.tearDown(self) CompatTestCase.tearDown(self) def _client(self): return client.SubprocessGitClient(stderr=subprocess.PIPE) def _build_path(self, path): return self.gitroot.encode(sys.getfilesystemencoding()) + path class GitHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): """HTTP Request handler that calls out to 'git http-backend'.""" # Make rfile unbuffered -- we need to read one line and then pass # the rest to a subprocess, so we can't use buffered input. rbufsize = 0 def do_POST(self): self.run_backend() def do_GET(self): self.run_backend() def send_head(self): return self.run_backend() def log_request(self, code='-', size='-'): # Let's be quiet, the test suite is noisy enough already pass def run_backend(self): """Call out to git http-backend.""" # Based on CGIHTTPServer.CGIHTTPRequestHandler.run_cgi: # Copyright (c) 2001-2010 Python Software Foundation; All Rights Reserved # Licensed under the Python Software Foundation License. rest = self.path # find an explicit query string, if present. i = rest.rfind('?') if i >= 0: rest, query = rest[:i], rest[i+1:] else: query = '' env = copy.deepcopy(os.environ) env['SERVER_SOFTWARE'] = self.version_string() env['SERVER_NAME'] = self.server.server_name env['GATEWAY_INTERFACE'] = 'CGI/1.1' env['SERVER_PROTOCOL'] = self.protocol_version env['SERVER_PORT'] = str(self.server.server_port) env['GIT_PROJECT_ROOT'] = self.server.root_path env["GIT_HTTP_EXPORT_ALL"] = "1" env['REQUEST_METHOD'] = self.command uqrest = unquote(rest) env['PATH_INFO'] = uqrest env['SCRIPT_NAME'] = "/" if query: env['QUERY_STRING'] = query host = self.address_string() if host != self.client_address[0]: env['REMOTE_HOST'] = host env['REMOTE_ADDR'] = self.client_address[0] authorization = self.headers.get("authorization") if authorization: authorization = authorization.split() if len(authorization) == 2: import base64, binascii env['AUTH_TYPE'] = authorization[0] if authorization[0].lower() == "basic": try: authorization = base64.decodestring(authorization[1]) except binascii.Error: pass else: authorization = authorization.split(':') if len(authorization) == 2: env['REMOTE_USER'] = authorization[0] # XXX REMOTE_IDENT env['CONTENT_TYPE'] = self.headers.get('content-type') length = self.headers.get('content-length') if length: env['CONTENT_LENGTH'] = length referer = self.headers.get('referer') if referer: env['HTTP_REFERER'] = referer accept = [] for line in self.headers.getallmatchingheaders('accept'): if line[:1] in "\t\n\r ": accept.append(line.strip()) else: accept = accept + line[7:].split(',') env['HTTP_ACCEPT'] = ','.join(accept) ua = self.headers.get('user-agent') if ua: env['HTTP_USER_AGENT'] = ua co = self.headers.get('cookie') if co: env['HTTP_COOKIE'] = co # XXX Other HTTP_* headers # Since we're setting the env in the parent, provide empty # values to override previously set values for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH', 'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'): env.setdefault(k, "") self.wfile.write(b"HTTP/1.1 200 Script output follows\r\n") self.wfile.write( ("Server: %s\r\n" % self.server.server_name).encode('ascii')) self.wfile.write( ("Date: %s\r\n" % self.date_time_string()).encode('ascii')) decoded_query = query.replace('+', ' ') try: nbytes = int(length) except (TypeError, ValueError): nbytes = 0 if self.command.lower() == "post" and nbytes > 0: data = self.rfile.read(nbytes) else: data = None # throw away additional data [see bug #427345] while select.select([self.rfile._sock], [], [], 0)[0]: if not self.rfile._sock.recv(1): break args = ['http-backend'] if '=' not in decoded_query: args.append(decoded_query) stdout = run_git_or_fail(args, input=data, env=env, stderr=subprocess.PIPE) self.wfile.write(stdout) class HTTPGitServer(BaseHTTPServer.HTTPServer): allow_reuse_address = True def __init__(self, server_address, root_path): BaseHTTPServer.HTTPServer.__init__(self, server_address, GitHTTPRequestHandler) self.root_path = root_path self.server_name = "localhost" def get_url(self): return 'http://%s:%s/' % (self.server_name, self.server_port) class DulwichHttpClientTest(CompatTestCase, DulwichClientTestBase): min_git_version = (1, 7, 0, 2) def setUp(self): CompatTestCase.setUp(self) DulwichClientTestBase.setUp(self) self._httpd = HTTPGitServer(("localhost", 0), self.gitroot) self.addCleanup(self._httpd.shutdown) threading.Thread(target=self._httpd.serve_forever).start() run_git_or_fail(['config', 'http.uploadpack', 'true'], cwd=self.dest) run_git_or_fail(['config', 'http.receivepack', 'true'], cwd=self.dest) def tearDown(self): DulwichClientTestBase.tearDown(self) CompatTestCase.tearDown(self) self._httpd.shutdown() self._httpd.socket.close() def _client(self): return client.HttpGitClient(self._httpd.get_url()) def _build_path(self, path): if sys.version_info[0] == 3: return path.decode('ascii') else: return path def test_archive(self): raise SkipTest("exporting archives not supported over http") diff --git a/dulwich/tests/compat/test_repository.py b/dulwich/tests/compat/test_repository.py index a0c5b737..956eeefe 100644 --- a/dulwich/tests/compat/test_repository.py +++ b/dulwich/tests/compat/test_repository.py @@ -1,208 +1,215 @@ # test_repo.py -- Git repo compatibility tests # Copyright (C) 2010 Google, Inc. # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Compatibility tests for dulwich repositories.""" from io import BytesIO from itertools import chain -import os, tempfile +import os +import tempfile from dulwich.objects import ( hex_to_sha, ) from dulwich.repo import ( check_ref_format, Repo, ) -from dulwich.tests.utils import ( - rmtree_ro, - create_empty_commit, -) from dulwich.tests.compat.utils import ( + rmtree_ro, run_git_or_fail, CompatTestCase, - git_version -) + ) class ObjectStoreTestCase(CompatTestCase): """Tests for git repository compatibility.""" def setUp(self): super(ObjectStoreTestCase, self).setUp() self._repo = self.import_repo('server_new.export') - def repo_path(self): - return self._repo.path - def _run_git(self, args): return run_git_or_fail(args, cwd=self._repo.path) def _parse_refs(self, output): refs = {} for line in BytesIO(output): fields = line.rstrip(b'\n').split(b' ') self.assertEqual(3, len(fields)) refname, type_name, sha = fields check_ref_format(refname[5:]) hex_to_sha(sha) refs[refname] = (type_name, sha) return refs def _parse_objects(self, output): return set(s.rstrip(b'\n').split(b' ')[0] for s in BytesIO(output)) def _parse_worktree_list(self, output): worktrees = [] for line in BytesIO(output): fields = line.rstrip(b'\n').split() worktrees.append(tuple(f.decode() for f in fields)) return worktrees def test_bare(self): self.assertTrue(self._repo.bare) self.assertFalse(os.path.exists(os.path.join(self._repo.path, '.git'))) def test_head(self): output = self._run_git(['rev-parse', 'HEAD']) head_sha = output.rstrip(b'\n') hex_to_sha(head_sha) self.assertEqual(head_sha, self._repo.refs[b'HEAD']) def test_refs(self): output = self._run_git( ['for-each-ref', '--format=%(refname) %(objecttype) %(objectname)']) expected_refs = self._parse_refs(output) actual_refs = {} for refname, sha in self._repo.refs.as_dict().items(): if refname == b'HEAD': continue # handled in test_head obj = self._repo[sha] self.assertEqual(sha, obj.id) actual_refs[refname] = (obj.type_name, obj.id) self.assertEqual(expected_refs, actual_refs) # TODO(dborowitz): peeled ref tests def _get_loose_shas(self): output = self._run_git(['rev-list', '--all', '--objects', '--unpacked']) return self._parse_objects(output) def _get_all_shas(self): output = self._run_git(['rev-list', '--all', '--objects']) return self._parse_objects(output) def assertShasMatch(self, expected_shas, actual_shas_iter): actual_shas = set() for sha in actual_shas_iter: obj = self._repo[sha] self.assertEqual(sha, obj.id) actual_shas.add(sha) self.assertEqual(expected_shas, actual_shas) def test_loose_objects(self): # TODO(dborowitz): This is currently not very useful since fast-imported # repos only contained packed objects. expected_shas = self._get_loose_shas() self.assertShasMatch(expected_shas, self._repo.object_store._iter_loose_objects()) def test_packed_objects(self): expected_shas = self._get_all_shas() - self._get_loose_shas() self.assertShasMatch(expected_shas, chain(*self._repo.object_store.packs)) def test_all_objects(self): expected_shas = self._get_all_shas() self.assertShasMatch(expected_shas, iter(self._repo.object_store)) class WorkingTreeTestCase(ObjectStoreTestCase): """Test for compatibility with git-worktree.""" min_git_version = (2, 5, 0) + def create_new_worktree(self, repo_dir, branch): + """Create a new worktree using git-worktree. + + :param repo_dir: The directory of the main working tree. + :param branch: The branch or commit to checkout in the new worktree. + + :returns: The path to the new working tree. + """ + temp_dir = tempfile.mkdtemp() + run_git_or_fail(['worktree', 'add', temp_dir, branch], + cwd=repo_dir) + return temp_dir + def setUp(self): super(WorkingTreeTestCase, self).setUp() - self._worktree_path = self.create_new_worktree(self.repo_path(), 'branch') + self._worktree_path = self.create_new_worktree(self._repo.path, 'branch') self._worktree_repo = Repo(self._worktree_path) self._mainworktree_repo = self._repo self._number_of_working_tree = 2 self._repo = self._worktree_repo def tearDown(self): self._worktree_repo.close() rmtree_ro(self._worktree_path) self._repo = self._mainworktree_repo super(WorkingTreeTestCase, self).tearDown() def test_refs(self): super(WorkingTreeTestCase, self).test_refs() self.assertEqual(self._mainworktree_repo.refs.allkeys(), self._repo.refs.allkeys()) def test_head_equality(self): self.assertNotEqual(self._repo.refs[b'HEAD'], self._mainworktree_repo.refs[b'HEAD']) def test_bare(self): self.assertFalse(self._repo.bare) self.assertTrue(os.path.isfile(os.path.join(self._repo.path, '.git'))) def test_git_worktree_list(self): output = run_git_or_fail(['worktree', 'list'], cwd=self._repo.path) worktrees = self._parse_worktree_list(output) self.assertEqual(len(worktrees), self._number_of_working_tree) self.assertEqual(worktrees[0][1], '(bare)') self.assertEqual(worktrees[0][0], self._mainworktree_repo.path) - + output = run_git_or_fail(['worktree', 'list'], cwd=self._mainworktree_repo.path) worktrees = self._parse_worktree_list(output) self.assertEqual(len(worktrees), self._number_of_working_tree) self.assertEqual(worktrees[0][1], '(bare)') self.assertEqual(worktrees[0][0], self._mainworktree_repo.path) + class InitNewWorkingDirectoryTestCase(WorkingTreeTestCase): """Test compatibility of Repo.init_new_working_directory.""" min_git_version = (2, 5, 0) def setUp(self): super(InitNewWorkingDirectoryTestCase, self).setUp() self._other_worktree = self._repo - self._repo = Repo.init_new_working_directory(tempfile.mkdtemp(), - self._mainworktree_repo.path) + self._repo = Repo._init_new_working_directory( + tempfile.mkdtemp(), self._mainworktree_repo) self._number_of_working_tree = 3 - + def tearDown(self): self._repo.close() rmtree_ro(self._repo.path) - self._repo = self._other_worktree super(InitNewWorkingDirectoryTestCase, self).tearDown() def test_head_equality(self): self.assertEqual(self._repo.refs[b'HEAD'], self._mainworktree_repo.refs[b'HEAD']) def test_bare(self): self.assertFalse(self._repo.bare) self.assertTrue(os.path.isfile(os.path.join(self._repo.path, '.git'))) diff --git a/dulwich/tests/compat/utils.py b/dulwich/tests/compat/utils.py index 8ffdcc37..6fd7e30f 100644 --- a/dulwich/tests/compat/utils.py +++ b/dulwich/tests/compat/utils.py @@ -1,266 +1,254 @@ # utils.py -- Git compatibility utilities # Copyright (C) 2010 Google, Inc. # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Utilities for interacting with cgit.""" import errno +import functools import os import shutil import socket import stat import subprocess +import sys import tempfile import time from dulwich.repo import Repo from dulwich.protocol import TCP_GIT_PORT -from dulwich.tests.utils import ( - rmtree_ro, -) from dulwich.tests import ( SkipTest, TestCase, ) _DEFAULT_GIT = 'git' _VERSION_LEN = 4 _REPOS_DATA_DIR = os.path.abspath(os.path.join( os.path.dirname(__file__), os.pardir, 'data', 'repos')) def git_version(git_path=_DEFAULT_GIT): """Attempt to determine the version of git currently installed. :param git_path: Path to the git executable; defaults to the version in the system path. :return: A tuple of ints of the form (major, minor, point, sub-point), or None if no git installation was found. """ try: output = run_git_or_fail(['--version'], git_path=git_path) except OSError: return None version_prefix = b'git version ' if not output.startswith(version_prefix): return None parts = output[len(version_prefix):].split(b'.') nums = [] for part in parts: try: nums.append(int(part)) except ValueError: break while len(nums) < _VERSION_LEN: nums.append(0) return tuple(nums[:_VERSION_LEN]) def require_git_version(required_version, git_path=_DEFAULT_GIT): """Require git version >= version, or skip the calling test. :param required_version: A tuple of ints of the form (major, minor, point, sub-point); ommitted components default to 0. :param git_path: Path to the git executable; defaults to the version in the system path. :raise ValueError: if the required version tuple has too many parts. :raise SkipTest: if no suitable git version was found at the given path. """ found_version = git_version(git_path=git_path) if found_version is None: raise SkipTest('Test requires git >= %s, but c git not found' % (required_version, )) if len(required_version) > _VERSION_LEN: raise ValueError('Invalid version tuple %s, expected %i parts' % (required_version, _VERSION_LEN)) required_version = list(required_version) while len(found_version) < len(required_version): required_version.append(0) required_version = tuple(required_version) if found_version < required_version: required_version = '.'.join(map(str, required_version)) found_version = '.'.join(map(str, found_version)) raise SkipTest('Test requires git >= %s, found %s' % (required_version, found_version)) def run_git(args, git_path=_DEFAULT_GIT, input=None, capture_stdout=False, **popen_kwargs): """Run a git command. Input is piped from the input parameter and output is sent to the standard streams, unless capture_stdout is set. :param args: A list of args to the git command. :param git_path: Path to to the git executable. :param input: Input data to be sent to stdin. :param capture_stdout: Whether to capture and return stdout. :param popen_kwargs: Additional kwargs for subprocess.Popen; stdin/stdout args are ignored. :return: A tuple of (returncode, stdout contents). If capture_stdout is False, None will be returned as stdout contents. :raise OSError: if the git executable was not found. """ env = popen_kwargs.pop('env', {}) env['LC_ALL'] = env['LANG'] = 'C' args = [git_path] + args popen_kwargs['stdin'] = subprocess.PIPE if capture_stdout: popen_kwargs['stdout'] = subprocess.PIPE else: popen_kwargs.pop('stdout', None) p = subprocess.Popen(args, env=env, **popen_kwargs) stdout, stderr = p.communicate(input=input) return (p.returncode, stdout) def run_git_or_fail(args, git_path=_DEFAULT_GIT, input=None, **popen_kwargs): """Run a git command, capture stdout/stderr, and fail if git fails.""" if 'stderr' not in popen_kwargs: popen_kwargs['stderr'] = subprocess.STDOUT returncode, stdout = run_git(args, git_path=git_path, input=input, capture_stdout=True, **popen_kwargs) if returncode != 0: raise AssertionError("git with args %r failed with %d: %r" % ( args, returncode, stdout)) return stdout def import_repo_to_dir(name): """Import a repo from a fast-export file in a temporary directory. These are used rather than binary repos for compat tests because they are more compact and human-editable, and we already depend on git. :param name: The name of the repository export file, relative to dulwich/tests/data/repos. :returns: The path to the imported repository. """ temp_dir = tempfile.mkdtemp() export_path = os.path.join(_REPOS_DATA_DIR, name) temp_repo_dir = os.path.join(temp_dir, name) export_file = open(export_path, 'rb') run_git_or_fail(['init', '--quiet', '--bare', temp_repo_dir]) run_git_or_fail(['fast-import'], input=export_file.read(), cwd=temp_repo_dir) export_file.close() return temp_repo_dir def check_for_daemon(limit=10, delay=0.1, timeout=0.1, port=TCP_GIT_PORT): """Check for a running TCP daemon. Defaults to checking 10 times with a delay of 0.1 sec between tries. :param limit: Number of attempts before deciding no daemon is running. :param delay: Delay between connection attempts. :param timeout: Socket timeout for connection attempts. :param port: Port on which we expect the daemon to appear. :returns: A boolean, true if a daemon is running on the specified port, false if not. """ for _ in range(limit): time.sleep(delay) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(delay) try: s.connect(('localhost', port)) return True except socket.timeout: pass except socket.error as e: if getattr(e, 'errno', False) and e.errno != errno.ECONNREFUSED: raise elif e.args[0] != errno.ECONNREFUSED: raise finally: s.close() return False class CompatTestCase(TestCase): """Test case that requires git for compatibility checks. Subclasses can change the git version required by overriding min_git_version. """ min_git_version = (1, 5, 0) def setUp(self): super(CompatTestCase, self).setUp() require_git_version(self.min_git_version) def assertObjectStoreEqual(self, store1, store2): self.assertEqual(sorted(set(store1)), sorted(set(store2))) def assertReposEqual(self, repo1, repo2): self.assertEqual(repo1.get_refs(), repo2.get_refs()) self.assertObjectStoreEqual(repo1.object_store, repo2.object_store) def assertReposNotEqual(self, repo1, repo2): refs1 = repo1.get_refs() objs1 = set(repo1.object_store) refs2 = repo2.get_refs() objs2 = set(repo2.object_store) self.assertFalse(refs1 == refs2 and objs1 == objs2) def import_repo(self, name): """Import a repo from a fast-export file in a temporary directory. :param name: The name of the repository export file, relative to dulwich/tests/data/repos. :returns: An initialized Repo object that lives in a temporary directory. """ path = import_repo_to_dir(name) repo = Repo(path) def cleanup(): repo.close() rmtree_ro(os.path.dirname(path.rstrip(os.sep))) self.addCleanup(cleanup) return repo - def create_new_worktree(self, repo_dir, branch): - """Create a new worktree using git-worktree. - :param repo_dir: The directory of the main working tree. - :param branch: The branch or commit to checkout in the new worktree. - - :returns: The path to the new working tree. - """ - temp_dir = tempfile.mkdtemp() - run_git_or_fail(['worktree', 'add', temp_dir, branch], - cwd=repo_dir) - return temp_dir - - def debug_repo(self, s, repo): - print('{}: {}'.format(s, repo)) - print('controldir: {}'.format(repo.controldir())) - print(' commondir: {}'.format(repo.commondir())) - for r in repo.get_refs().keys(): - print('{}: {}'.format(r, repo.refs[r])) +if sys.platform == 'win32': + def remove_ro(action, name, exc): + os.chmod(name, stat.S_IWRITE) + os.remove(name) + rmtree_ro = functools.partial(shutil.rmtree, onerror=remove_ro) +else: + rmtree_ro = shutil.rmtree diff --git a/dulwich/tests/data/repos/a.git/worktrees/b/HEAD b/dulwich/tests/data/repos/a.git/worktrees/b/HEAD deleted file mode 100644 index 98d60a12..00000000 --- a/dulwich/tests/data/repos/a.git/worktrees/b/HEAD +++ /dev/null @@ -1 +0,0 @@ -2a72d929692c41d8554c07f6301757ba18a65d91 diff --git a/dulwich/tests/data/repos/a.git/worktrees/b/commondir b/dulwich/tests/data/repos/a.git/worktrees/b/commondir deleted file mode 100644 index aab0408c..00000000 --- a/dulwich/tests/data/repos/a.git/worktrees/b/commondir +++ /dev/null @@ -1 +0,0 @@ -../.. diff --git a/dulwich/tests/data/repos/a.git/worktrees/b/gitdir b/dulwich/tests/data/repos/a.git/worktrees/b/gitdir deleted file mode 100644 index 6168c8fd..00000000 --- a/dulwich/tests/data/repos/a.git/worktrees/b/gitdir +++ /dev/null @@ -1 +0,0 @@ -../../../b diff --git a/dulwich/tests/data/repos/a.git/worktrees/b/index b/dulwich/tests/data/repos/a.git/worktrees/b/index deleted file mode 100644 index 461247bd..00000000 Binary files a/dulwich/tests/data/repos/a.git/worktrees/b/index and /dev/null differ diff --git a/dulwich/tests/test_repository.py b/dulwich/tests/test_repository.py index 07e6deb2..4c4040da 100644 --- a/dulwich/tests/test_repository.py +++ b/dulwich/tests/test_repository.py @@ -1,823 +1,837 @@ # -*- coding: utf-8 -*- # test_repository.py -- tests for repository.py # Copyright (C) 2007 James Westby # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for the repository.""" from contextlib import closing import locale import os import stat import shutil import sys import tempfile import warnings from dulwich import errors from dulwich.object_store import ( tree_lookup_path, ) from dulwich import objects from dulwich.config import Config from dulwich.errors import NotGitRepository from dulwich.repo import ( Repo, MemoryRepo, ) from dulwich.tests import ( TestCase, skipIf, ) from dulwich.tests.utils import ( open_repo, - rmtree_ro, tear_down_repo, setup_warning_catcher, - create_empty_commit, - create_repo_and_worktree, ) missing_sha = b'b91fa4d900e17e99b433218e988c4eb4a3e9a097' class CreateRepositoryTests(TestCase): def assertFileContentsEqual(self, expected, repo, path): f = repo.get_named_file(path) if not f: self.assertEqual(expected, None) else: with f: self.assertEqual(expected, f.read()) def _check_repo_contents(self, repo, expect_bare): self.assertEqual(expect_bare, repo.bare) self.assertFileContentsEqual(b'Unnamed repository', repo, 'description') self.assertFileContentsEqual(b'', repo, os.path.join('info', 'exclude')) self.assertFileContentsEqual(None, repo, 'nonexistent file') barestr = b'bare = ' + str(expect_bare).lower().encode('ascii') with repo.get_named_file('config') as f: config_text = f.read() self.assertTrue(barestr in config_text, "%r" % config_text) def test_create_memory(self): repo = MemoryRepo.init_bare([], {}) self._check_repo_contents(repo, True) def test_create_disk_bare(self): tmp_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, tmp_dir) repo = Repo.init_bare(tmp_dir) self.assertEqual(tmp_dir, repo._controldir) self._check_repo_contents(repo, True) def test_create_disk_non_bare(self): tmp_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, tmp_dir) repo = Repo.init(tmp_dir) self.assertEqual(os.path.join(tmp_dir, '.git'), repo._controldir) self._check_repo_contents(repo, False) class RepositoryRootTests(TestCase): def mkdtemp(self): return tempfile.mkdtemp() def open_repo(self, name): temp_dir = self.mkdtemp() repo = open_repo(name, temp_dir) self.addCleanup(tear_down_repo, repo) return repo def test_simple_props(self): r = self.open_repo('a.git') self.assertEqual(r.controldir(), r.path) def test_setitem(self): r = self.open_repo('a.git') r[b"refs/tags/foo"] = b'a90fa2d900a17e99b433217e988c4eb4a2e9a097' self.assertEqual(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', r[b"refs/tags/foo"].id) def test_getitem_unicode(self): r = self.open_repo('a.git') test_keys = [ (b'refs/heads/master', True), (b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', True), (b'11' * 19 + b'--', False), ] for k, contained in test_keys: self.assertEqual(k in r, contained) # Avoid deprecation warning under Py3.2+ if getattr(self, 'assertRaisesRegex', None): assertRaisesRegexp = self.assertRaisesRegex else: assertRaisesRegexp = self.assertRaisesRegexp for k, _ in test_keys: assertRaisesRegexp( TypeError, "'name' must be bytestring, not int", r.__getitem__, 12 ) def test_delitem(self): r = self.open_repo('a.git') del r[b'refs/heads/master'] self.assertRaises(KeyError, lambda: r[b'refs/heads/master']) del r[b'HEAD'] self.assertRaises(KeyError, lambda: r[b'HEAD']) self.assertRaises(ValueError, r.__delitem__, b'notrefs/foo') def test_get_refs(self): r = self.open_repo('a.git') self.assertEqual({ b'HEAD': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', b'refs/heads/master': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', b'refs/tags/mytag-packed': b'b0931cadc54336e78a1d980420e3268903b57a50', }, r.get_refs()) def test_head(self): r = self.open_repo('a.git') self.assertEqual(r.head(), b'a90fa2d900a17e99b433217e988c4eb4a2e9a097') def test_get_object(self): r = self.open_repo('a.git') obj = r.get_object(r.head()) self.assertEqual(obj.type_name, b'commit') def test_get_object_non_existant(self): r = self.open_repo('a.git') self.assertRaises(KeyError, r.get_object, missing_sha) def test_contains_object(self): r = self.open_repo('a.git') self.assertTrue(r.head() in r) def test_contains_ref(self): r = self.open_repo('a.git') self.assertTrue(b"HEAD" in r) def test_get_no_description(self): r = self.open_repo('a.git') self.assertIs(None, r.get_description()) def test_get_description(self): r = self.open_repo('a.git') with open(os.path.join(r.path, 'description'), 'wb') as f: f.write(b"Some description") self.assertEqual(b"Some description", r.get_description()) def test_set_description(self): r = self.open_repo('a.git') description = b"Some description" r.set_description(description) self.assertEqual(description, r.get_description()) def test_contains_missing(self): r = self.open_repo('a.git') self.assertFalse(b"bar" in r) def test_get_peeled(self): # unpacked ref r = self.open_repo('a.git') tag_sha = b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a' self.assertNotEqual(r[tag_sha].sha().hexdigest(), r.head()) self.assertEqual(r.get_peeled(b'refs/tags/mytag'), r.head()) # packed ref with cached peeled value packed_tag_sha = b'b0931cadc54336e78a1d980420e3268903b57a50' parent_sha = r[r.head()].parents[0] self.assertNotEqual(r[packed_tag_sha].sha().hexdigest(), parent_sha) self.assertEqual(r.get_peeled(b'refs/tags/mytag-packed'), parent_sha) # TODO: add more corner cases to test repo def test_get_peeled_not_tag(self): r = self.open_repo('a.git') self.assertEqual(r.get_peeled(b'HEAD'), r.head()) def test_get_walker(self): r = self.open_repo('a.git') # include defaults to [r.head()] self.assertEqual([e.commit.id for e in r.get_walker()], [r.head(), b'2a72d929692c41d8554c07f6301757ba18a65d91']) self.assertEqual( [e.commit.id for e in r.get_walker([b'2a72d929692c41d8554c07f6301757ba18a65d91'])], [b'2a72d929692c41d8554c07f6301757ba18a65d91']) self.assertEqual( [e.commit.id for e in r.get_walker(b'2a72d929692c41d8554c07f6301757ba18a65d91')], [b'2a72d929692c41d8554c07f6301757ba18a65d91']) def test_clone(self): r = self.open_repo('a.git') tmp_dir = self.mkdtemp() self.addCleanup(shutil.rmtree, tmp_dir) with closing(r.clone(tmp_dir, mkdir=False)) as t: self.assertEqual({ b'HEAD': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', b'refs/remotes/origin/master': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', b'refs/heads/master': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', b'refs/tags/mytag-packed': b'b0931cadc54336e78a1d980420e3268903b57a50', }, t.refs.as_dict()) shas = [e.commit.id for e in r.get_walker()] self.assertEqual(shas, [t.head(), b'2a72d929692c41d8554c07f6301757ba18a65d91']) def test_clone_no_head(self): temp_dir = self.mkdtemp() self.addCleanup(shutil.rmtree, temp_dir) repo_dir = os.path.join(os.path.dirname(__file__), 'data', 'repos') dest_dir = os.path.join(temp_dir, 'a.git') shutil.copytree(os.path.join(repo_dir, 'a.git'), dest_dir, symlinks=True) r = Repo(dest_dir) del r.refs[b"refs/heads/master"] del r.refs[b"HEAD"] t = r.clone(os.path.join(temp_dir, 'b.git'), mkdir=True) self.assertEqual({ b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', b'refs/tags/mytag-packed': b'b0931cadc54336e78a1d980420e3268903b57a50', }, t.refs.as_dict()) def test_clone_empty(self): """Test clone() doesn't crash if HEAD points to a non-existing ref. This simulates cloning server-side bare repository either when it is still empty or if user renames master branch and pushes private repo to the server. Non-bare repo HEAD always points to an existing ref. """ r = self.open_repo('empty.git') tmp_dir = self.mkdtemp() self.addCleanup(shutil.rmtree, tmp_dir) r.clone(tmp_dir, mkdir=False, bare=True) def test_merge_history(self): r = self.open_repo('simple_merge.git') shas = [e.commit.id for e in r.get_walker()] self.assertEqual(shas, [b'5dac377bdded4c9aeb8dff595f0faeebcc8498cc', b'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd', b'4cffe90e0a41ad3f5190079d7c8f036bde29cbe6', b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e', b'0d89f20333fbb1d2f3a94da77f4981373d8f4310']) def test_out_of_order_merge(self): """Test that revision history is ordered by date, not parent order.""" r = self.open_repo('ooo_merge.git') shas = [e.commit.id for e in r.get_walker()] self.assertEqual(shas, [b'7601d7f6231db6a57f7bbb79ee52e4d462fd44d1', b'f507291b64138b875c28e03469025b1ea20bc614', b'fb5b0425c7ce46959bec94d54b9a157645e114f5', b'f9e39b120c68182a4ba35349f832d0e4e61f485c']) def test_get_tags_empty(self): r = self.open_repo('ooo_merge.git') self.assertEqual({}, r.refs.as_dict(b'refs/tags')) def test_get_config(self): r = self.open_repo('ooo_merge.git') self.assertIsInstance(r.get_config(), Config) def test_get_config_stack(self): r = self.open_repo('ooo_merge.git') self.assertIsInstance(r.get_config_stack(), Config) @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support') def test_submodule(self): temp_dir = self.mkdtemp() self.addCleanup(shutil.rmtree, temp_dir) repo_dir = os.path.join(os.path.dirname(__file__), 'data', 'repos') shutil.copytree(os.path.join(repo_dir, 'a.git'), os.path.join(temp_dir, 'a.git'), symlinks=True) rel = os.path.relpath(os.path.join(repo_dir, 'submodule'), temp_dir) os.symlink(os.path.join(rel, 'dotgit'), os.path.join(temp_dir, '.git')) with closing(Repo(temp_dir)) as r: self.assertEqual(r.head(), b'a90fa2d900a17e99b433217e988c4eb4a2e9a097') def test_common_revisions(self): """ This test demonstrates that ``find_common_revisions()`` actually returns common heads, not revisions; dulwich already uses ``find_common_revisions()`` in such a manner (see ``Repo.fetch_objects()``). """ expected_shas = set([b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e']) # Source for objects. r_base = self.open_repo('simple_merge.git') # Re-create each-side of the merge in simple_merge.git. # # Since the trees and blobs are missing, the repository created is # corrupted, but we're only checking for commits for the purpose of this # test, so it's immaterial. r1_dir = self.mkdtemp() self.addCleanup(shutil.rmtree, r1_dir) r1_commits = [b'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd', # HEAD b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e', b'0d89f20333fbb1d2f3a94da77f4981373d8f4310'] r2_dir = self.mkdtemp() self.addCleanup(shutil.rmtree, r2_dir) r2_commits = [b'4cffe90e0a41ad3f5190079d7c8f036bde29cbe6', # HEAD b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e', b'0d89f20333fbb1d2f3a94da77f4981373d8f4310'] r1 = Repo.init_bare(r1_dir) for c in r1_commits: r1.object_store.add_object(r_base.get_object(c)) r1.refs[b'HEAD'] = r1_commits[0] r2 = Repo.init_bare(r2_dir) for c in r2_commits: r2.object_store.add_object(r_base.get_object(c)) r2.refs[b'HEAD'] = r2_commits[0] # Finally, the 'real' testing! shas = r2.object_store.find_common_revisions(r1.get_graph_walker()) self.assertEqual(set(shas), expected_shas) shas = r1.object_store.find_common_revisions(r2.get_graph_walker()) self.assertEqual(set(shas), expected_shas) def test_shell_hook_pre_commit(self): if os.name != 'posix': self.skipTest('shell hook tests requires POSIX shell') pre_commit_fail = """#!/bin/sh exit 1 """ pre_commit_success = """#!/bin/sh exit 0 """ repo_dir = os.path.join(self.mkdtemp()) r = Repo.init(repo_dir) self.addCleanup(shutil.rmtree, repo_dir) pre_commit = os.path.join(r.controldir(), 'hooks', 'pre-commit') with open(pre_commit, 'w') as f: f.write(pre_commit_fail) os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) self.assertRaises(errors.CommitError, r.do_commit, 'failed commit', committer='Test Committer ', author='Test Author ', commit_timestamp=12345, commit_timezone=0, author_timestamp=12345, author_timezone=0) with open(pre_commit, 'w') as f: f.write(pre_commit_success) os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) commit_sha = r.do_commit( b'empty commit', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0) self.assertEqual([], r[commit_sha].parents) def test_shell_hook_commit_msg(self): if os.name != 'posix': self.skipTest('shell hook tests requires POSIX shell') commit_msg_fail = """#!/bin/sh exit 1 """ commit_msg_success = """#!/bin/sh exit 0 """ repo_dir = self.mkdtemp() r = Repo.init(repo_dir) self.addCleanup(shutil.rmtree, repo_dir) commit_msg = os.path.join(r.controldir(), 'hooks', 'commit-msg') with open(commit_msg, 'w') as f: f.write(commit_msg_fail) os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) self.assertRaises(errors.CommitError, r.do_commit, b'failed commit', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12345, commit_timezone=0, author_timestamp=12345, author_timezone=0) with open(commit_msg, 'w') as f: f.write(commit_msg_success) os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) commit_sha = r.do_commit( b'empty commit', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0) self.assertEqual([], r[commit_sha].parents) def test_shell_hook_post_commit(self): if os.name != 'posix': self.skipTest('shell hook tests requires POSIX shell') repo_dir = self.mkdtemp() r = Repo.init(repo_dir) self.addCleanup(shutil.rmtree, repo_dir) (fd, path) = tempfile.mkstemp(dir=repo_dir) os.close(fd) post_commit_msg = """#!/bin/sh rm """ + path + """ """ root_sha = r.do_commit( b'empty commit', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12345, commit_timezone=0, author_timestamp=12345, author_timezone=0) self.assertEqual([], r[root_sha].parents) post_commit = os.path.join(r.controldir(), 'hooks', 'post-commit') with open(post_commit, 'wb') as f: f.write(post_commit_msg.encode(locale.getpreferredencoding())) os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) commit_sha = r.do_commit( b'empty commit', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12345, commit_timezone=0, author_timestamp=12345, author_timezone=0) self.assertEqual([root_sha], r[commit_sha].parents) self.assertFalse(os.path.exists(path)) post_commit_msg_fail = """#!/bin/sh exit 1 """ with open(post_commit, 'w') as f: f.write(post_commit_msg_fail) os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) warnings.simplefilter("always", UserWarning) self.addCleanup(warnings.resetwarnings) warnings_list, restore_warnings = setup_warning_catcher() self.addCleanup(restore_warnings) commit_sha2 = r.do_commit( b'empty commit', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12345, commit_timezone=0, author_timestamp=12345, author_timezone=0) self.assertEqual(len(warnings_list), 1, warnings_list) self.assertIsInstance(warnings_list[-1], UserWarning) self.assertTrue("post-commit hook failed: " in str(warnings_list[-1])) self.assertEqual([commit_sha], r[commit_sha2].parents) def test_as_dict(self): def check(repo): self.assertEqual(repo.refs.subkeys(b'refs/tags'), repo.refs.subkeys(b'refs/tags/')) self.assertEqual(repo.refs.as_dict(b'refs/tags'), repo.refs.as_dict(b'refs/tags/')) self.assertEqual(repo.refs.as_dict(b'refs/heads'), repo.refs.as_dict(b'refs/heads/')) bare = self.open_repo('a.git') tmp_dir = self.mkdtemp() self.addCleanup(shutil.rmtree, tmp_dir) with closing(bare.clone(tmp_dir, mkdir=False)) as nonbare: check(nonbare) check(bare) def test_working_tree(self): - (r, w) = create_repo_and_worktree() - self.addCleanup(rmtree_ro, r.path) - self.addCleanup(rmtree_ro, w.path) + temp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, temp_dir) + worktree_temp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, worktree_temp_dir) + r = Repo.init(temp_dir) + root_sha = r.do_commit( + b'empty commit', + committer=b'Test Committer ', + author=b'Test Author ', + commit_timestamp=12345, commit_timezone=0, + author_timestamp=12345, author_timezone=0) + r.refs[b'refs/heads/master'] = root_sha + w = Repo._init_new_working_directory(worktree_temp_dir, r) + new_sha = w.do_commit( + b'new commit', + committer=b'Test Committer ', + author=b'Test Author ', + commit_timestamp=12345, commit_timezone=0, + author_timestamp=12345, author_timezone=0) + w.refs[b'HEAD'] = new_sha self.assertEqual(os.path.abspath(r.controldir()), os.path.abspath(w.commondir())) self.assertEqual(r.refs.keys(), w.refs.keys()) self.assertNotEqual(r.head(), w.head()) - self.assertEqual(w.get_parents(w.head()), [r.head()]) + class BuildRepoRootTests(TestCase): """Tests that build on-disk repos from scratch. Repos live in a temp dir and are torn down after each test. They start with a single commit in master having single file named 'a'. """ def get_repo_dir(self): return os.path.join(tempfile.mkdtemp(), 'test') def setUp(self): super(BuildRepoRootTests, self).setUp() self._repo_dir = self.get_repo_dir() os.makedirs(self._repo_dir) r = self._repo = Repo.init(self._repo_dir) self.addCleanup(tear_down_repo, r) self.assertFalse(r.bare) self.assertEqual(b'ref: refs/heads/master', r.refs.read_ref(b'HEAD')) self.assertRaises(KeyError, lambda: r.refs[b'refs/heads/master']) with open(os.path.join(r.path, 'a'), 'wb') as f: f.write(b'file contents') r.stage(['a']) commit_sha = r.do_commit(b'msg', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12345, commit_timezone=0, author_timestamp=12345, author_timezone=0) self.assertEqual([], r[commit_sha].parents) self._root_commit = commit_sha def test_build_repo(self): r = self._repo self.assertEqual(b'ref: refs/heads/master', r.refs.read_ref(b'HEAD')) self.assertEqual(self._root_commit, r.refs[b'refs/heads/master']) expected_blob = objects.Blob.from_string(b'file contents') self.assertEqual(expected_blob.data, r[expected_blob.id].data) actual_commit = r[self._root_commit] self.assertEqual(b'msg', actual_commit.message) def test_commit_modified(self): r = self._repo with open(os.path.join(r.path, 'a'), 'wb') as f: f.write(b'new contents') r.stage(['a']) commit_sha = r.do_commit(b'modified a', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0) self.assertEqual([self._root_commit], r[commit_sha].parents) a_mode, a_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b'a') self.assertEqual(stat.S_IFREG | 0o644, a_mode) self.assertEqual(b'new contents', r[a_id].data) @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support') def test_commit_symlink(self): r = self._repo os.symlink('a', os.path.join(r.path, 'b')) r.stage(['a', 'b']) commit_sha = r.do_commit(b'Symlink b', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0) self.assertEqual([self._root_commit], r[commit_sha].parents) b_mode, b_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b'b') self.assertTrue(stat.S_ISLNK(b_mode)) self.assertEqual(b'a', r[b_id].data) def test_commit_deleted(self): r = self._repo os.remove(os.path.join(r.path, 'a')) r.stage(['a']) commit_sha = r.do_commit(b'deleted a', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0) self.assertEqual([self._root_commit], r[commit_sha].parents) self.assertEqual([], list(r.open_index())) tree = r[r[commit_sha].tree] self.assertEqual([], list(tree.iteritems())) def test_commit_follows(self): r = self._repo r.refs.set_symbolic_ref(b'HEAD', b'refs/heads/bla') commit_sha = r.do_commit(b'commit with strange character', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0, ref=b'HEAD') self.assertEqual(commit_sha, r[b'refs/heads/bla'].id) def test_commit_encoding(self): r = self._repo commit_sha = r.do_commit(b'commit with strange character \xee', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0, encoding=b"iso8859-1") self.assertEqual(b"iso8859-1", r[commit_sha].encoding) def test_commit_config_identity(self): # commit falls back to the users' identity if it wasn't specified r = self._repo c = r.get_config() c.set((b"user", ), b"name", b"Jelmer") c.set((b"user", ), b"email", b"jelmer@apache.org") c.write_to_path() commit_sha = r.do_commit(b'message') self.assertEqual( b"Jelmer ", r[commit_sha].author) self.assertEqual( b"Jelmer ", r[commit_sha].committer) def test_commit_config_identity_in_memoryrepo(self): # commit falls back to the users' identity if it wasn't specified r = MemoryRepo.init_bare([], {}) c = r.get_config() c.set((b"user", ), b"name", b"Jelmer") c.set((b"user", ), b"email", b"jelmer@apache.org") commit_sha = r.do_commit(b'message', tree=objects.Tree().id) self.assertEqual( b"Jelmer ", r[commit_sha].author) self.assertEqual( b"Jelmer ", r[commit_sha].committer) def test_commit_fail_ref(self): r = self._repo def set_if_equals(name, old_ref, new_ref): return False r.refs.set_if_equals = set_if_equals def add_if_new(name, new_ref): self.fail('Unexpected call to add_if_new') r.refs.add_if_new = add_if_new old_shas = set(r.object_store) self.assertRaises(errors.CommitError, r.do_commit, b'failed commit', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12345, commit_timezone=0, author_timestamp=12345, author_timezone=0) new_shas = set(r.object_store) - old_shas self.assertEqual(1, len(new_shas)) # Check that the new commit (now garbage) was added. new_commit = r[new_shas.pop()] self.assertEqual(r[self._root_commit].tree, new_commit.tree) self.assertEqual(b'failed commit', new_commit.message) def test_commit_branch(self): r = self._repo commit_sha = r.do_commit(b'commit to branch', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0, ref=b"refs/heads/new_branch") self.assertEqual(self._root_commit, r[b"HEAD"].id) self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id) self.assertEqual([], r[commit_sha].parents) self.assertTrue(b"refs/heads/new_branch" in r) new_branch_head = commit_sha commit_sha = r.do_commit(b'commit to branch 2', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0, ref=b"refs/heads/new_branch") self.assertEqual(self._root_commit, r[b"HEAD"].id) self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id) self.assertEqual([new_branch_head], r[commit_sha].parents) def test_commit_merge_heads(self): r = self._repo merge_1 = r.do_commit(b'commit to branch 2', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0, ref=b"refs/heads/new_branch") commit_sha = r.do_commit(b'commit with merge', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0, merge_heads=[merge_1]) self.assertEqual( [self._root_commit, merge_1], r[commit_sha].parents) def test_commit_dangling_commit(self): r = self._repo old_shas = set(r.object_store) old_refs = r.get_refs() commit_sha = r.do_commit(b'commit with no ref', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0, ref=None) new_shas = set(r.object_store) - old_shas # New sha is added, but no new refs self.assertEqual(1, len(new_shas)) new_commit = r[new_shas.pop()] self.assertEqual(r[self._root_commit].tree, new_commit.tree) self.assertEqual([], r[commit_sha].parents) self.assertEqual(old_refs, r.get_refs()) def test_commit_dangling_commit_with_parents(self): r = self._repo old_shas = set(r.object_store) old_refs = r.get_refs() commit_sha = r.do_commit(b'commit with no ref', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0, ref=None, merge_heads=[self._root_commit]) new_shas = set(r.object_store) - old_shas # New sha is added, but no new refs self.assertEqual(1, len(new_shas)) new_commit = r[new_shas.pop()] self.assertEqual(r[self._root_commit].tree, new_commit.tree) self.assertEqual([self._root_commit], r[commit_sha].parents) self.assertEqual(old_refs, r.get_refs()) def test_stage_deleted(self): r = self._repo os.remove(os.path.join(r.path, 'a')) r.stage(['a']) r.stage(['a']) # double-stage a deleted path def test_commit_no_encode_decode(self): r = self._repo repo_path_bytes = r.path.encode(sys.getfilesystemencoding()) encodings = ('utf8', 'latin1') names = [u'À'.encode(encoding) for encoding in encodings] for name, encoding in zip(names, encodings): full_path = os.path.join(repo_path_bytes, name) with open(full_path, 'wb') as f: f.write(encoding.encode('ascii')) # These files are break tear_down_repo, so cleanup these files # ourselves. self.addCleanup(os.remove, full_path) r.stage(names) commit_sha = r.do_commit(b'Files with different encodings', committer=b'Test Committer ', author=b'Test Author ', commit_timestamp=12395, commit_timezone=0, author_timestamp=12395, author_timezone=0, ref=None, merge_heads=[self._root_commit]) for name, encoding in zip(names, encodings): mode, id = tree_lookup_path(r.get_object, r[commit_sha].tree, name) self.assertEqual(stat.S_IFREG | 0o644, mode) self.assertEqual(encoding.encode('ascii'), r[id].data) def test_discover_intended(self): path = os.path.join(self._repo_dir, 'b/c') r = Repo.discover(path) self.assertEqual(r.head(), self._repo.head()) def test_discover_isrepo(self): r = Repo.discover(self._repo_dir) self.assertEqual(r.head(), self._repo.head()) def test_discover_notrepo(self): with self.assertRaises(NotGitRepository): Repo.discover('/') diff --git a/dulwich/tests/utils.py b/dulwich/tests/utils.py index c28aa26c..74e83fd4 100644 --- a/dulwich/tests/utils.py +++ b/dulwich/tests/utils.py @@ -1,393 +1,362 @@ # utils.py -- Test utilities for Dulwich. # Copyright (C) 2010 Google, Inc. # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Utility functions common to Dulwich tests.""" import datetime -import functools import os import shutil -import sys import tempfile import time import types import warnings from dulwich.index import ( commit_tree, ) from dulwich.objects import ( FixedSha, Commit, Tag, object_class, ) from dulwich.pack import ( OFS_DELTA, REF_DELTA, DELTA_TYPES, obj_sha, SHA1Writer, write_pack_header, write_pack_object, create_delta, ) from dulwich.repo import Repo from dulwich.tests import ( SkipTest, skipIf, ) # Plain files are very frequently used in tests, so let the mode be very short. F = 0o100644 # Shorthand mode for Files. def open_repo(name, temp_dir=None): """Open a copy of a repo in a temporary directory. Use this function for accessing repos in dulwich/tests/data/repos to avoid accidentally or intentionally modifying those repos in place. Use tear_down_repo to delete any temp files created. :param name: The name of the repository, relative to dulwich/tests/data/repos :param temp_dir: temporary directory to initialize to. If not provided, a temporary directory will be created. :returns: An initialized Repo object that lives in a temporary directory. """ if temp_dir is None: temp_dir = tempfile.mkdtemp() repo_dir = os.path.join(os.path.dirname(__file__), 'data', 'repos', name) temp_repo_dir = os.path.join(temp_dir, name) shutil.copytree(repo_dir, temp_repo_dir, symlinks=True) return Repo(temp_repo_dir) def tear_down_repo(repo): """Tear down a test repository.""" repo.close() temp_dir = os.path.dirname(repo.path.rstrip(os.sep)) shutil.rmtree(temp_dir) def make_object(cls, **attrs): """Make an object for testing and assign some members. This method creates a new subclass to allow arbitrary attribute reassignment, which is not otherwise possible with objects having __slots__. :param attrs: dict of attributes to set on the new object. :return: A newly initialized object of type cls. """ class TestObject(cls): """Class that inherits from the given class, but without __slots__. Note that classes with __slots__ can't have arbitrary attributes monkey- patched in, so this is a class that is exactly the same only with a __dict__ instead of __slots__. """ pass TestObject.__name__ = 'TestObject_' + cls.__name__ obj = TestObject() for name, value in attrs.items(): if name == 'id': # id property is read-only, so we overwrite sha instead. sha = FixedSha(value) obj.sha = lambda: sha else: setattr(obj, name, value) return obj def make_commit(**attrs): """Make a Commit object with a default set of members. :param attrs: dict of attributes to overwrite from the default values. :return: A newly initialized Commit object. """ default_time = int(time.mktime(datetime.datetime(2010, 1, 1).timetuple())) all_attrs = {'author': b'Test Author ', 'author_time': default_time, 'author_timezone': 0, 'committer': b'Test Committer ', 'commit_time': default_time, 'commit_timezone': 0, 'message': b'Test message.', 'parents': [], 'tree': b'0' * 40} all_attrs.update(attrs) return make_object(Commit, **all_attrs) def make_tag(target, **attrs): """Make a Tag object with a default set of values. :param target: object to be tagged (Commit, Blob, Tree, etc) :param attrs: dict of attributes to overwrite from the default values. :return: A newly initialized Tag object. """ target_id = target.id target_type = object_class(target.type_name) default_time = int(time.mktime(datetime.datetime(2010, 1, 1).timetuple())) all_attrs = {'tagger': b'Test Author ', 'tag_time': default_time, 'tag_timezone': 0, 'message': b'Test message.', 'object': (target_type, target_id), 'name': b'Test Tag', } all_attrs.update(attrs) return make_object(Tag, **all_attrs) def functest_builder(method, func): """Generate a test method that tests the given function.""" def do_test(self): method(self, func) return do_test def ext_functest_builder(method, func): """Generate a test method that tests the given extension function. This is intended to generate test methods that test both a pure-Python version and an extension version using common test code. The extension test will raise SkipTest if the extension is not found. Sample usage: class MyTest(TestCase); def _do_some_test(self, func_impl): self.assertEqual('foo', func_impl()) test_foo = functest_builder(_do_some_test, foo_py) test_foo_extension = ext_functest_builder(_do_some_test, _foo_c) :param method: The method to run. It must must two parameters, self and the function implementation to test. :param func: The function implementation to pass to method. """ def do_test(self): if not isinstance(func, types.BuiltinFunctionType): raise SkipTest("%s extension not found" % func) method(self, func) return do_test def build_pack(f, objects_spec, store=None): """Write test pack data from a concise spec. :param f: A file-like object to write the pack to. :param objects_spec: A list of (type_num, obj). For non-delta types, obj is the string of that object's data. For delta types, obj is a tuple of (base, data), where: * base can be either an index in objects_spec of the base for that * delta; or for a ref delta, a SHA, in which case the resulting pack * will be thin and the base will be an external ref. * data is a string of the full, non-deltified data for that object. Note that offsets/refs and deltas are computed within this function. :param store: An optional ObjectStore for looking up external refs. :return: A list of tuples in the order specified by objects_spec: (offset, type num, data, sha, CRC32) """ sf = SHA1Writer(f) num_objects = len(objects_spec) write_pack_header(sf, num_objects) full_objects = {} offsets = {} crc32s = {} while len(full_objects) < num_objects: for i, (type_num, data) in enumerate(objects_spec): if type_num not in DELTA_TYPES: full_objects[i] = (type_num, data, obj_sha(type_num, [data])) continue base, data = data if isinstance(base, int): if base not in full_objects: continue base_type_num, _, _ = full_objects[base] else: base_type_num, _ = store.get_raw(base) full_objects[i] = (base_type_num, data, obj_sha(base_type_num, [data])) for i, (type_num, obj) in enumerate(objects_spec): offset = f.tell() if type_num == OFS_DELTA: base_index, data = obj base = offset - offsets[base_index] _, base_data, _ = full_objects[base_index] obj = (base, create_delta(base_data, data)) elif type_num == REF_DELTA: base_ref, data = obj if isinstance(base_ref, int): _, base_data, base = full_objects[base_ref] else: base_type_num, base_data = store.get_raw(base_ref) base = obj_sha(base_type_num, base_data) obj = (base, create_delta(base_data, data)) crc32 = write_pack_object(sf, type_num, obj) offsets[i] = offset crc32s[i] = crc32 expected = [] for i in range(num_objects): type_num, data, sha = full_objects[i] assert len(sha) == 20 expected.append((offsets[i], type_num, data, sha, crc32s[i])) sf.write_sha() f.seek(0) return expected def build_commit_graph(object_store, commit_spec, trees=None, attrs=None): """Build a commit graph from a concise specification. Sample usage: >>> c1, c2, c3 = build_commit_graph(store, [[1], [2, 1], [3, 1, 2]]) >>> store[store[c3].parents[0]] == c1 True >>> store[store[c3].parents[1]] == c2 True If not otherwise specified, commits will refer to the empty tree and have commit times increasing in the same order as the commit spec. :param object_store: An ObjectStore to commit objects to. :param commit_spec: An iterable of iterables of ints defining the commit graph. Each entry defines one commit, and entries must be in topological order. The first element of each entry is a commit number, and the remaining elements are its parents. The commit numbers are only meaningful for the call to make_commits; since real commit objects are created, they will get created with real, opaque SHAs. :param trees: An optional dict of commit number -> tree spec for building trees for commits. The tree spec is an iterable of (path, blob, mode) or (path, blob) entries; if mode is omitted, it defaults to the normal file mode (0100644). :param attrs: A dict of commit number -> (dict of attribute -> value) for assigning additional values to the commits. :return: The list of commit objects created. :raise ValueError: If an undefined commit identifier is listed as a parent. """ if trees is None: trees = {} if attrs is None: attrs = {} commit_time = 0 nums = {} commits = [] for commit in commit_spec: commit_num = commit[0] try: parent_ids = [nums[pn] for pn in commit[1:]] except KeyError as e: missing_parent, = e.args raise ValueError('Unknown parent %i' % missing_parent) blobs = [] for entry in trees.get(commit_num, []): if len(entry) == 2: path, blob = entry entry = (path, blob, F) path, blob, mode = entry blobs.append((path, blob.id, mode)) object_store.add_object(blob) tree_id = commit_tree(object_store, blobs) commit_attrs = { 'message': ('Commit %i' % commit_num).encode('ascii'), 'parents': parent_ids, 'tree': tree_id, 'commit_time': commit_time, } commit_attrs.update(attrs.get(commit_num, {})) commit_obj = make_commit(**commit_attrs) # By default, increment the time by a lot. Out-of-order commits should # be closer together than this because their main cause is clock skew. commit_time = commit_attrs['commit_time'] + 100 nums[commit_num] = commit_obj.id object_store.add_object(commit_obj) commits.append(commit_obj) return commits def setup_warning_catcher(): """Wrap warnings.showwarning with code that records warnings.""" caught_warnings = [] original_showwarning = warnings.showwarning def custom_showwarning(*args, **kwargs): caught_warnings.append(args[0]) warnings.showwarning = custom_showwarning def restore_showwarning(): warnings.showwarning = original_showwarning return caught_warnings, restore_showwarning - -def create_empty_commit(r): - return r.do_commit( - b'empty commit', - committer=b'Test Committer ', - author=b'Test Author ', - commit_timestamp=12345, commit_timezone=0, - author_timestamp=12345, author_timezone=0) - -def create_repo_and_worktree(): - temp_dir = tempfile.mkdtemp() - worktree_temp_dir = tempfile.mkdtemp() - r = Repo.init(temp_dir) - root_sha = create_empty_commit(r) - second_sha = create_empty_commit(r) - r.refs[b'refs/heads/master'] = second_sha - w = Repo.init_new_working_directory(worktree_temp_dir, temp_dir) - new_sha = create_empty_commit(w) - w.refs[b'refs/heads/branch'] = new_sha - return (r, w) - -if sys.platform == 'win32': - def remove_ro(action, name, exc): - os.chmod(name, stat.S_IWRITE) - os.remove(name) - - rmtree_ro = functools.partial(shutil.rmtree, onerror=remove_ro) -else: - rmtree_ro = shutil.rmtree