diff --git a/dulwich/objects.py b/dulwich/objects.py index 9e3a0e80..661ddf58 100644 --- a/dulwich/objects.py +++ b/dulwich/objects.py @@ -1,1258 +1,1257 @@ # objects.py -- Access to base git objects # Copyright (C) 2007 James Westby # Copyright (C) 2008-2013 Jelmer Vernooij # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; version 2 # of the License or (at your option) a later version of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. """Access to base git objects.""" import binascii from io import BytesIO from collections import namedtuple import os import posixpath import stat -import sys import warnings import zlib from hashlib import sha1 from dulwich.errors import ( ChecksumMismatch, NotBlobError, NotCommitError, NotTagError, NotTreeError, ObjectFormatException, ) from dulwich.file import GitFile ZERO_SHA = b'0' * 40 # Header fields for commits _TREE_HEADER = b'tree' _PARENT_HEADER = b'parent' _AUTHOR_HEADER = b'author' _COMMITTER_HEADER = b'committer' _ENCODING_HEADER = b'encoding' _MERGETAG_HEADER = b'mergetag' _GPGSIG_HEADER = b'gpgsig' # Header fields for objects _OBJECT_HEADER = b'object' _TYPE_HEADER = b'type' _TAG_HEADER = b'tag' _TAGGER_HEADER = b'tagger' S_IFGITLINK = 0o160000 def S_ISGITLINK(m): """Check if a mode indicates a submodule. :param m: Mode to check :return: a ``boolean`` """ return (stat.S_IFMT(m) == S_IFGITLINK) def _decompress(string): dcomp = zlib.decompressobj() dcomped = dcomp.decompress(string) dcomped += dcomp.flush() return dcomped def sha_to_hex(sha): """Takes a string and returns the hex of the sha within""" hexsha = binascii.hexlify(sha) assert len(hexsha) == 40, "Incorrect length of sha1 string: %d" % hexsha return hexsha def hex_to_sha(hex): """Takes a hex sha and returns a binary sha""" assert len(hex) == 40, "Incorrect length of hexsha: %s" % hex try: return binascii.unhexlify(hex) except TypeError as exc: if not isinstance(hex, bytes): raise raise ValueError(exc.args[0]) def valid_hexsha(hex): if len(hex) != 40: return False try: binascii.unhexlify(hex) except (TypeError, binascii.Error): return False else: return True def hex_to_filename(path, hex): """Takes a hex sha and returns its filename relative to the given path.""" # os.path.join accepts bytes or unicode, but all args must be of the same # type. Make sure that hex which is expected to be bytes, is the same type # as path. if getattr(path, 'encode', None) is not None: hex = hex.decode('ascii') dir = hex[:2] file = hex[2:] # Check from object dir return os.path.join(path, dir, file) def filename_to_hex(filename): """Takes an object filename and returns its corresponding hex sha.""" # grab the last (up to) two path components names = filename.rsplit(os.path.sep, 2)[-2:] errmsg = "Invalid object filename: %s" % filename assert len(names) == 2, errmsg base, rest = names assert len(base) == 2 and len(rest) == 38, errmsg hex = (base + rest).encode('ascii') hex_to_sha(hex) return hex def object_header(num_type, length): """Return an object header for the given numeric type and text length.""" return object_class(num_type).type_name + b' ' + str(length).encode('ascii') + b'\0' def serializable_property(name, docstring=None): """A property that helps tracking whether serialization is necessary. """ def set(obj, value): obj._ensure_parsed() setattr(obj, "_"+name, value) obj._needs_serialization = True def get(obj): obj._ensure_parsed() return getattr(obj, "_"+name) return property(get, set, doc=docstring) def object_class(type): """Get the object class corresponding to the given type. :param type: Either a type name string or a numeric type. :return: The ShaFile subclass corresponding to the given type, or None if type is not a valid type name/number. """ return _TYPE_MAP.get(type, None) def check_hexsha(hex, error_msg): """Check if a string is a valid hex sha string. :param hex: Hex string to check :param error_msg: Error message to use in exception :raise ObjectFormatException: Raised when the string is not valid """ if not valid_hexsha(hex): raise ObjectFormatException("%s %s" % (error_msg, hex)) def check_identity(identity, error_msg): """Check if the specified identity is valid. This will raise an exception if the identity is not valid. :param identity: Identity string :param error_msg: Error message to use in exception """ email_start = identity.find(b'<') email_end = identity.find(b'>') if (email_start < 0 or email_end < 0 or email_end <= email_start or identity.find(b'<', email_start + 1) >= 0 or identity.find(b'>', email_end + 1) >= 0 or not identity.endswith(b'>')): raise ObjectFormatException(error_msg) def git_line(*items): """Formats items into a space sepreated line.""" return b' '.join(items) + b'\n' class FixedSha(object): """SHA object that behaves like hashlib's but is given a fixed value.""" __slots__ = ('_hexsha', '_sha') def __init__(self, hexsha): if getattr(hexsha, 'encode', None) is not None: hexsha = hexsha.encode('ascii') if not isinstance(hexsha, bytes): raise TypeError('Expected bytes for hexsha, got %r' % hexsha) self._hexsha = hexsha self._sha = hex_to_sha(hexsha) def digest(self): """Return the raw SHA digest.""" return self._sha def hexdigest(self): """Return the hex SHA digest.""" return self._hexsha.decode('ascii') class ShaFile(object): """A git SHA file.""" __slots__ = ('_needs_parsing', '_chunked_text', '_sha', '_needs_serialization') @staticmethod def _parse_legacy_object_header(magic, f): """Parse a legacy object, creating it but not reading the file.""" bufsize = 1024 decomp = zlib.decompressobj() header = decomp.decompress(magic) start = 0 end = -1 while end < 0: extra = f.read(bufsize) header += decomp.decompress(extra) magic += extra end = header.find(b'\0', start) start = len(header) header = header[:end] type_name, size = header.split(b' ', 1) size = int(size) # sanity check obj_class = object_class(type_name) if not obj_class: raise ObjectFormatException("Not a known type: %s" % type_name) return obj_class() def _parse_legacy_object(self, map): """Parse a legacy object, setting the raw string.""" text = _decompress(map) header_end = text.find(b'\0') if header_end < 0: raise ObjectFormatException("Invalid object header, no \\0") self.set_raw_string(text[header_end+1:]) def as_legacy_object_chunks(self): """Return chunks representing the object in the experimental format. :return: List of strings """ compobj = zlib.compressobj() yield compobj.compress(self._header()) for chunk in self.as_raw_chunks(): yield compobj.compress(chunk) yield compobj.flush() def as_legacy_object(self): """Return string representing the object in the experimental format. """ return b''.join(self.as_legacy_object_chunks()) def as_raw_chunks(self): """Return chunks with serialization of the object. :return: List of strings, not necessarily one per line """ if self._needs_parsing: self._ensure_parsed() elif self._needs_serialization: self._chunked_text = self._serialize() self._needs_serialization = False return self._chunked_text def as_raw_string(self): """Return raw string with serialization of the object. :return: String object """ return b''.join(self.as_raw_chunks()) def __str__(self): """Return raw string serialization of this object.""" return self.as_raw_string() def __hash__(self): """Return unique hash for this object.""" return hash(self.id) def as_pretty_string(self): """Return a string representing this object, fit for display.""" return self.as_raw_string() def _ensure_parsed(self): if self._needs_parsing: if not self._chunked_text: raise AssertionError("ShaFile needs chunked text") self._deserialize(self._chunked_text) self._needs_parsing = False def set_raw_string(self, text, sha=None): """Set the contents of this object from a serialized string.""" if not isinstance(text, bytes): raise TypeError('Expected bytes for text, got %r' % text) self.set_raw_chunks([text], sha) def set_raw_chunks(self, chunks, sha=None): """Set the contents of this object from a list of chunks.""" self._chunked_text = chunks self._deserialize(chunks) if sha is None: self._sha = None else: self._sha = FixedSha(sha) self._needs_parsing = False self._needs_serialization = False @staticmethod def _parse_object_header(magic, f): """Parse a new style object, creating it but not reading the file.""" num_type = (ord(magic[0:1]) >> 4) & 7 obj_class = object_class(num_type) if not obj_class: raise ObjectFormatException("Not a known type %d" % num_type) return obj_class() def _parse_object(self, map): """Parse a new style object, setting self._text.""" # skip type and size; type must have already been determined, and # we trust zlib to fail if it's otherwise corrupted byte = ord(map[0:1]) used = 1 while (byte & 0x80) != 0: byte = ord(map[used:used+1]) used += 1 raw = map[used:] self.set_raw_string(_decompress(raw)) @classmethod def _is_legacy_object(cls, magic): b0 = ord(magic[0:1]) b1 = ord(magic[1:2]) word = (b0 << 8) + b1 return (b0 & 0x8F) == 0x08 and (word % 31) == 0 @classmethod def _parse_file(cls, f): map = f.read() if cls._is_legacy_object(map): obj = cls._parse_legacy_object_header(map, f) obj._parse_legacy_object(map) else: obj = cls._parse_object_header(map, f) obj._parse_object(map) return obj def __init__(self): """Don't call this directly""" self._sha = None self._chunked_text = [] self._needs_parsing = False self._needs_serialization = True def _deserialize(self, chunks): raise NotImplementedError(self._deserialize) def _serialize(self): raise NotImplementedError(self._serialize) @classmethod def from_path(cls, path): """Open a SHA file from disk.""" with GitFile(path, 'rb') as f: return cls.from_file(f) @classmethod def from_file(cls, f): """Get the contents of a SHA file on disk.""" try: obj = cls._parse_file(f) obj._sha = None return obj except (IndexError, ValueError): raise ObjectFormatException("invalid object header") @staticmethod def from_raw_string(type_num, string, sha=None): """Creates an object of the indicated type from the raw string given. :param type_num: The numeric type of the object. :param string: The raw uncompressed contents. :param sha: Optional known sha for the object """ obj = object_class(type_num)() obj.set_raw_string(string, sha) return obj @staticmethod def from_raw_chunks(type_num, chunks, sha=None): """Creates an object of the indicated type from the raw chunks given. :param type_num: The numeric type of the object. :param chunks: An iterable of the raw uncompressed contents. :param sha: Optional known sha for the object """ obj = object_class(type_num)() obj.set_raw_chunks(chunks, sha) return obj @classmethod def from_string(cls, string): """Create a ShaFile from a string.""" obj = cls() obj.set_raw_string(string) return obj def _check_has_member(self, member, error_msg): """Check that the object has a given member variable. :param member: the member variable to check for :param error_msg: the message for an error if the member is missing :raise ObjectFormatException: with the given error_msg if member is missing or is None """ if getattr(self, member, None) is None: raise ObjectFormatException(error_msg) def check(self): """Check this object for internal consistency. :raise ObjectFormatException: if the object is malformed in some way :raise ChecksumMismatch: if the object was created with a SHA that does not match its contents """ # TODO: if we find that error-checking during object parsing is a # performance bottleneck, those checks should be moved to the class's # check() method during optimization so we can still check the object # when necessary. old_sha = self.id try: self._deserialize(self.as_raw_chunks()) self._sha = None new_sha = self.id except Exception as e: raise ObjectFormatException(e) if old_sha != new_sha: raise ChecksumMismatch(new_sha, old_sha) def _header(self): return object_header(self.type, self.raw_length()) def raw_length(self): """Returns the length of the raw string of this object.""" ret = 0 for chunk in self.as_raw_chunks(): ret += len(chunk) return ret def _make_sha(self): ret = sha1() ret.update(self._header()) for chunk in self.as_raw_chunks(): ret.update(chunk) return ret def sha(self): """The SHA1 object that is the name of this object.""" if self._sha is None or self._needs_serialization: # this is a local because as_raw_chunks() overwrites self._sha new_sha = sha1() new_sha.update(self._header()) for chunk in self.as_raw_chunks(): new_sha.update(chunk) self._sha = new_sha return self._sha def copy(self): """Create a new copy of this SHA1 object from its raw string""" obj_class = object_class(self.get_type()) return obj_class.from_raw_string( self.get_type(), self.as_raw_string(), self.id) @property def id(self): """The hex SHA of this object.""" return self.sha().hexdigest().encode('ascii') def get_type(self): """Return the type number for this object class.""" return self.type_num def set_type(self, type): """Set the type number for this object class.""" self.type_num = type # DEPRECATED: use type_num or type_name as needed. type = property(get_type, set_type) def __repr__(self): return "<%s %s>" % (self.__class__.__name__, self.id) def __ne__(self, other): return not isinstance(other, ShaFile) or self.id != other.id def __eq__(self, other): """Return True if the SHAs of the two objects match. It doesn't make sense to talk about an order on ShaFiles, so we don't override the rich comparison methods (__le__, etc.). """ return isinstance(other, ShaFile) and self.id == other.id class Blob(ShaFile): """A Git Blob object.""" __slots__ = () type_name = b'blob' type_num = 3 def __init__(self): super(Blob, self).__init__() self._chunked_text = [] self._needs_parsing = False self._needs_serialization = False def _get_data(self): return self.as_raw_string() def _set_data(self, data): self.set_raw_string(data) data = property(_get_data, _set_data, "The text contained within the blob object.") def _get_chunked(self): self._ensure_parsed() return self._chunked_text def _set_chunked(self, chunks): self._chunked_text = chunks def _serialize(self): if not self._chunked_text: self._ensure_parsed() return self._chunked_text def _deserialize(self, chunks): self._chunked_text = chunks chunked = property(_get_chunked, _set_chunked, "The text within the blob object, as chunks (not necessarily lines).") @classmethod def from_path(cls, path): blob = ShaFile.from_path(path) if not isinstance(blob, cls): raise NotBlobError(path) return blob def check(self): """Check this object for internal consistency. :raise ObjectFormatException: if the object is malformed in some way """ super(Blob, self).check() def _parse_message(chunks): """Parse a message with a list of fields and a body. :param chunks: the raw chunks of the tag or commit object. :return: iterator of tuples of (field, value), one per header line, in the order read from the text, possibly including duplicates. Includes a field named None for the freeform tag/commit text. """ f = BytesIO(b''.join(chunks)) k = None v = "" for l in f: if l.startswith(b' '): v += l[1:] else: if k is not None: yield (k, v.rstrip(b'\n')) if l == b'\n': # Empty line indicates end of headers break (k, v) = l.split(b' ', 1) yield (None, f.read()) f.close() class Tag(ShaFile): """A Git Tag object.""" type_name = b'tag' type_num = 4 __slots__ = ('_tag_timezone_neg_utc', '_name', '_object_sha', '_object_class', '_tag_time', '_tag_timezone', '_tagger', '_message') def __init__(self): super(Tag, self).__init__() self._tag_timezone_neg_utc = False @classmethod def from_path(cls, filename): tag = ShaFile.from_path(filename) if not isinstance(tag, cls): raise NotTagError(filename) return tag def check(self): """Check this object for internal consistency. :raise ObjectFormatException: if the object is malformed in some way """ super(Tag, self).check() self._check_has_member("_object_sha", "missing object sha") self._check_has_member("_object_class", "missing object type") self._check_has_member("_name", "missing tag name") if not self._name: raise ObjectFormatException("empty tag name") check_hexsha(self._object_sha, "invalid object sha") if getattr(self, "_tagger", None): check_identity(self._tagger, "invalid tagger") last = None for field, _ in _parse_message(self._chunked_text): if field == _OBJECT_HEADER and last is not None: raise ObjectFormatException("unexpected object") elif field == _TYPE_HEADER and last != _OBJECT_HEADER: raise ObjectFormatException("unexpected type") elif field == _TAG_HEADER and last != _TYPE_HEADER: raise ObjectFormatException("unexpected tag name") elif field == _TAGGER_HEADER and last != _TAG_HEADER: raise ObjectFormatException("unexpected tagger") last = field def _serialize(self): chunks = [] chunks.append(git_line(_OBJECT_HEADER, self._object_sha)) chunks.append(git_line(_TYPE_HEADER, self._object_class.type_name)) chunks.append(git_line(_TAG_HEADER, self._name)) if self._tagger: if self._tag_time is None: chunks.append(git_line(_TAGGER_HEADER, self._tagger)) else: chunks.append(git_line( _TAGGER_HEADER, self._tagger, str(self._tag_time).encode('ascii'), format_timezone(self._tag_timezone, self._tag_timezone_neg_utc))) chunks.append(b'\n') # To close headers chunks.append(self._message) return chunks def _deserialize(self, chunks): """Grab the metadata attached to the tag""" self._tagger = None for field, value in _parse_message(chunks): if field == _OBJECT_HEADER: self._object_sha = value elif field == _TYPE_HEADER: obj_class = object_class(value) if not obj_class: raise ObjectFormatException("Not a known type: %s" % value) self._object_class = obj_class elif field == _TAG_HEADER: self._name = value elif field == _TAGGER_HEADER: try: sep = value.index(b'> ') except ValueError: self._tagger = value self._tag_time = None self._tag_timezone = None self._tag_timezone_neg_utc = False else: self._tagger = value[0:sep+1] try: (timetext, timezonetext) = value[sep+2:].rsplit(b' ', 1) self._tag_time = int(timetext) self._tag_timezone, self._tag_timezone_neg_utc = \ parse_timezone(timezonetext) except ValueError as e: raise ObjectFormatException(e) elif field is None: self._message = value else: raise ObjectFormatException("Unknown field %s" % field) def _get_object(self): """Get the object pointed to by this tag. :return: tuple of (object class, sha). """ self._ensure_parsed() return (self._object_class, self._object_sha) def _set_object(self, value): self._ensure_parsed() (self._object_class, self._object_sha) = value self._needs_serialization = True object = property(_get_object, _set_object) name = serializable_property("name", "The name of this tag") tagger = serializable_property("tagger", "Returns the name of the person who created this tag") tag_time = serializable_property("tag_time", "The creation timestamp of the tag. As the number of seconds " "since the epoch") tag_timezone = serializable_property("tag_timezone", "The timezone that tag_time is in.") message = serializable_property( "message", "The message attached to this tag") class TreeEntry(namedtuple('TreeEntry', ['path', 'mode', 'sha'])): """Named tuple encapsulating a single tree entry.""" def in_path(self, path): """Return a copy of this entry with the given path prepended.""" if not isinstance(self.path, bytes): raise TypeError('Expected bytes for path, got %r' % path) return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha) def parse_tree(text, strict=False): """Parse a tree text. :param text: Serialized text to parse :return: iterator of tuples of (name, mode, sha) :raise ObjectFormatException: if the object was malformed in some way """ count = 0 l = len(text) while count < l: mode_end = text.index(b' ', count) mode_text = text[count:mode_end] if strict and mode_text.startswith(b'0'): raise ObjectFormatException("Invalid mode '%s'" % mode_text) try: mode = int(mode_text, 8) except ValueError: raise ObjectFormatException("Invalid mode '%s'" % mode_text) name_end = text.index(b'\0', mode_end) name = text[mode_end+1:name_end] count = name_end+21 sha = text[name_end+1:count] if len(sha) != 20: raise ObjectFormatException("Sha has invalid length") hexsha = sha_to_hex(sha) yield (name, mode, hexsha) def serialize_tree(items): """Serialize the items in a tree to a text. :param items: Sorted iterable over (name, mode, sha) tuples :return: Serialized tree text as chunks """ for name, mode, hexsha in items: yield ("%04o" % mode).encode('ascii') + b' ' + name + b'\0' + hex_to_sha(hexsha) def sorted_tree_items(entries, name_order): """Iterate over a tree entries dictionary. :param name_order: If True, iterate entries in order of their name. If False, iterate entries in tree order, that is, treat subtree entries as having '/' appended. :param entries: Dictionary mapping names to (mode, sha) tuples :return: Iterator over (name, mode, hexsha) """ key_func = name_order and key_entry_name_order or key_entry for name, entry in sorted(entries.items(), key=key_func): mode, hexsha = entry # Stricter type checks than normal to mirror checks in the C version. mode = int(mode) if not isinstance(hexsha, bytes): raise TypeError('Expected bytes for SHA, got %r' % hexsha) yield TreeEntry(name, mode, hexsha) def key_entry(entry): """Sort key for tree entry. :param entry: (name, value) tuplee """ (name, value) = entry if stat.S_ISDIR(value[0]): name += b'/' return name def key_entry_name_order(entry): """Sort key for tree entry in name order.""" return entry[0] class Tree(ShaFile): """A Git tree object""" type_name = b'tree' type_num = 2 __slots__ = ('_entries') def __init__(self): super(Tree, self).__init__() self._entries = {} @classmethod def from_path(cls, filename): tree = ShaFile.from_path(filename) if not isinstance(tree, cls): raise NotTreeError(filename) return tree def __contains__(self, name): self._ensure_parsed() return name in self._entries def __getitem__(self, name): self._ensure_parsed() return self._entries[name] def __setitem__(self, name, value): """Set a tree entry by name. :param name: The name of the entry, as a string. :param value: A tuple of (mode, hexsha), where mode is the mode of the entry as an integral type and hexsha is the hex SHA of the entry as a string. """ mode, hexsha = value self._ensure_parsed() self._entries[name] = (mode, hexsha) self._needs_serialization = True def __delitem__(self, name): self._ensure_parsed() del self._entries[name] self._needs_serialization = True def __len__(self): self._ensure_parsed() return len(self._entries) def __iter__(self): self._ensure_parsed() return iter(self._entries) def add(self, name, mode, hexsha): """Add an entry to the tree. :param mode: The mode of the entry as an integral type. Not all possible modes are supported by git; see check() for details. :param name: The name of the entry, as a string. :param hexsha: The hex SHA of the entry as a string. """ if isinstance(name, int) and isinstance(mode, bytes): (name, mode) = (mode, name) warnings.warn( "Please use Tree.add(name, mode, hexsha)", category=DeprecationWarning, stacklevel=2) self._ensure_parsed() self._entries[name] = mode, hexsha self._needs_serialization = True def iteritems(self, name_order=False): """Iterate over entries. :param name_order: If True, iterate in name order instead of tree order. :return: Iterator over (name, mode, sha) tuples """ self._ensure_parsed() return sorted_tree_items(self._entries, name_order) def items(self): """Return the sorted entries in this tree. :return: List with (name, mode, sha) tuples """ return list(self.iteritems()) def _deserialize(self, chunks): """Grab the entries in the tree""" try: parsed_entries = parse_tree(b''.join(chunks)) except ValueError as e: raise ObjectFormatException(e) # TODO: list comprehension is for efficiency in the common (small) # case; if memory efficiency in the large case is a concern, use a genexp. self._entries = dict([(n, (m, s)) for n, m, s in parsed_entries]) def check(self): """Check this object for internal consistency. :raise ObjectFormatException: if the object is malformed in some way """ super(Tree, self).check() last = None allowed_modes = (stat.S_IFREG | 0o755, stat.S_IFREG | 0o644, stat.S_IFLNK, stat.S_IFDIR, S_IFGITLINK, # TODO: optionally exclude as in git fsck --strict stat.S_IFREG | 0o664) for name, mode, sha in parse_tree(b''.join(self._chunked_text), True): check_hexsha(sha, 'invalid sha %s' % sha) if b'/' in name or name in (b'', b'.', b'..'): raise ObjectFormatException('invalid name %s' % name) if mode not in allowed_modes: raise ObjectFormatException('invalid mode %06o' % mode) entry = (name, (mode, sha)) if last: if key_entry(last) > key_entry(entry): raise ObjectFormatException('entries not sorted') if name == last[0]: raise ObjectFormatException('duplicate entry %s' % name) last = entry def _serialize(self): return list(serialize_tree(self.iteritems())) def as_pretty_string(self): text = [] for name, mode, hexsha in self.iteritems(): if mode & stat.S_IFDIR: kind = "tree" else: kind = "blob" text.append("%04o %s %s\t%s\n" % (mode, kind, hexsha, name)) return "".join(text) def lookup_path(self, lookup_obj, path): """Look up an object in a Git tree. :param lookup_obj: Callback for retrieving object by SHA1 :param path: Path to lookup :return: A tuple of (mode, SHA) of the resulting path. """ parts = path.split(b'/') sha = self.id mode = None for p in parts: if not p: continue obj = lookup_obj(sha) if not isinstance(obj, Tree): raise NotTreeError(sha) mode, sha = obj[p] return mode, sha def parse_timezone(text): """Parse a timezone text fragment (e.g. '+0100'). :param text: Text to parse. :return: Tuple with timezone as seconds difference to UTC and a boolean indicating whether this was a UTC timezone prefixed with a negative sign (-0000). """ # cgit parses the first character as the sign, and the rest # as an integer (using strtol), which could also be negative. # We do the same for compatibility. See #697828. if not text[0] in b'+-': raise ValueError("Timezone must start with + or - (%(text)s)" % vars()) sign = text[:1] offset = int(text[1:]) if sign == b'-': offset = -offset unnecessary_negative_timezone = (offset >= 0 and sign == b'-') signum = (offset < 0) and -1 or 1 offset = abs(offset) hours = int(offset / 100) minutes = (offset % 100) return (signum * (hours * 3600 + minutes * 60), unnecessary_negative_timezone) def format_timezone(offset, unnecessary_negative_timezone=False): """Format a timezone for Git serialization. :param offset: Timezone offset as seconds difference to UTC :param unnecessary_negative_timezone: Whether to use a minus sign for UTC or positive timezones (-0000 and --700 rather than +0000 / +0700). """ if offset % 60 != 0: raise ValueError("Unable to handle non-minute offset.") if offset < 0 or unnecessary_negative_timezone: sign = '-' offset = -offset else: sign = '+' return ('%c%02d%02d' % (sign, offset / 3600, (offset / 60) % 60)).encode('ascii') def parse_commit(chunks): """Parse a commit object from chunks. :param chunks: Chunks to parse :return: Tuple of (tree, parents, author_info, commit_info, encoding, mergetag, gpgsig, message, extra) """ parents = [] extra = [] tree = None author_info = (None, None, (None, None)) commit_info = (None, None, (None, None)) encoding = None mergetag = [] message = None gpgsig = None for field, value in _parse_message(chunks): # TODO(jelmer): Enforce ordering if field == _TREE_HEADER: tree = value elif field == _PARENT_HEADER: parents.append(value) elif field == _AUTHOR_HEADER: author, timetext, timezonetext = value.rsplit(b' ', 2) author_time = int(timetext) author_info = (author, author_time, parse_timezone(timezonetext)) elif field == _COMMITTER_HEADER: committer, timetext, timezonetext = value.rsplit(b' ', 2) commit_time = int(timetext) commit_info = (committer, commit_time, parse_timezone(timezonetext)) elif field == _ENCODING_HEADER: encoding = value elif field == _MERGETAG_HEADER: mergetag.append(Tag.from_string(value + b'\n')) elif field == _GPGSIG_HEADER: gpgsig = value elif field is None: message = value else: extra.append((field, value)) return (tree, parents, author_info, commit_info, encoding, mergetag, gpgsig, message, extra) class Commit(ShaFile): """A git commit object""" type_name = b'commit' type_num = 1 __slots__ = ('_parents', '_encoding', '_extra', '_author_timezone_neg_utc', '_commit_timezone_neg_utc', '_commit_time', '_author_time', '_author_timezone', '_commit_timezone', '_author', '_committer', '_parents', '_extra', '_encoding', '_tree', '_message', '_mergetag', '_gpgsig') def __init__(self): super(Commit, self).__init__() self._parents = [] self._encoding = None self._mergetag = [] self._gpgsig = None self._extra = [] self._author_timezone_neg_utc = False self._commit_timezone_neg_utc = False @classmethod def from_path(cls, path): commit = ShaFile.from_path(path) if not isinstance(commit, cls): raise NotCommitError(path) return commit def _deserialize(self, chunks): (self._tree, self._parents, author_info, commit_info, self._encoding, self._mergetag, self._gpgsig, self._message, self._extra) = ( parse_commit(chunks)) (self._author, self._author_time, (self._author_timezone, self._author_timezone_neg_utc)) = author_info (self._committer, self._commit_time, (self._commit_timezone, self._commit_timezone_neg_utc)) = commit_info def check(self): """Check this object for internal consistency. :raise ObjectFormatException: if the object is malformed in some way """ super(Commit, self).check() self._check_has_member("_tree", "missing tree") self._check_has_member("_author", "missing author") self._check_has_member("_committer", "missing committer") # times are currently checked when set for parent in self._parents: check_hexsha(parent, "invalid parent sha") check_hexsha(self._tree, "invalid tree sha") check_identity(self._author, "invalid author") check_identity(self._committer, "invalid committer") last = None for field, _ in _parse_message(self._chunked_text): if field == _TREE_HEADER and last is not None: raise ObjectFormatException("unexpected tree") elif field == _PARENT_HEADER and last not in (_PARENT_HEADER, _TREE_HEADER): raise ObjectFormatException("unexpected parent") elif field == _AUTHOR_HEADER and last not in (_TREE_HEADER, _PARENT_HEADER): raise ObjectFormatException("unexpected author") elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER: raise ObjectFormatException("unexpected committer") elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER: raise ObjectFormatException("unexpected encoding") last = field # TODO: optionally check for duplicate parents def _serialize(self): chunks = [] tree_bytes = self._tree.as_raw_string() if isinstance(self._tree, Tree) else self._tree chunks.append(git_line(_TREE_HEADER, tree_bytes)) for p in self._parents: chunks.append(git_line(_PARENT_HEADER, p)) chunks.append(git_line( _AUTHOR_HEADER, self._author, str(self._author_time).encode('ascii'), format_timezone(self._author_timezone, self._author_timezone_neg_utc))) chunks.append(git_line( _COMMITTER_HEADER, self._committer, str(self._commit_time).encode('ascii'), format_timezone(self._commit_timezone, self._commit_timezone_neg_utc))) if self.encoding: chunks.append(git_line(_ENCODING_HEADER, self.encoding)) for mergetag in self.mergetag: mergetag_chunks = mergetag.as_raw_string().split(b'\n') chunks.append(git_line(_MERGETAG_HEADER, mergetag_chunks[0])) # Embedded extra header needs leading space for chunk in mergetag_chunks[1:]: chunks.append(b' ' + chunk + b'\n') # No trailing empty line chunks[-1] = chunks[-1].rstrip(b' \n') for k, v in self.extra: if b'\n' in k or b'\n' in v: raise AssertionError( "newline in extra data: %r -> %r" % (k, v)) chunks.append(git_line(k, v)) if self.gpgsig: sig_chunks = self.gpgsig.split(b'\n') chunks.append(git_line(_GPGSIG_HEADER, sig_chunks[0])) for chunk in sig_chunks[1:]: chunks.append(git_line(b'', chunk)) chunks.append(b'\n') # There must be a new line after the headers chunks.append(self._message) return chunks tree = serializable_property( "tree", "Tree that is the state of this commit") def _get_parents(self): """Return a list of parents of this commit.""" self._ensure_parsed() return self._parents def _set_parents(self, value): """Set a list of parents of this commit.""" self._ensure_parsed() self._needs_serialization = True self._parents = value parents = property(_get_parents, _set_parents, doc="Parents of this commit, by their SHA1.") def _get_extra(self): """Return extra settings of this commit.""" self._ensure_parsed() return self._extra extra = property(_get_extra, doc="Extra header fields not understood (presumably added in a " "newer version of git). Kept verbatim so the object can " "be correctly reserialized. For private commit metadata, use " "pseudo-headers in Commit.message, rather than this field.") author = serializable_property("author", "The name of the author of the commit") committer = serializable_property("committer", "The name of the committer of the commit") message = serializable_property( "message", "The commit message") commit_time = serializable_property("commit_time", "The timestamp of the commit. As the number of seconds since the epoch.") commit_timezone = serializable_property("commit_timezone", "The zone the commit time is in") author_time = serializable_property("author_time", "The timestamp the commit was written. As the number of " "seconds since the epoch.") author_timezone = serializable_property( "author_timezone", "Returns the zone the author time is in.") encoding = serializable_property( "encoding", "Encoding of the commit message.") mergetag = serializable_property( "mergetag", "Associated signed tag.") gpgsig = serializable_property( "gpgsig", "GPG Signature.") OBJECT_CLASSES = ( Commit, Tree, Blob, Tag, ) _TYPE_MAP = {} for cls in OBJECT_CLASSES: _TYPE_MAP[cls.type_name] = cls _TYPE_MAP[cls.type_num] = cls # Hold on to the pure-python implementations for testing _parse_tree_py = parse_tree _sorted_tree_items_py = sorted_tree_items try: # Try to import C versions from dulwich._objects import parse_tree, sorted_tree_items except ImportError: pass diff --git a/dulwich/porcelain.py b/dulwich/porcelain.py index 5fe63009..f13beab4 100644 --- a/dulwich/porcelain.py +++ b/dulwich/porcelain.py @@ -1,770 +1,789 @@ # porcelain.py -- Porcelain-like layer on top of Dulwich # Copyright (C) 2013 Jelmer Vernooij # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # or (at your option) a later version of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. """Simple wrapper that provides porcelain-like functions on top of Dulwich. Currently implemented: * archive * add * branch{_create,_delete,_list} * clone * commit * commit-tree * daemon * diff-tree * fetch * init * ls-remote * pull * push * rm * receive-pack * reset * rev-list * tag{_create,_delete,_list} * upload-pack * update-server-info * status * symbolic-ref These functions are meant to behave similarly to the git subcommands. Differences in behaviour are considered bugs. """ __docformat__ = 'restructuredText' from collections import namedtuple from contextlib import ( closing, contextmanager, ) import os import sys import time from dulwich.client import ( get_transport_and_path, SubprocessGitClient, ) from dulwich.errors import ( SendPackError, UpdateRefsError, ) from dulwich.index import get_unstaged_changes from dulwich.objects import ( Tag, parse_timezone, ) -from dulwich.objectspec import parse_object +from dulwich.objectspec import ( + parse_object, + parse_reftuples, + ) from dulwich.patch import write_tree_diff from dulwich.protocol import Protocol from dulwich.repo import (BaseRepo, Repo) from dulwich.server import ( FileSystemBackend, TCPGitServer, ReceivePackHandler, UploadPackHandler, update_server_info as server_update_server_info, ) # Module level tuple definition for status output GitStatus = namedtuple('GitStatus', 'staged unstaged untracked') def open_repo(path_or_repo): """Open an argument that can be a repository or a path for a repository.""" if isinstance(path_or_repo, BaseRepo): return path_or_repo return Repo(path_or_repo) @contextmanager def _noop_context_manager(obj): """Context manager that has the same api as closing but does nothing.""" yield obj def open_repo_closing(path_or_repo): """Open an argument that can be a repository or a path for a repository. returns a context manager that will close the repo on exit if the argument is a path, else does nothing if the argument is a repo. """ if isinstance(path_or_repo, BaseRepo): return _noop_context_manager(path_or_repo) return closing(Repo(path_or_repo)) def archive(path, committish=None, outstream=sys.stdout, errstream=sys.stderr): """Create an archive. :param path: Path of repository for which to generate an archive. :param committish: Commit SHA1 or ref to use :param outstream: Output stream (defaults to stdout) :param errstream: Error stream (defaults to stderr) """ client = SubprocessGitClient() if committish is None: committish = "HEAD" # TODO(jelmer): This invokes C git; this introduces a dependency. # Instead, dulwich should have its own archiver implementation. client.archive(path, committish, outstream.write, errstream.write, errstream.write) def update_server_info(repo="."): """Update server info files for a repository. :param repo: path to the repository """ with open_repo_closing(repo) as r: server_update_server_info(r) def symbolic_ref(repo, ref_name, force=False): """Set git symbolic ref into HEAD. :param repo: path to the repository :param ref_name: short name of the new ref :param force: force settings without checking if it exists in refs/heads """ with open_repo_closing(repo) as repo_obj: ref_path = b'refs/heads/' + ref_name if not force and ref_path not in repo_obj.refs.keys(): raise ValueError('fatal: ref `%s` is not a ref' % ref_name) repo_obj.refs.set_symbolic_ref(b'HEAD', ref_path) def commit(repo=".", message=None, author=None, committer=None): """Create a new commit. :param repo: Path to repository :param message: Optional commit message :param author: Optional author name and email :param committer: Optional committer name and email :return: SHA1 of the new commit """ # FIXME: Support --all argument # FIXME: Support --signoff argument with open_repo_closing(repo) as r: return r.do_commit(message=message, author=author, committer=committer) def commit_tree(repo, tree, message=None, author=None, committer=None): """Create a new commit object. :param repo: Path to repository :param tree: An existing tree object :param author: Optional author name and email :param committer: Optional committer name and email """ with open_repo_closing(repo) as r: return r.do_commit(message=message, tree=tree, committer=committer, author=author) def init(path=".", bare=False): """Create a new git repository. :param path: Path to repository. :param bare: Whether to create a bare repository. :return: A Repo instance """ if not os.path.exists(path): os.mkdir(path) if bare: return Repo.init_bare(path) else: return Repo.init(path) def clone(source, target=None, bare=False, checkout=None, errstream=sys.stdout, outstream=None): """Clone a local or remote git repository. :param source: Path or URL for source repository :param target: Path to target repository (optional) :param bare: Whether or not to create a bare repository :param errstream: Optional stream to write progress to :param outstream: Optional stream to write progress to (deprecated) :return: The new repository """ if outstream is not None: import warnings warnings.warn("outstream= has been deprecated in favour of errstream=.", DeprecationWarning, stacklevel=3) errstream = outstream if checkout is None: checkout = (not bare) if checkout and bare: raise ValueError("checkout and bare are incompatible") client, host_path = get_transport_and_path(source) if target is None: target = host_path.split("/")[-1] if not os.path.exists(target): os.mkdir(target) if bare: r = Repo.init_bare(target) else: r = Repo.init(target) try: remote_refs = client.fetch(host_path, r, determine_wants=r.object_store.determine_wants_all, progress=errstream.write) r[b"HEAD"] = remote_refs[b"HEAD"] if checkout: errstream.write(b'Checking out HEAD') r.reset_index() except: r.close() raise return r def add(repo=".", paths=None): """Add files to the staging area. :param repo: Repository for the files :param paths: Paths to add. No value passed stages all modified files. """ # FIXME: Support patterns, directories. with open_repo_closing(repo) as r: if not paths: # If nothing is specified, add all non-ignored files. paths = [] for dirpath, dirnames, filenames in os.walk(r.path): # Skip .git and below. if '.git' in dirnames: dirnames.remove('.git') for filename in filenames: paths.append(os.path.join(dirpath[len(r.path)+1:], filename)) r.stage(paths) def rm(repo=".", paths=None): """Remove files from the staging area. :param repo: Repository for the files :param paths: Paths to remove """ with open_repo_closing(repo) as r: index = r.open_index() for p in paths: del index[p.encode(sys.getfilesystemencoding())] index.write() def commit_decode(commit, contents): if commit.encoding is not None: return contents.decode(commit.encoding, "replace") return contents.decode("utf-8", "replace") def print_commit(commit, outstream=sys.stdout): """Write a human-readable commit log entry. :param commit: A `Commit` object :param outstream: A stream file to write to """ outstream.write(b"-" * 50 + b"\n") outstream.write(b"commit: " + commit.id + b"\n") if len(commit.parents) > 1: outstream.write(b"merge: " + b"...".join(commit.parents[1:]) + b"\n") outstream.write(b"author: " + commit.author + b"\n") outstream.write(b"committer: " + commit.committer + b"\n") outstream.write(b"\n") outstream.write(commit.message + b"\n") outstream.write(b"\n") def print_tag(tag, outstream=sys.stdout): """Write a human-readable tag. :param tag: A `Tag` object :param outstream: A stream to write to """ outstream.write(b"Tagger: " + tag.tagger + b"\n") outstream.write(b"Date: " + tag.tag_time + b"\n") outstream.write(b"\n") outstream.write(tag.message + b"\n") outstream.write(b"\n") def show_blob(repo, blob, outstream=sys.stdout): """Write a blob to a stream. :param repo: A `Repo` object :param blob: A `Blob` object :param outstream: A stream file to write to """ outstream.write(blob.data) def show_commit(repo, commit, outstream=sys.stdout): """Show a commit to a stream. :param repo: A `Repo` object :param commit: A `Commit` object :param outstream: Stream to write to """ print_commit(commit, outstream) parent_commit = repo[commit.parents[0]] write_tree_diff(outstream, repo.object_store, parent_commit.tree, commit.tree) def show_tree(repo, tree, outstream=sys.stdout): """Print a tree to a stream. :param repo: A `Repo` object :param tree: A `Tree` object :param outstream: Stream to write to """ for n in tree: outstream.write("%s\n" % n) def show_tag(repo, tag, outstream=sys.stdout): """Print a tag to a stream. :param repo: A `Repo` object :param tag: A `Tag` object :param outstream: Stream to write to """ print_tag(tag, outstream) show_object(repo, repo[tag.object[1]], outstream) def show_object(repo, obj, outstream): return { b"tree": show_tree, b"blob": show_blob, b"commit": show_commit, b"tag": show_tag, }[obj.type_name](repo, obj, outstream) def log(repo=".", outstream=sys.stdout, max_entries=None): """Write commit logs. :param repo: Path to repository :param outstream: Stream to write log output to :param max_entries: Optional maximum number of entries to display """ with open_repo_closing(repo) as r: walker = r.get_walker(max_entries=max_entries) for entry in walker: print_commit(entry.commit, outstream) def show(repo=".", objects=None, outstream=sys.stdout): """Print the changes in a commit. :param repo: Path to repository :param objects: Objects to show (defaults to [HEAD]) :param outstream: Stream to write to """ if objects is None: objects = ["HEAD"] if not isinstance(objects, list): objects = [objects] with open_repo_closing(repo) as r: for objectish in objects: show_object(r, parse_object(r, objectish), outstream) def diff_tree(repo, old_tree, new_tree, outstream=sys.stdout): """Compares the content and mode of blobs found via two tree objects. :param repo: Path to repository :param old_tree: Id of old tree :param new_tree: Id of new tree :param outstream: Stream to write to """ with open_repo_closing(repo) as r: write_tree_diff(outstream, r.object_store, old_tree, new_tree) def rev_list(repo, commits, outstream=sys.stdout): """Lists commit objects in reverse chronological order. :param repo: Path to repository :param commits: Commits over which to iterate :param outstream: Stream to write to """ with open_repo_closing(repo) as r: for entry in r.get_walker(include=[r[c].id for c in commits]): outstream.write(entry.commit.id + b"\n") def tag(*args, **kwargs): import warnings warnings.warn("tag has been deprecated in favour of tag_create.", DeprecationWarning) return tag_create(*args, **kwargs) def tag_create(repo, tag, author=None, message=None, annotated=False, objectish="HEAD", tag_time=None, tag_timezone=None): """Creates a tag in git via dulwich calls: :param repo: Path to repository :param tag: tag string :param author: tag author (optional, if annotated is set) :param message: tag message (optional) :param annotated: whether to create an annotated tag :param objectish: object the tag should point at, defaults to HEAD :param tag_time: Optional time for annotated tag :param tag_timezone: Optional timezone for annotated tag """ with open_repo_closing(repo) as r: object = parse_object(r, objectish) if annotated: # Create the tag object tag_obj = Tag() if author is None: # TODO(jelmer): Don't use repo private method. author = r._get_user_identity() tag_obj.tagger = author tag_obj.message = message tag_obj.name = tag tag_obj.object = (type(object), object.id) tag_obj.tag_time = tag_time if tag_time is None: tag_time = int(time.time()) if tag_timezone is None: # TODO(jelmer) Use current user timezone rather than UTC tag_timezone = 0 elif isinstance(tag_timezone, str): tag_timezone = parse_timezone(tag_timezone) tag_obj.tag_timezone = tag_timezone r.object_store.add_object(tag_obj) tag_id = tag_obj.id else: tag_id = object.id r.refs[b'refs/tags/' + tag] = tag_id def list_tags(*args, **kwargs): import warnings warnings.warn("list_tags has been deprecated in favour of tag_list.", DeprecationWarning) return tag_list(*args, **kwargs) def tag_list(repo, outstream=sys.stdout): """List all tags. :param repo: Path to repository :param outstream: Stream to write tags to """ with open_repo_closing(repo) as r: tags = list(r.refs.as_dict(b"refs/tags")) tags.sort() return tags def tag_delete(repo, name): """Remove a tag. :param repo: Path to repository :param name: Name of tag to remove """ with open_repo_closing(repo) as r: if isinstance(name, bytes): names = [name] elif isinstance(name, list): names = name else: raise TypeError("Unexpected tag name type %r" % name) for name in names: del r.refs[b"refs/tags/" + name] def reset(repo, mode, committish="HEAD"): """Reset current HEAD to the specified state. :param repo: Path to repository :param mode: Mode ("hard", "soft", "mixed") """ if mode != "hard": raise ValueError("hard is the only mode currently supported") with open_repo_closing(repo) as r: tree = r[committish].tree r.reset_index() -def push(repo, remote_location, refspec, +def push(repo, remote_location, refspecs=None, outstream=sys.stdout, errstream=sys.stderr): """Remote push with dulwich via dulwich.client :param repo: Path to repository :param remote_location: Location of the remote - :param refspec: relative path to the refs to push to remote + :param refspecs: relative path to the refs to push to remote :param outstream: A stream file to write output :param errstream: A stream file to write errors """ # Open the repo with open_repo_closing(repo) as r: # Get the client and path client, path = get_transport_and_path(remote_location) + selected_refs = [] + def update_refs(refs): - new_refs = r.get_refs() - refs[refspec] = new_refs[b'HEAD'] - del new_refs[b'HEAD'] + selected_refs.extend(parse_reftuples(r.refs, refs, refspecs)) + # TODO: Handle selected_refs == {None: None} + for (lh, rh, force) in selected_refs: + if lh is None: + del refs[rh] + else: + refs[rh] = r.refs[lh] return refs err_encoding = getattr(errstream, 'encoding', 'utf-8') remote_location_bytes = remote_location.encode(err_encoding) try: client.send_pack(path, update_refs, r.object_store.generate_pack_contents, progress=errstream.write) - errstream.write(b"Push to " + remote_location_bytes + b" successful.\n") + errstream.write(b"Push to " + remote_location_bytes + + b" successful.\n") except (UpdateRefsError, SendPackError) as e: - errstream.write(b"Push to " + remote_location_bytes + b" failed -> " + e.message.encode(err_encoding) + b"\n") + errstream.write(b"Push to " + remote_location_bytes + + b" failed -> " + e.message.encode(err_encoding) + + b"\n") -def pull(repo, remote_location, refspec, +def pull(repo, remote_location, refspecs=None, outstream=sys.stdout, errstream=sys.stderr): """Pull from remote via dulwich.client :param repo: Path to repository :param remote_location: Location of the remote - :param refspec: relative path to the fetched refs + :param refspec: refspecs to fetch :param outstream: A stream file to write to output :param errstream: A stream file to write to errors """ - # Open the repo with open_repo_closing(repo) as r: + selected_refs = [] + def determine_wants(remote_refs): + selected_refs.extend(parse_reftuples(remote_refs, r.refs, refspecs)) + return [remote_refs[lh] for (lh, rh, force) in selected_refs] client, path = get_transport_and_path(remote_location) - remote_refs = client.fetch(path, r, progress=errstream.write) - r[b'HEAD'] = remote_refs[refspec] + remote_refs = client.fetch(path, r, progress=errstream.write, + determine_wants=determine_wants) + for (lh, rh, force) in selected_refs: + r.refs[rh] = remote_refs[lh] + if selected_refs: + r[b'HEAD'] = remote_refs[selected_refs[0][1]] # Perform 'git checkout .' - syncs staged changes tree = r[b"HEAD"].tree r.reset_index() def status(repo="."): """Returns staged, unstaged, and untracked changes relative to the HEAD. :param repo: Path to repository or repository object :return: GitStatus tuple, staged - list of staged paths (diff index/HEAD) unstaged - list of unstaged paths (diff index/working-tree) untracked - list of untracked, un-ignored & non-.git paths """ with open_repo_closing(repo) as r: # 1. Get status of staged tracked_changes = get_tree_changes(r) # 2. Get status of unstaged unstaged_changes = list(get_unstaged_changes(r.open_index(), r.path)) # TODO - Status of untracked - add untracked changes, need gitignore. untracked_changes = [] return GitStatus(tracked_changes, unstaged_changes, untracked_changes) def get_tree_changes(repo): """Return add/delete/modify changes to tree by comparing index to HEAD. :param repo: repo path or object :return: dict with lists for each type of change """ with open_repo_closing(repo) as r: index = r.open_index() # Compares the Index to the HEAD & determines changes # Iterate through the changes and report add/delete/modify # TODO: call out to dulwich.diff_tree somehow. tracked_changes = { 'add': [], 'delete': [], 'modify': [], } for change in index.changes_from_tree(r.object_store, r[b'HEAD'].tree): if not change[0][0]: tracked_changes['add'].append(change[0][1]) elif not change[0][1]: tracked_changes['delete'].append(change[0][0]) elif change[0][0] == change[0][1]: tracked_changes['modify'].append(change[0][0]) else: raise AssertionError('git mv ops not yet supported') return tracked_changes def daemon(path=".", address=None, port=None): """Run a daemon serving Git requests over TCP/IP. :param path: Path to the directory to serve. :param address: Optional address to listen on (defaults to ::) :param port: Optional port to listen on (defaults to TCP_GIT_PORT) """ # TODO(jelmer): Support git-daemon-export-ok and --export-all. backend = FileSystemBackend(path) server = TCPGitServer(backend, address, port) server.serve_forever() def web_daemon(path=".", address=None, port=None): """Run a daemon serving Git requests over HTTP. :param path: Path to the directory to serve :param address: Optional address to listen on (defaults to ::) :param port: Optional port to listen on (defaults to 80) """ from dulwich.web import ( make_wsgi_chain, make_server, WSGIRequestHandlerLogger, WSGIServerLogger) backend = FileSystemBackend(path) app = make_wsgi_chain(backend) server = make_server(address, port, app, handler_class=WSGIRequestHandlerLogger, server_class=WSGIServerLogger) server.serve_forever() def upload_pack(path=".", inf=None, outf=None): """Upload a pack file after negotiating its contents using smart protocol. :param path: Path to the repository :param inf: Input stream to communicate with client :param outf: Output stream to communicate with client """ if outf is None: outf = getattr(sys.stdout, 'buffer', sys.stdout) if inf is None: inf = getattr(sys.stdin, 'buffer', sys.stdin) backend = FileSystemBackend(path) def send_fn(data): outf.write(data) outf.flush() proto = Protocol(inf.read, send_fn) handler = UploadPackHandler(backend, [path], proto) # FIXME: Catch exceptions and write a single-line summary to outf. handler.handle() return 0 def receive_pack(path=".", inf=None, outf=None): """Receive a pack file after negotiating its contents using smart protocol. :param path: Path to the repository :param inf: Input stream to communicate with client :param outf: Output stream to communicate with client """ if outf is None: outf = getattr(sys.stdout, 'buffer', sys.stdout) if inf is None: inf = getattr(sys.stdin, 'buffer', sys.stdin) backend = FileSystemBackend(path) def send_fn(data): outf.write(data) outf.flush() proto = Protocol(inf.read, send_fn) handler = ReceivePackHandler(backend, [path], proto) # FIXME: Catch exceptions and write a single-line summary to outf. handler.handle() return 0 def branch_delete(repo, name): """Delete a branch. :param repo: Path to the repository :param name: Name of the branch """ with open_repo_closing(repo) as r: if isinstance(name, bytes): names = [name] elif isinstance(name, list): names = name else: raise TypeError("Unexpected branch name type %r" % name) for name in names: del r.refs[b"refs/heads/" + name] def branch_create(repo, name, objectish=None, force=False): """Create a branch. :param repo: Path to the repository :param name: Name of the new branch :param objectish: Target object to point new branch at (defaults to HEAD) :param force: Force creation of branch, even if it already exists """ with open_repo_closing(repo) as r: if isinstance(name, bytes): names = [name] elif isinstance(name, list): names = name else: raise TypeError("Unexpected branch name type %r" % name) if objectish is None: objectish = "HEAD" object = parse_object(r, objectish) refname = b"refs/heads/" + name if refname in r.refs and not force: raise KeyError("Branch with name %s already exists." % name) r.refs[refname] = object.id def branch_list(repo): """List all branches. :param repo: Path to the repository """ with open_repo_closing(repo) as r: return r.refs.keys(base=b"refs/heads/") def fetch(repo, remote_location, outstream=sys.stdout, errstream=sys.stderr): """Fetch objects from a remote server. :param repo: Path to the repository :param remote_location: String identifying a remote server :param outstream: Output stream (defaults to stdout) :param errstream: Error stream (defaults to stderr) :return: Dictionary with refs on the remote """ with open_repo_closing(repo) as r: client, path = get_transport_and_path(remote_location) remote_refs = client.fetch(path, r, progress=errstream.write) return remote_refs def ls_remote(remote): client, host_path = get_transport_and_path(remote) return client.get_refs(host_path) diff --git a/dulwich/repo.py b/dulwich/repo.py index 32eab1d4..8851fbdf 100644 --- a/dulwich/repo.py +++ b/dulwich/repo.py @@ -1,1009 +1,1009 @@ # repo.py -- For dealing with git repositories. # Copyright (C) 2007 James Westby # Copyright (C) 2008-2013 Jelmer Vernooij # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; version 2 # of the License or (at your option) any later version of # the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. """Repository access. This module contains the base class for git repositories (BaseRepo) and an implementation which uses a repository on local disk (Repo). """ from io import BytesIO import errno import os import sys from dulwich.errors import ( NoIndexPresent, NotBlobError, NotCommitError, NotGitRepository, NotTreeError, NotTagError, CommitError, RefFormatError, HookError, ) from dulwich.file import ( GitFile, ) from dulwich.object_store import ( DiskObjectStore, MemoryObjectStore, ObjectStoreGraphWalker, ) from dulwich.objects import ( check_hexsha, Blob, Commit, ShaFile, Tag, Tree, ) from dulwich.hooks import ( PreCommitShellHook, PostCommitShellHook, CommitMsgShellHook, ) from dulwich.refs import ( check_ref_format, RefsContainer, DictRefsContainer, InfoRefsContainer, DiskRefsContainer, read_packed_refs, read_packed_refs_with_peeled, write_packed_refs, SYMREF, ) import warnings OBJECTDIR = 'objects' REFSDIR = 'refs' REFSDIR_TAGS = 'tags' REFSDIR_HEADS = 'heads' INDEX_FILENAME = "index" BASE_DIRECTORIES = [ ["branches"], [REFSDIR], [REFSDIR, REFSDIR_TAGS], [REFSDIR, REFSDIR_HEADS], ["hooks"], ["info"] ] def parse_graftpoints(graftpoints): """Convert a list of graftpoints into a dict :param graftpoints: Iterator of graftpoint lines Each line is formatted as: []* Resulting dictionary is: : [*] https://git.wiki.kernel.org/index.php/GraftPoint """ grafts = {} for l in graftpoints: raw_graft = l.split(None, 1) commit = raw_graft[0] if len(raw_graft) == 2: parents = raw_graft[1].split() else: parents = [] for sha in [commit] + parents: check_hexsha(sha, 'Invalid graftpoint') grafts[commit] = parents return grafts def serialize_graftpoints(graftpoints): """Convert a dictionary of grafts into string The graft dictionary is: : [*] Each line is formatted as: []* https://git.wiki.kernel.org/index.php/GraftPoint """ graft_lines = [] for commit, parents in graftpoints.items(): if parents: graft_lines.append(commit + b' ' + b' '.join(parents)) else: graft_lines.append(commit) return b'\n'.join(graft_lines) class BaseRepo(object): """Base class for a git repository. :ivar object_store: Dictionary-like object for accessing the objects :ivar refs: Dictionary-like object with the refs in this repository """ def __init__(self, object_store, refs): """Open a repository. This shouldn't be called directly, but rather through one of the base classes, such as MemoryRepo or Repo. :param object_store: Object store to use :param refs: Refs container to use """ self.object_store = object_store self.refs = refs self._graftpoints = {} self.hooks = {} def _init_files(self, bare): """Initialize a default set of named files.""" from dulwich.config import ConfigFile self._put_named_file('description', b"Unnamed repository") f = BytesIO() cf = ConfigFile() cf.set(b"core", b"repositoryformatversion", b"0") cf.set(b"core", b"filemode", b"true") cf.set(b"core", b"bare", bare) cf.set(b"core", b"logallrefupdates", True) cf.write_to_file(f) self._put_named_file('config', f.getvalue()) self._put_named_file(os.path.join('info', 'exclude'), b'') def get_named_file(self, path): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-based Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :return: An open file object, or None if the file does not exist. """ raise NotImplementedError(self.get_named_file) def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ raise NotImplementedError(self._put_named_file) def open_index(self): """Open the index for this repository. :raise NoIndexPresent: If no index is present :return: The matching `Index` """ raise NotImplementedError(self.open_index) def fetch(self, target, determine_wants=None, progress=None): """Fetch objects into another repository. :param target: The target repository :param determine_wants: Optional function to determine what refs to fetch. :param progress: Optional progress function :return: The local refs """ if determine_wants is None: determine_wants = target.object_store.determine_wants_all target.object_store.add_objects( - self.fetch_objects(determine_wants, target.get_graph_walker(), - progress)) + self.fetch_objects(determine_wants, target.get_graph_walker(), + progress)) return self.get_refs() def fetch_objects(self, determine_wants, graph_walker, progress, get_tagged=None): """Fetch the missing objects required for a set of revisions. :param determine_wants: Function that takes a dictionary with heads and returns the list of heads to fetch. :param graph_walker: Object that can iterate over the list of revisions to fetch and has an "ack" method that will be called to acknowledge that a revision is present. :param progress: Simple progress function that will be called with updated progress strings. :param get_tagged: Function that returns a dict of pointed-to sha -> tag sha for including tags. :return: iterator over objects, with __len__ implemented """ wants = determine_wants(self.get_refs()) if not isinstance(wants, list): raise TypeError("determine_wants() did not return a list") shallows = getattr(graph_walker, 'shallow', frozenset()) unshallows = getattr(graph_walker, 'unshallow', frozenset()) if wants == []: # TODO(dborowitz): find a way to short-circuit that doesn't change # this interface. if shallows or unshallows: # Do not send a pack in shallow short-circuit path return None return [] # If the graph walker is set up with an implementation that can # ACK/NAK to the wire, it will write data to the client through # this call as a side-effect. haves = self.object_store.find_common_revisions(graph_walker) # Deal with shallow requests separately because the haves do # not reflect what objects are missing if shallows or unshallows: haves = [] # TODO: filter the haves commits from iter_shas. # the specific commits aren't missing. def get_parents(commit): if commit.id in shallows: return [] return self.get_parents(commit.id, commit) return self.object_store.iter_shas( self.object_store.find_missing_objects( haves, wants, progress, get_tagged, get_parents=get_parents)) def get_graph_walker(self, heads=None): """Retrieve a graph walker. A graph walker is used by a remote repository (or proxy) to find out which objects are present in this repository. :param heads: Repository heads to use (optional) :return: A graph walker object """ if heads is None: heads = self.refs.as_dict(b'refs/heads').values() return ObjectStoreGraphWalker(heads, self.get_parents) def get_refs(self): """Get dictionary with all refs. :return: A ``dict`` mapping ref names to SHA1s """ return self.refs.as_dict() def head(self): """Return the SHA1 pointed at by HEAD.""" return self.refs[b'HEAD'] def _get_object(self, sha, cls): assert len(sha) in (20, 40) ret = self.get_object(sha) if not isinstance(ret, cls): if cls is Commit: raise NotCommitError(ret) elif cls is Blob: raise NotBlobError(ret) elif cls is Tree: raise NotTreeError(ret) elif cls is Tag: raise NotTagError(ret) else: raise Exception("Type invalid: %r != %r" % ( ret.type_name, cls.type_name)) return ret def get_object(self, sha): """Retrieve the object with the specified SHA. :param sha: SHA to retrieve :return: A ShaFile object :raise KeyError: when the object can not be found """ return self.object_store[sha] def get_parents(self, sha, commit=None): """Retrieve the parents of a specific commit. If the specific commit is a graftpoint, the graft parents will be returned instead. :param sha: SHA of the commit for which to retrieve the parents :param commit: Optional commit matching the sha :return: List of parents """ try: return self._graftpoints[sha] except KeyError: if commit is None: commit = self[sha] return commit.parents def get_config(self): """Retrieve the config object. :return: `ConfigFile` object for the ``.git/config`` file. """ raise NotImplementedError(self.get_config) def get_description(self): """Retrieve the description for this repository. :return: String with the description of the repository as set by the user. """ raise NotImplementedError(self.get_description) def set_description(self, description): """Set the description for this repository. :param description: Text to set as description for this repository. """ raise NotImplementedError(self.set_description) def get_config_stack(self): """Return a config stack for this repository. This stack accesses the configuration for both this repository itself (.git/config) and the global configuration, which usually lives in ~/.gitconfig. :return: `Config` instance for this repository """ from dulwich.config import StackedConfig backends = [self.get_config()] + StackedConfig.default_backends() return StackedConfig(backends, writable=backends[0]) def get_peeled(self, ref): """Get the peeled value of a ref. :param ref: The refname to peel. :return: The fully-peeled SHA1 of a tag object, after peeling all intermediate tags; if the original ref does not point to a tag, this will equal the original SHA1. """ cached = self.refs.get_peeled(ref) if cached is not None: return cached return self.object_store.peel_sha(self.refs[ref]).id def get_walker(self, include=None, *args, **kwargs): """Obtain a walker for this repository. :param include: Iterable of SHAs of commits to include along with their ancestors. Defaults to [HEAD] :param exclude: Iterable of SHAs of commits to exclude along with their ancestors, overriding includes. :param order: ORDER_* constant specifying the order of results. Anything other than ORDER_DATE may result in O(n) memory usage. :param reverse: If True, reverse the order of output, requiring O(n) memory. :param max_entries: The maximum number of entries to yield, or None for no limit. :param paths: Iterable of file or subtree paths to show entries for. :param rename_detector: diff.RenameDetector object for detecting renames. :param follow: If True, follow path across renames/copies. Forces a default rename_detector. :param since: Timestamp to list commits after. :param until: Timestamp to list commits before. :param queue_cls: A class to use for a queue of commits, supporting the iterator protocol. The constructor takes a single argument, the Walker. :return: A `Walker` object """ from dulwich.walk import Walker if include is None: include = [self.head()] if isinstance(include, str): include = [include] kwargs['get_parents'] = lambda commit: self.get_parents(commit.id, commit) return Walker(self.object_store, include, *args, **kwargs) def __getitem__(self, name): """Retrieve a Git object by SHA1 or ref. :param name: A Git object SHA1 or a ref name :return: A `ShaFile` object, such as a Commit or Blob :raise KeyError: when the specified ref or object does not exist """ if not isinstance(name, bytes): raise TypeError("'name' must be bytestring, not %.80s" % type(name).__name__) if len(name) in (20, 40): try: return self.object_store[name] except (KeyError, ValueError): pass try: return self.object_store[self.refs[name]] except RefFormatError: raise KeyError(name) def __contains__(self, name): """Check if a specific Git object or ref is present. :param name: Git object SHA1 or ref name """ if len(name) in (20, 40): return name in self.object_store or name in self.refs else: return name in self.refs def __setitem__(self, name, value): """Set a ref. :param name: ref name :param value: Ref value - either a ShaFile object, or a hex sha """ if name.startswith(b"refs/") or name == b'HEAD': if isinstance(value, ShaFile): self.refs[name] = value.id elif isinstance(value, bytes): self.refs[name] = value else: raise TypeError(value) else: raise ValueError(name) def __delitem__(self, name): """Remove a ref. :param name: Name of the ref to remove """ if name.startswith(b"refs/") or name == b"HEAD": del self.refs[name] else: raise ValueError(name) def _get_user_identity(self): """Determine the identity to use for new commits. """ config = self.get_config_stack() return (config.get((b"user", ), b"name") + b" <" + config.get((b"user", ), b"email") + b">") def _add_graftpoints(self, updated_graftpoints): """Add or modify graftpoints :param updated_graftpoints: Dict of commit shas to list of parent shas """ # Simple validation for commit, parents in updated_graftpoints.items(): for sha in [commit] + parents: check_hexsha(sha, 'Invalid graftpoint') self._graftpoints.update(updated_graftpoints) def _remove_graftpoints(self, to_remove=[]): """Remove graftpoints :param to_remove: List of commit shas """ for sha in to_remove: del self._graftpoints[sha] def do_commit(self, message=None, committer=None, author=None, commit_timestamp=None, commit_timezone=None, author_timestamp=None, author_timezone=None, tree=None, encoding=None, ref=b'HEAD', merge_heads=None): """Create a new commit. :param message: Commit message :param committer: Committer fullname :param author: Author fullname (defaults to committer) :param commit_timestamp: Commit timestamp (defaults to now) :param commit_timezone: Commit timestamp timezone (defaults to GMT) :param author_timestamp: Author timestamp (defaults to commit timestamp) :param author_timezone: Author timestamp timezone (defaults to commit timestamp timezone) :param tree: SHA1 of the tree root to use (if not specified the current index will be committed). :param encoding: Encoding :param ref: Optional ref to commit to (defaults to current branch) :param merge_heads: Merge heads (defaults to .git/MERGE_HEADS) :return: New commit SHA1 """ import time c = Commit() if tree is None: index = self.open_index() c.tree = index.commit(self.object_store) else: if len(tree) != 40: raise ValueError("tree must be a 40-byte hex sha string") c.tree = tree try: self.hooks['pre-commit'].execute() except HookError as e: raise CommitError(e) except KeyError: # no hook defined, silent fallthrough pass if merge_heads is None: # FIXME: Read merge heads from .git/MERGE_HEADS merge_heads = [] if committer is None: # FIXME: Support GIT_COMMITTER_NAME/GIT_COMMITTER_EMAIL environment # variables committer = self._get_user_identity() c.committer = committer if commit_timestamp is None: # FIXME: Support GIT_COMMITTER_DATE environment variable commit_timestamp = time.time() c.commit_time = int(commit_timestamp) if commit_timezone is None: # FIXME: Use current user timezone rather than UTC commit_timezone = 0 c.commit_timezone = commit_timezone if author is None: # FIXME: Support GIT_AUTHOR_NAME/GIT_AUTHOR_EMAIL environment # variables author = committer c.author = author if author_timestamp is None: # FIXME: Support GIT_AUTHOR_DATE environment variable author_timestamp = commit_timestamp c.author_time = int(author_timestamp) if author_timezone is None: author_timezone = commit_timezone c.author_timezone = author_timezone if encoding is not None: c.encoding = encoding if message is None: # FIXME: Try to read commit message from .git/MERGE_MSG raise ValueError("No commit message specified") try: c.message = self.hooks['commit-msg'].execute(message) if c.message is None: c.message = message except HookError as e: raise CommitError(e) except KeyError: # no hook defined, message not modified c.message = message if ref is None: # Create a dangling commit c.parents = merge_heads self.object_store.add_object(c) else: try: old_head = self.refs[ref] c.parents = [old_head] + merge_heads self.object_store.add_object(c) ok = self.refs.set_if_equals(ref, old_head, c.id) except KeyError: c.parents = merge_heads self.object_store.add_object(c) ok = self.refs.add_if_new(ref, c.id) if not ok: # Fail if the atomic compare-and-swap failed, leaving the commit and # all its objects as garbage. raise CommitError("%s changed during commit" % (ref,)) try: self.hooks['post-commit'].execute() except HookError as e: # silent failure warnings.warn("post-commit hook failed: %s" % e, UserWarning) except KeyError: # no hook defined, silent fallthrough pass return c.id class Repo(BaseRepo): """A git repository backed by local disk. To open an existing repository, call the contructor with the path of the repository. To create a new repository, use the Repo.init class method. """ def __init__(self, root): if os.path.isdir(os.path.join(root, ".git", OBJECTDIR)): self.bare = False self._controldir = os.path.join(root, ".git") elif (os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(os.path.join(root, REFSDIR))): self.bare = True self._controldir = root elif (os.path.isfile(os.path.join(root, ".git"))): import re with open(os.path.join(root, ".git"), 'r') as f: _, path = re.match('(gitdir: )(.+$)', f.read()).groups() self.bare = False self._controldir = os.path.join(root, path) else: raise NotGitRepository( "No git repository was found at %(path)s" % dict(path=root) ) self.path = root object_store = DiskObjectStore(os.path.join(self.controldir(), OBJECTDIR)) refs = DiskRefsContainer(self.controldir()) BaseRepo.__init__(self, object_store, refs) self._graftpoints = {} graft_file = self.get_named_file(os.path.join("info", "grafts")) if graft_file: with graft_file: self._graftpoints.update(parse_graftpoints(graft_file)) graft_file = self.get_named_file("shallow") if graft_file: with graft_file: self._graftpoints.update(parse_graftpoints(graft_file)) self.hooks['pre-commit'] = PreCommitShellHook(self.controldir()) self.hooks['commit-msg'] = CommitMsgShellHook(self.controldir()) self.hooks['post-commit'] = PostCommitShellHook(self.controldir()) @classmethod def discover(cls, start='.'): """Iterate parent directories to discover a repository Return a Repo object for the first parent directory that looks like a Git repository. :param start: The directory to start discovery from (defaults to '.') """ remaining = True path = os.path.abspath(start) while remaining: try: return cls(path) except NotGitRepository: path, remaining = os.path.split(path) raise NotGitRepository( "No git repository was found at %(path)s" % dict(path=start) ) def controldir(self): """Return the path of the control directory.""" return self._controldir def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ path = path.lstrip(os.path.sep) with GitFile(os.path.join(self.controldir(), path), 'wb') as f: f.write(contents) def get_named_file(self, path): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-based Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :return: An open file object, or None if the file does not exist. """ # TODO(dborowitz): sanitize filenames, since this is used directly by # the dumb web serving code. path = path.lstrip(os.path.sep) try: return open(os.path.join(self.controldir(), path), 'rb') except (IOError, OSError) as e: if e.errno == errno.ENOENT: return None raise def index_path(self): """Return path to the index file.""" return os.path.join(self.controldir(), INDEX_FILENAME) def open_index(self): """Open the index for this repository. :raise NoIndexPresent: If no index is present :return: The matching `Index` """ from dulwich.index import Index if not self.has_index(): raise NoIndexPresent() return Index(self.index_path()) def has_index(self): """Check if an index is present.""" # Bare repos must never have index files; non-bare repos may have a # missing index file, which is treated as empty. return not self.bare def stage(self, fs_paths): """Stage a set of paths. :param fs_paths: List of paths, relative to the repository path """ root_path_bytes = self.path.encode(sys.getfilesystemencoding()) if not isinstance(fs_paths, list): fs_paths = [fs_paths] from dulwich.index import ( blob_from_path_and_stat, index_entry_from_stat, _fs_to_tree_path, ) index = self.open_index() for fs_path in fs_paths: if not isinstance(fs_path, bytes): fs_path = fs_path.encode(sys.getfilesystemencoding()) tree_path = _fs_to_tree_path(fs_path) full_path = os.path.join(root_path_bytes, fs_path) try: st = os.lstat(full_path) except OSError: # File no longer exists try: del index[tree_path] except KeyError: pass # already removed else: blob = blob_from_path_and_stat(full_path, st) self.object_store.add_object(blob) index[tree_path] = index_entry_from_stat(st, blob.id, 0) index.write() def clone(self, target_path, mkdir=True, bare=False, origin=b"origin"): """Clone this repository. :param target_path: Target path :param mkdir: Create the target directory :param bare: Whether to create a bare repository :param origin: Base name for refs in target repository cloned from this repository :return: Created repository as `Repo` """ if not bare: target = self.init(target_path, mkdir=mkdir) else: target = self.init_bare(target_path) self.fetch(target) target.refs.import_refs( b'refs/remotes/' + origin, self.refs.as_dict(b'refs/heads')) target.refs.import_refs( b'refs/tags', self.refs.as_dict(b'refs/tags')) try: target.refs.add_if_new( b'refs/heads/master', self.refs[b'refs/heads/master']) except KeyError: pass # Update target head head, head_sha = self.refs._follow(b'HEAD') if head is not None and head_sha is not None: target.refs.set_symbolic_ref(b'HEAD', head) target[b'HEAD'] = head_sha if not bare: # Checkout HEAD to target dir target.reset_index() return target def reset_index(self, tree=None): """Reset the index back to a specific tree. :param tree: Tree SHA to reset to, None for current HEAD tree. """ from dulwich.index import ( build_index_from_tree, validate_path_element_default, validate_path_element_ntfs, ) if tree is None: tree = self[b'HEAD'].tree config = self.get_config() honor_filemode = config.get_boolean('core', 'filemode', os.name != "nt") if config.get_boolean('core', 'core.protectNTFS', os.name == "nt"): validate_path_element = validate_path_element_ntfs else: validate_path_element = validate_path_element_default return build_index_from_tree(self.path, self.index_path(), self.object_store, tree, honor_filemode=honor_filemode, validate_path_element=validate_path_element) def get_config(self): """Retrieve the config object. :return: `ConfigFile` object for the ``.git/config`` file. """ from dulwich.config import ConfigFile path = os.path.join(self._controldir, 'config') try: return ConfigFile.from_path(path) except (IOError, OSError) as e: if e.errno != errno.ENOENT: raise ret = ConfigFile() ret.path = path return ret def get_description(self): """Retrieve the description of this repository. :return: A string describing the repository or None. """ path = os.path.join(self._controldir, 'description') try: with GitFile(path, 'rb') as f: return f.read() except (IOError, OSError) as e: if e.errno != errno.ENOENT: raise return None def __repr__(self): return "" % self.path def set_description(self, description): """Set the description for this repository. :param description: Text to set as description for this repository. """ self._put_named_file('description', description) @classmethod def _init_maybe_bare(cls, path, bare): for d in BASE_DIRECTORIES: os.mkdir(os.path.join(path, *d)) DiskObjectStore.init(os.path.join(path, OBJECTDIR)) ret = cls(path) ret.refs.set_symbolic_ref(b'HEAD', b"refs/heads/master") ret._init_files(bare) return ret @classmethod def init(cls, path, mkdir=False): """Create a new repository. :param path: Path in which to create the repository :param mkdir: Whether to create the directory :return: `Repo` instance """ if mkdir: os.mkdir(path) controldir = os.path.join(path, ".git") os.mkdir(controldir) cls._init_maybe_bare(controldir, False) return cls(path) @classmethod def init_bare(cls, path): """Create a new bare repository. ``path`` should already exist and be an emty directory. :param path: Path to create bare repository in :return: a `Repo` instance """ return cls._init_maybe_bare(path, True) create = init_bare def close(self): """Close any files opened by this repository.""" self.object_store.close() class MemoryRepo(BaseRepo): """Repo that stores refs, objects, and named files in memory. MemoryRepos are always bare: they have no working tree and no index, since those have a stronger dependency on the filesystem. """ def __init__(self): from dulwich.config import ConfigFile BaseRepo.__init__(self, MemoryObjectStore(), DictRefsContainer({})) self._named_files = {} self.bare = True self._config = ConfigFile() def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ self._named_files[path] = contents def get_named_file(self, path): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-baked Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :return: An open file object, or None if the file does not exist. """ contents = self._named_files.get(path, None) if contents is None: return None return BytesIO(contents) def open_index(self): """Fail to open index for this repo, since it is bare. :raise NoIndexPresent: Raised when no index is present """ raise NoIndexPresent() def get_config(self): """Retrieve the config object. :return: `ConfigFile` object. """ return self._config def get_description(self): """Retrieve the repository description. This defaults to None, for no description. """ return None @classmethod def init_bare(cls, objects, refs): """Create a new bare repository in memory. :param objects: Objects for the new repository, as iterable :param refs: Refs as dictionary, mapping names to object SHA1s """ ret = cls() for obj in objects: ret.object_store.add_object(obj) for refname, sha in refs.items(): ret.refs[refname] = sha ret._init_files(bare=True) return ret diff --git a/dulwich/tests/test_porcelain.py b/dulwich/tests/test_porcelain.py index fe1bc6e5..6ddda01d 100644 --- a/dulwich/tests/test_porcelain.py +++ b/dulwich/tests/test_porcelain.py @@ -1,735 +1,738 @@ # test_porcelain.py -- porcelain tests # Copyright (C) 2013 Jelmer Vernooij # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; version 2 # of the License or (at your option) a later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. """Tests for dulwich.porcelain.""" from contextlib import closing from io import BytesIO import os import shutil import tarfile import tempfile from dulwich import porcelain from dulwich.diff_tree import tree_changes from dulwich.objects import ( Blob, Tag, Tree, ) from dulwich.repo import Repo from dulwich.tests import ( TestCase, ) from dulwich.tests.compat.utils import require_git_version from dulwich.tests.utils import ( build_commit_graph, make_object, ) class PorcelainTestCase(TestCase): def setUp(self): super(PorcelainTestCase, self).setUp() repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) self.repo = Repo.init(repo_dir) def tearDown(self): super(PorcelainTestCase, self).tearDown() self.repo.close() class ArchiveTests(PorcelainTestCase): """Tests for the archive command.""" def test_simple(self): # TODO(jelmer): Remove this once dulwich has its own implementation of archive. require_git_version((1, 5, 0)) c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/master"] = c3.id out = BytesIO() err = BytesIO() porcelain.archive(self.repo.path, b"refs/heads/master", outstream=out, errstream=err) self.assertEqual(b"", err.getvalue()) tf = tarfile.TarFile(fileobj=out) self.addCleanup(tf.close) self.assertEqual([], tf.getnames()) class UpdateServerInfoTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/foo"] = c3.id porcelain.update_server_info(self.repo.path) self.assertTrue(os.path.exists(os.path.join(self.repo.controldir(), 'info', 'refs'))) class CommitTests(PorcelainTestCase): def test_custom_author(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/foo"] = c3.id sha = porcelain.commit(self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ") self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) class CloneTests(PorcelainTestCase): def test_simple_local(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) r = porcelain.clone(self.repo.path, target_path, checkout=False, errstream=errstream) self.assertEqual(r.path, target_path) self.assertEqual(Repo(target_path).head(), c3.id) self.assertTrue(b'f1' not in os.listdir(target_path)) self.assertTrue(b'f2' not in os.listdir(target_path)) def test_simple_local_with_checkout(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) with closing(porcelain.clone(self.repo.path, target_path, checkout=True, errstream=errstream)) as r: self.assertEqual(r.path, target_path) with closing(Repo(target_path)) as r: self.assertEqual(r.head(), c3.id) self.assertTrue('f1' in os.listdir(target_path)) self.assertTrue('f2' in os.listdir(target_path)) def test_bare_local_with_checkout(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) r = porcelain.clone(self.repo.path, target_path, bare=True, errstream=errstream) self.assertEqual(r.path, target_path) self.assertEqual(Repo(target_path).head(), c3.id) self.assertFalse(b'f1' in os.listdir(target_path)) self.assertFalse(b'f2' in os.listdir(target_path)) def test_no_checkout_with_bare(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]} (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c1.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) self.assertRaises(ValueError, porcelain.clone, self.repo.path, target_path, checkout=True, bare=True, errstream=errstream) class InitTests(TestCase): def test_non_bare(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) porcelain.init(repo_dir) def test_bare(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) porcelain.init(repo_dir, bare=True) class AddTests(PorcelainTestCase): def test_add_default_paths(self): # create a file for initial commit with open(os.path.join(self.repo.path, 'blah'), 'w') as f: f.write("\n") porcelain.add(repo=self.repo.path, paths=['blah']) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test', committer=b'test') # Add a second test file and a file in a directory with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("\n") os.mkdir(os.path.join(self.repo.path, 'adir')) with open(os.path.join(self.repo.path, 'adir', 'afile'), 'w') as f: f.write("\n") porcelain.add(self.repo.path) # Check that foo was added and nothing in .git was modified index = self.repo.open_index() self.assertEqual(sorted(index), [b'adir/afile', b'blah', b'foo']) def test_add_file(self): with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=["foo"]) class RemoveTests(PorcelainTestCase): def test_remove_file(self): with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=["foo"]) porcelain.rm(self.repo.path, paths=["foo"]) class LogTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = BytesIO() porcelain.log(self.repo.path, outstream=outstream) self.assertEqual(3, outstream.getvalue().count(b"-" * 50)) def test_max_entries(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = BytesIO() porcelain.log(self.repo.path, outstream=outstream, max_entries=1) self.assertEqual(1, outstream.getvalue().count(b"-" * 50)) class ShowTests(PorcelainTestCase): def test_nolist(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = BytesIO() porcelain.show(self.repo.path, objects=c3.id, outstream=outstream) self.assertTrue(outstream.getvalue().startswith(b"-" * 50)) def test_simple(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = BytesIO() porcelain.show(self.repo.path, objects=[c3.id], outstream=outstream) self.assertTrue(outstream.getvalue().startswith(b"-" * 50)) def test_blob(self): b = Blob.from_string(b"The Foo\n") self.repo.object_store.add_object(b) outstream = BytesIO() porcelain.show(self.repo.path, objects=[b.id], outstream=outstream) self.assertEqual(outstream.getvalue(), b"The Foo\n") class SymbolicRefTests(PorcelainTestCase): def test_set_wrong_symbolic_ref(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id self.assertRaises(ValueError, porcelain.symbolic_ref, self.repo.path, b'foobar') def test_set_force_wrong_symbolic_ref(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.symbolic_ref(self.repo.path, b'force_foobar', force=True) #test if we actually changed the file with self.repo.get_named_file('HEAD') as f: new_ref = f.read() self.assertEqual(new_ref, b'ref: refs/heads/force_foobar\n') def test_set_symbolic_ref(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.symbolic_ref(self.repo.path, b'master') def test_set_symbolic_ref_other_than_master(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]], attrs=dict(refs='develop')) self.repo.refs[b"HEAD"] = c3.id self.repo.refs[b"refs/heads/develop"] = c3.id porcelain.symbolic_ref(self.repo.path, b'develop') #test if we actually changed the file with self.repo.get_named_file('HEAD') as f: new_ref = f.read() self.assertEqual(new_ref, b'ref: refs/heads/develop\n') class DiffTreeTests(PorcelainTestCase): def test_empty(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = BytesIO() porcelain.diff_tree(self.repo.path, c2.tree, c3.tree, outstream=outstream) self.assertEqual(outstream.getvalue(), b"") class CommitTreeTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) b = Blob() b.data = b"foo the bar" t = Tree() t.add(b"somename", 0o100644, b.id) self.repo.object_store.add_object(t) self.repo.object_store.add_object(b) sha = porcelain.commit_tree( self.repo.path, t.id, message=b"Withcommit.", author=b"Joe ", committer=b"Jane ") self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) class RevListTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) outstream = BytesIO() porcelain.rev_list( self.repo.path, [c3.id], outstream=outstream) self.assertEqual( c3.id + b"\n" + c2.id + b"\n" + c1.id + b"\n", outstream.getvalue()) class TagCreateTests(PorcelainTestCase): def test_annotated(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create(self.repo.path, b"tryme", b'foo ', b'bar', annotated=True) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) tag = self.repo[b'refs/tags/tryme'] self.assertTrue(isinstance(tag, Tag)) self.assertEqual(b"foo ", tag.tagger) self.assertEqual(b"bar", tag.message) def test_unannotated(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create(self.repo.path, b"tryme", annotated=False) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) self.repo[b'refs/tags/tryme'] self.assertEqual(list(tags.values()), [self.repo.head()]) class TagListTests(PorcelainTestCase): def test_empty(self): tags = porcelain.tag_list(self.repo.path) self.assertEqual([], tags) def test_simple(self): self.repo.refs[b"refs/tags/foo"] = b"aa" * 20 self.repo.refs[b"refs/tags/bar/bla"] = b"bb" * 20 tags = porcelain.tag_list(self.repo.path) self.assertEqual([b"bar/bla", b"foo"], tags) class TagDeleteTests(PorcelainTestCase): def test_simple(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.tag_create(self.repo, b'foo') self.assertTrue(b"foo" in porcelain.tag_list(self.repo)) porcelain.tag_delete(self.repo, b'foo') self.assertFalse(b"foo" in porcelain.tag_list(self.repo)) class ResetTests(PorcelainTestCase): def test_hard_head(self): with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=["foo"]) porcelain.commit(self.repo.path, message=b"Some message", committer=b"Jane ", author=b"John ") with open(os.path.join(self.repo.path, 'foo'), 'wb') as f: f.write(b"OOH") porcelain.reset(self.repo, "hard", b"HEAD") index = self.repo.open_index() changes = list(tree_changes(self.repo, index.commit(self.repo.object_store), self.repo[b'HEAD'].tree)) self.assertEqual([], changes) class PushTests(PorcelainTestCase): def test_simple(self): """ Basic test of porcelain push where self.repo is the remote. First clone the remote, commit a file to the clone, then push the changes back to the remote. """ outstream = BytesIO() errstream = BytesIO() porcelain.commit(repo=self.repo.path, message=b'init', author=b'', committer=b'') # Setup target repo cloned from temp test repo clone_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, clone_path) target_repo = porcelain.clone(self.repo.path, target=clone_path, errstream=errstream) target_repo.close() # create a second file to be pushed back to origin handle, fullpath = tempfile.mkstemp(dir=clone_path) os.close(handle) porcelain.add(repo=clone_path, paths=[os.path.basename(fullpath)]) porcelain.commit(repo=clone_path, message=b'push', author=b'', committer=b'') # Setup a non-checked out branch in the remote refs_path = b"refs/heads/foo" self.repo.refs[refs_path] = self.repo[b'HEAD'].id # Push to the remote porcelain.push(clone_path, self.repo.path, b"HEAD:" + refs_path, outstream=outstream, errstream=errstream) # Check that the target and source with closing(Repo(clone_path)) as r_clone: self.assertEqual(r_clone[b'HEAD'].id, self.repo[refs_path].id) # Get the change in the target repo corresponding to the add # this will be in the foo branch. change = list(tree_changes(self.repo, self.repo[b'HEAD'].tree, self.repo[b'refs/heads/foo'].tree))[0] self.assertEqual(os.path.basename(fullpath), change.new.path.decode('ascii')) class PullTests(PorcelainTestCase): def test_simple(self): outstream = BytesIO() errstream = BytesIO() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) filename = os.path.basename(fullpath) porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test', committer=b'test') # Setup target repo target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) target_repo = porcelain.clone(self.repo.path, target=target_path, errstream=errstream) target_repo.close() # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) filename = os.path.basename(fullpath) porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test2', author=b'test2', committer=b'test2') + self.assertTrue(b'refs/heads/master' in self.repo.refs) + self.assertTrue(b'refs/heads/master' in target_repo.refs) + # Pull changes into the cloned repo porcelain.pull(target_path, self.repo.path, b'refs/heads/master', outstream=outstream, errstream=errstream) # Check the target repo for pushed changes with closing(Repo(target_path)) as r: self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id) class StatusTests(PorcelainTestCase): def test_status(self): """Integration test for `status` functionality.""" # Commit a dummy file then modify it fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write('origstuff') porcelain.add(repo=self.repo.path, paths=['foo']) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') # modify access and modify time of path os.utime(fullpath, (0, 0)) with open(fullpath, 'wb') as f: f.write(b'stuff') # Make a dummy file and stage it filename_add = 'bar' fullpath = os.path.join(self.repo.path, filename_add) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename_add) results = porcelain.status(self.repo) self.assertEqual(results.staged['add'][0], filename_add.encode('ascii')) self.assertEqual(results.unstaged, [b'foo']) def test_get_tree_changes_add(self): """Unit test for get_tree_changes add.""" # Make a dummy file, stage filename = 'bar' with open(os.path.join(self.repo.path, filename), 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') filename = 'foo' with open(os.path.join(self.repo.path, filename), 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['add'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 1) self.assertEqual(len(changes['modify']), 0) self.assertEqual(len(changes['delete']), 0) def test_get_tree_changes_modify(self): """Unit test for get_tree_changes modify.""" # Make a dummy file, stage, commit, modify filename = 'foo' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') with open(fullpath, 'w') as f: f.write('otherstuff') porcelain.add(repo=self.repo.path, paths=filename) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['modify'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 0) self.assertEqual(len(changes['modify']), 1) self.assertEqual(len(changes['delete']), 0) def test_get_tree_changes_delete(self): """Unit test for get_tree_changes delete.""" # Make a dummy file, stage, commit, remove filename = 'foo' with open(os.path.join(self.repo.path, filename), 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') porcelain.rm(repo=self.repo.path, paths=[filename]) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['delete'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 0) self.assertEqual(len(changes['modify']), 0) self.assertEqual(len(changes['delete']), 1) # TODO(jelmer): Add test for dulwich.porcelain.daemon class UploadPackTests(PorcelainTestCase): """Tests for upload_pack.""" def test_upload_pack(self): outf = BytesIO() exitcode = porcelain.upload_pack(self.repo.path, BytesIO(b"0000"), outf) outlines = outf.getvalue().splitlines() self.assertEqual([b"0000"], outlines) self.assertEqual(0, exitcode) class ReceivePackTests(PorcelainTestCase): """Tests for receive_pack.""" def test_receive_pack(self): filename = 'foo' with open(os.path.join(self.repo.path, filename), 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename) self.repo.do_commit(message=b'test status', author=b'', committer=b'', author_timestamp=1402354300, commit_timestamp=1402354300, author_timezone=0, commit_timezone=0) outf = BytesIO() exitcode = porcelain.receive_pack(self.repo.path, BytesIO(b"0000"), outf) outlines = outf.getvalue().splitlines() self.assertEqual([ b'006d9e65bdcf4a22cdd4f3700604a275cd2aaf146b23 HEAD\x00 report-status ' b'delete-refs ofs-delta side-band-64k no-done', b'003f9e65bdcf4a22cdd4f3700604a275cd2aaf146b23 refs/heads/master', b'0000'], outlines) self.assertEqual(0, exitcode) class BranchListTests(PorcelainTestCase): def test_standard(self): self.assertEqual(set([]), set(porcelain.branch_list(self.repo))) def test_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertEqual( set([b"master", b"foo"]), set(porcelain.branch_list(self.repo))) class BranchCreateTests(PorcelainTestCase): def test_branch_exists(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertRaises(KeyError, porcelain.branch_create, self.repo, b"foo") porcelain.branch_create(self.repo, b"foo", force=True) def test_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertEqual( set([b"master", b"foo"]), set(porcelain.branch_list(self.repo))) class BranchDeleteTests(PorcelainTestCase): def test_simple(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b'foo') self.assertTrue(b"foo" in porcelain.branch_list(self.repo)) porcelain.branch_delete(self.repo, b'foo') self.assertFalse(b"foo" in porcelain.branch_list(self.repo)) class FetchTests(PorcelainTestCase): def test_simple(self): outstream = BytesIO() errstream = BytesIO() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) filename = os.path.basename(fullpath) porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test', committer=b'test') # Setup target repo target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) target_repo = porcelain.clone(self.repo.path, target=target_path, errstream=errstream) # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) filename = os.path.basename(fullpath) porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test2', author=b'test2', committer=b'test2') self.assertFalse(self.repo[b'HEAD'].id in target_repo) target_repo.close() # Fetch changes into the cloned repo porcelain.fetch(target_path, self.repo.path, outstream=outstream, errstream=errstream) # Check the target repo for pushed changes with closing(Repo(target_path)) as r: self.assertTrue(self.repo[b'HEAD'].id in r)