diff --git a/dulwich/objects.py b/dulwich/objects.py index 07acf7da..8bdb4017 100644 --- a/dulwich/objects.py +++ b/dulwich/objects.py @@ -1,1520 +1,1559 @@ # objects.py -- Access to base git objects # Copyright (C) 2007 James Westby # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Access to base git objects.""" import binascii from io import BytesIO from collections import namedtuple import os import posixpath import stat from typing import ( Optional, Dict, + Iterable, Union, Type, ) import warnings import zlib from hashlib import sha1 from dulwich.errors import ( ChecksumMismatch, NotBlobError, NotCommitError, NotTagError, NotTreeError, ObjectFormatException, FileFormatException, ) from dulwich.file import GitFile ZERO_SHA = b"0" * 40 # Header fields for commits _TREE_HEADER = b"tree" _PARENT_HEADER = b"parent" _AUTHOR_HEADER = b"author" _COMMITTER_HEADER = b"committer" _ENCODING_HEADER = b"encoding" _MERGETAG_HEADER = b"mergetag" _GPGSIG_HEADER = b"gpgsig" # Header fields for objects _OBJECT_HEADER = b"object" _TYPE_HEADER = b"type" _TAG_HEADER = b"tag" _TAGGER_HEADER = b"tagger" S_IFGITLINK = 0o160000 MAX_TIME = 9223372036854775807 # (2**63) - 1 - signed long int max BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----" class EmptyFileException(FileFormatException): """An unexpectedly empty file was encountered.""" def S_ISGITLINK(m): """Check if a mode indicates a submodule. Args: m: Mode to check Returns: a ``boolean`` """ return stat.S_IFMT(m) == S_IFGITLINK def _decompress(string): dcomp = zlib.decompressobj() dcomped = dcomp.decompress(string) dcomped += dcomp.flush() return dcomped def sha_to_hex(sha): """Takes a string and returns the hex of the sha within""" hexsha = binascii.hexlify(sha) assert len(hexsha) == 40, "Incorrect length of sha1 string: %d" % hexsha return hexsha def hex_to_sha(hex): """Takes a hex sha and returns a binary sha""" assert len(hex) == 40, "Incorrect length of hexsha: %s" % hex try: return binascii.unhexlify(hex) except TypeError as exc: if not isinstance(hex, bytes): raise raise ValueError(exc.args[0]) def valid_hexsha(hex): if len(hex) != 40: return False try: binascii.unhexlify(hex) except (TypeError, binascii.Error): return False else: return True def hex_to_filename(path, hex): """Takes a hex sha and returns its filename relative to the given path.""" # os.path.join accepts bytes or unicode, but all args must be of the same # type. Make sure that hex which is expected to be bytes, is the same type # as path. if getattr(path, "encode", None) is not None: hex = hex.decode("ascii") dir = hex[:2] file = hex[2:] # Check from object dir return os.path.join(path, dir, file) def filename_to_hex(filename): """Takes an object filename and returns its corresponding hex sha.""" # grab the last (up to) two path components names = filename.rsplit(os.path.sep, 2)[-2:] errmsg = "Invalid object filename: %s" % filename assert len(names) == 2, errmsg base, rest = names assert len(base) == 2 and len(rest) == 38, errmsg hex = (base + rest).encode("ascii") hex_to_sha(hex) return hex def object_header(num_type: int, length: int) -> bytes: """Return an object header for the given numeric type and text length.""" return object_class(num_type).type_name + b" " + str(length).encode("ascii") + b"\0" def serializable_property(name: str, docstring: Optional[str] = None): """A property that helps tracking whether serialization is necessary.""" def set(obj, value): setattr(obj, "_" + name, value) obj._needs_serialization = True def get(obj): return getattr(obj, "_" + name) return property(get, set, doc=docstring) def object_class(type): """Get the object class corresponding to the given type. Args: type: Either a type name string or a numeric type. Returns: The ShaFile subclass corresponding to the given type, or None if type is not a valid type name/number. """ return _TYPE_MAP.get(type, None) def check_hexsha(hex, error_msg): """Check if a string is a valid hex sha string. Args: hex: Hex string to check error_msg: Error message to use in exception Raises: ObjectFormatException: Raised when the string is not valid """ if not valid_hexsha(hex): raise ObjectFormatException("%s %s" % (error_msg, hex)) def check_identity(identity, error_msg): """Check if the specified identity is valid. This will raise an exception if the identity is not valid. Args: identity: Identity string error_msg: Error message to use in exception """ email_start = identity.find(b"<") email_end = identity.find(b">") if ( email_start < 0 or email_end < 0 or email_end <= email_start or identity.find(b"<", email_start + 1) >= 0 or identity.find(b">", email_end + 1) >= 0 or not identity.endswith(b">") ): raise ObjectFormatException(error_msg) def check_time(time_seconds): """Check if the specified time is not prone to overflow error. This will raise an exception if the time is not valid. Args: time_info: author/committer/tagger info """ # Prevent overflow error if time_seconds > MAX_TIME: raise ObjectFormatException("Date field should not exceed %s" % MAX_TIME) def git_line(*items): """Formats items into a space separated line.""" return b" ".join(items) + b"\n" class FixedSha(object): """SHA object that behaves like hashlib's but is given a fixed value.""" __slots__ = ("_hexsha", "_sha") def __init__(self, hexsha): if getattr(hexsha, "encode", None) is not None: hexsha = hexsha.encode("ascii") if not isinstance(hexsha, bytes): raise TypeError("Expected bytes for hexsha, got %r" % hexsha) self._hexsha = hexsha self._sha = hex_to_sha(hexsha) def digest(self): """Return the raw SHA digest.""" return self._sha def hexdigest(self): """Return the hex SHA digest.""" return self._hexsha.decode("ascii") class ShaFile(object): """A git SHA file.""" __slots__ = ("_chunked_text", "_sha", "_needs_serialization") type_name = None # type: bytes type_num = None # type: int @staticmethod def _parse_legacy_object_header(magic, f): """Parse a legacy object, creating it but not reading the file.""" bufsize = 1024 decomp = zlib.decompressobj() header = decomp.decompress(magic) start = 0 end = -1 while end < 0: extra = f.read(bufsize) header += decomp.decompress(extra) magic += extra end = header.find(b"\0", start) start = len(header) header = header[:end] type_name, size = header.split(b" ", 1) try: int(size) # sanity check except ValueError as e: raise ObjectFormatException("Object size not an integer: %s" % e) obj_class = object_class(type_name) if not obj_class: raise ObjectFormatException("Not a known type: %s" % type_name) return obj_class() def _parse_legacy_object(self, map): """Parse a legacy object, setting the raw string.""" text = _decompress(map) header_end = text.find(b"\0") if header_end < 0: raise ObjectFormatException("Invalid object header, no \\0") self.set_raw_string(text[header_end + 1 :]) def as_legacy_object_chunks(self, compression_level=-1): """Return chunks representing the object in the experimental format. Returns: List of strings """ compobj = zlib.compressobj(compression_level) yield compobj.compress(self._header()) for chunk in self.as_raw_chunks(): yield compobj.compress(chunk) yield compobj.flush() def as_legacy_object(self, compression_level=-1): """Return string representing the object in the experimental format.""" return b"".join( self.as_legacy_object_chunks(compression_level=compression_level) ) def as_raw_chunks(self): """Return chunks with serialization of the object. Returns: List of strings, not necessarily one per line """ if self._needs_serialization: self._sha = None self._chunked_text = self._serialize() self._needs_serialization = False return self._chunked_text def as_raw_string(self): """Return raw string with serialization of the object. Returns: String object """ return b"".join(self.as_raw_chunks()) def __bytes__(self): """Return raw string serialization of this object.""" return self.as_raw_string() def __hash__(self): """Return unique hash for this object.""" return hash(self.id) def as_pretty_string(self): """Return a string representing this object, fit for display.""" return self.as_raw_string() def set_raw_string(self, text, sha=None): """Set the contents of this object from a serialized string.""" if not isinstance(text, bytes): raise TypeError("Expected bytes for text, got %r" % text) self.set_raw_chunks([text], sha) def set_raw_chunks(self, chunks, sha=None): """Set the contents of this object from a list of chunks.""" self._chunked_text = chunks self._deserialize(chunks) if sha is None: self._sha = None else: self._sha = FixedSha(sha) self._needs_serialization = False @staticmethod def _parse_object_header(magic, f): """Parse a new style object, creating it but not reading the file.""" num_type = (ord(magic[0:1]) >> 4) & 7 obj_class = object_class(num_type) if not obj_class: raise ObjectFormatException("Not a known type %d" % num_type) return obj_class() def _parse_object(self, map): """Parse a new style object, setting self._text.""" # skip type and size; type must have already been determined, and # we trust zlib to fail if it's otherwise corrupted byte = ord(map[0:1]) used = 1 while (byte & 0x80) != 0: byte = ord(map[used : used + 1]) used += 1 raw = map[used:] self.set_raw_string(_decompress(raw)) @classmethod def _is_legacy_object(cls, magic): b0 = ord(magic[0:1]) b1 = ord(magic[1:2]) word = (b0 << 8) + b1 return (b0 & 0x8F) == 0x08 and (word % 31) == 0 @classmethod def _parse_file(cls, f): map = f.read() if not map: raise EmptyFileException("Corrupted empty file detected") if cls._is_legacy_object(map): obj = cls._parse_legacy_object_header(map, f) obj._parse_legacy_object(map) else: obj = cls._parse_object_header(map, f) obj._parse_object(map) return obj def __init__(self): """Don't call this directly""" self._sha = None self._chunked_text = [] self._needs_serialization = True def _deserialize(self, chunks): raise NotImplementedError(self._deserialize) def _serialize(self): raise NotImplementedError(self._serialize) @classmethod def from_path(cls, path): """Open a SHA file from disk.""" with GitFile(path, "rb") as f: return cls.from_file(f) @classmethod def from_file(cls, f): """Get the contents of a SHA file on disk.""" try: obj = cls._parse_file(f) obj._sha = None return obj except (IndexError, ValueError): raise ObjectFormatException("invalid object header") @staticmethod def from_raw_string(type_num, string, sha=None): """Creates an object of the indicated type from the raw string given. Args: type_num: The numeric type of the object. string: The raw uncompressed contents. sha: Optional known sha for the object """ obj = object_class(type_num)() obj.set_raw_string(string, sha) return obj @staticmethod def from_raw_chunks(type_num, chunks, sha=None): """Creates an object of the indicated type from the raw chunks given. Args: type_num: The numeric type of the object. chunks: An iterable of the raw uncompressed contents. sha: Optional known sha for the object """ obj = object_class(type_num)() obj.set_raw_chunks(chunks, sha) return obj @classmethod def from_string(cls, string): """Create a ShaFile from a string.""" obj = cls() obj.set_raw_string(string) return obj def _check_has_member(self, member, error_msg): """Check that the object has a given member variable. Args: member: the member variable to check for error_msg: the message for an error if the member is missing Raises: ObjectFormatException: with the given error_msg if member is missing or is None """ if getattr(self, member, None) is None: raise ObjectFormatException(error_msg) def check(self): """Check this object for internal consistency. Raises: ObjectFormatException: if the object is malformed in some way ChecksumMismatch: if the object was created with a SHA that does not match its contents """ # TODO: if we find that error-checking during object parsing is a # performance bottleneck, those checks should be moved to the class's # check() method during optimization so we can still check the object # when necessary. old_sha = self.id try: self._deserialize(self.as_raw_chunks()) self._sha = None new_sha = self.id except Exception as e: raise ObjectFormatException(e) if old_sha != new_sha: raise ChecksumMismatch(new_sha, old_sha) def _header(self): return object_header(self.type, self.raw_length()) def raw_length(self): """Returns the length of the raw string of this object.""" ret = 0 for chunk in self.as_raw_chunks(): ret += len(chunk) return ret def sha(self): """The SHA1 object that is the name of this object.""" if self._sha is None or self._needs_serialization: # this is a local because as_raw_chunks() overwrites self._sha new_sha = sha1() new_sha.update(self._header()) for chunk in self.as_raw_chunks(): new_sha.update(chunk) self._sha = new_sha return self._sha def copy(self): """Create a new copy of this SHA1 object from its raw string""" obj_class = object_class(self.get_type()) return obj_class.from_raw_string(self.get_type(), self.as_raw_string(), self.id) @property def id(self): """The hex SHA of this object.""" return self.sha().hexdigest().encode("ascii") def get_type(self): """Return the type number for this object class.""" return self.type_num def set_type(self, type): """Set the type number for this object class.""" self.type_num = type # DEPRECATED: use type_num or type_name as needed. type = property(get_type, set_type) def __repr__(self): return "<%s %s>" % (self.__class__.__name__, self.id) def __ne__(self, other): """Check whether this object does not match the other.""" return not isinstance(other, ShaFile) or self.id != other.id def __eq__(self, other): """Return True if the SHAs of the two objects match.""" return isinstance(other, ShaFile) and self.id == other.id def __lt__(self, other): """Return whether SHA of this object is less than the other.""" if not isinstance(other, ShaFile): raise TypeError return self.id < other.id def __le__(self, other): """Check whether SHA of this object is less than or equal to the other.""" if not isinstance(other, ShaFile): raise TypeError return self.id <= other.id def __cmp__(self, other): """Compare the SHA of this object with that of the other object.""" if not isinstance(other, ShaFile): raise TypeError return cmp(self.id, other.id) # noqa: F821 class Blob(ShaFile): """A Git Blob object.""" __slots__ = () type_name = b"blob" type_num = 3 def __init__(self): super(Blob, self).__init__() self._chunked_text = [] self._needs_serialization = False def _get_data(self): return self.as_raw_string() def _set_data(self, data): self.set_raw_string(data) data = property( _get_data, _set_data, doc="The text contained within the blob object." ) def _get_chunked(self): return self._chunked_text def _set_chunked(self, chunks): self._chunked_text = chunks def _serialize(self): return self._chunked_text def _deserialize(self, chunks): self._chunked_text = chunks chunked = property( _get_chunked, _set_chunked, doc="The text in the blob object, as chunks (not necessarily lines)", ) @classmethod def from_path(cls, path): blob = ShaFile.from_path(path) if not isinstance(blob, cls): raise NotBlobError(path) return blob def check(self): """Check this object for internal consistency. Raises: ObjectFormatException: if the object is malformed in some way """ super(Blob, self).check() def splitlines(self): """Return list of lines in this blob. This preserves the original line endings. """ chunks = self.chunked if not chunks: return [] if len(chunks) == 1: return chunks[0].splitlines(True) remaining = None ret = [] for chunk in chunks: lines = chunk.splitlines(True) if len(lines) > 1: ret.append((remaining or b"") + lines[0]) ret.extend(lines[1:-1]) remaining = lines[-1] elif len(lines) == 1: if remaining is None: remaining = lines.pop() else: remaining += lines.pop() if remaining is not None: ret.append(remaining) return ret def _parse_message(chunks): """Parse a message with a list of fields and a body. Args: chunks: the raw chunks of the tag or commit object. Returns: iterator of tuples of (field, value), one per header line, in the order read from the text, possibly including duplicates. Includes a field named None for the freeform tag/commit text. """ f = BytesIO(b"".join(chunks)) k = None v = "" eof = False def _strip_last_newline(value): """Strip the last newline from value""" if value and value.endswith(b"\n"): return value[:-1] return value # Parse the headers # # Headers can contain newlines. The next line is indented with a space. # We store the latest key as 'k', and the accumulated value as 'v'. for line in f: if line.startswith(b" "): # Indented continuation of the previous line v += line[1:] else: if k is not None: # We parsed a new header, return its value yield (k, _strip_last_newline(v)) if line == b"\n": # Empty line indicates end of headers break (k, v) = line.split(b" ", 1) else: # We reached end of file before the headers ended. We still need to # return the previous header, then we need to return a None field for # the text. eof = True if k is not None: yield (k, _strip_last_newline(v)) yield (None, None) if not eof: # We didn't reach the end of file while parsing headers. We can return # the rest of the file as a message. yield (None, f.read()) f.close() class Tag(ShaFile): """A Git Tag object.""" type_name = b"tag" type_num = 4 __slots__ = ( "_tag_timezone_neg_utc", "_name", "_object_sha", "_object_class", "_tag_time", "_tag_timezone", "_tagger", "_message", "_signature", ) def __init__(self): super(Tag, self).__init__() self._tagger = None self._tag_time = None self._tag_timezone = None self._tag_timezone_neg_utc = False self._signature = None @classmethod def from_path(cls, filename): tag = ShaFile.from_path(filename) if not isinstance(tag, cls): raise NotTagError(filename) return tag def check(self): """Check this object for internal consistency. Raises: ObjectFormatException: if the object is malformed in some way """ super(Tag, self).check() self._check_has_member("_object_sha", "missing object sha") self._check_has_member("_object_class", "missing object type") self._check_has_member("_name", "missing tag name") if not self._name: raise ObjectFormatException("empty tag name") check_hexsha(self._object_sha, "invalid object sha") if getattr(self, "_tagger", None): check_identity(self._tagger, "invalid tagger") self._check_has_member("_tag_time", "missing tag time") check_time(self._tag_time) last = None for field, _ in _parse_message(self._chunked_text): if field == _OBJECT_HEADER and last is not None: raise ObjectFormatException("unexpected object") elif field == _TYPE_HEADER and last != _OBJECT_HEADER: raise ObjectFormatException("unexpected type") elif field == _TAG_HEADER and last != _TYPE_HEADER: raise ObjectFormatException("unexpected tag name") elif field == _TAGGER_HEADER and last != _TAG_HEADER: raise ObjectFormatException("unexpected tagger") last = field def _serialize(self): chunks = [] chunks.append(git_line(_OBJECT_HEADER, self._object_sha)) chunks.append(git_line(_TYPE_HEADER, self._object_class.type_name)) chunks.append(git_line(_TAG_HEADER, self._name)) if self._tagger: if self._tag_time is None: chunks.append(git_line(_TAGGER_HEADER, self._tagger)) else: chunks.append( git_line( _TAGGER_HEADER, self._tagger, str(self._tag_time).encode("ascii"), format_timezone(self._tag_timezone, self._tag_timezone_neg_utc), ) ) if self._message is not None: chunks.append(b"\n") # To close headers chunks.append(self._message) if self._signature is not None: chunks.append(self._signature) return chunks def _deserialize(self, chunks): """Grab the metadata attached to the tag""" self._tagger = None self._tag_time = None self._tag_timezone = None self._tag_timezone_neg_utc = False for field, value in _parse_message(chunks): if field == _OBJECT_HEADER: self._object_sha = value elif field == _TYPE_HEADER: obj_class = object_class(value) if not obj_class: raise ObjectFormatException("Not a known type: %s" % value) self._object_class = obj_class elif field == _TAG_HEADER: self._name = value elif field == _TAGGER_HEADER: ( self._tagger, self._tag_time, (self._tag_timezone, self._tag_timezone_neg_utc), ) = parse_time_entry(value) elif field is None: if value is None: self._message = None self._signature = None else: try: sig_idx = value.index(BEGIN_PGP_SIGNATURE) except ValueError: self._message = value self._signature = None else: self._message = value[:sig_idx] self._signature = value[sig_idx:] else: raise ObjectFormatException("Unknown field %s" % field) def _get_object(self): """Get the object pointed to by this tag. Returns: tuple of (object class, sha). """ return (self._object_class, self._object_sha) def _set_object(self, value): (self._object_class, self._object_sha) = value self._needs_serialization = True object = property(_get_object, _set_object) name = serializable_property("name", "The name of this tag") tagger = serializable_property( "tagger", "Returns the name of the person who created this tag" ) tag_time = serializable_property( "tag_time", "The creation timestamp of the tag. As the number of seconds " "since the epoch", ) tag_timezone = serializable_property( "tag_timezone", "The timezone that tag_time is in." ) message = serializable_property("message", "the message attached to this tag") signature = serializable_property("signature", "Optional detached GPG signature") def sign(self, keyid: Optional[str] = None): import gpg with gpg.Context(armor=True) as c: if keyid is not None: key = c.get_key(keyid) with gpg.Context(armor=True, signers=[key]) as ctx: self.signature, unused_result = ctx.sign( self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH, ) else: self.signature, unused_result = c.sign( self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH ) + def verify(self, keyids: Optional[Iterable[str]] = None): + """Verify GPG signature for this tag (if it is signed). + + Args: + keyids: Optional iterable of trusted keyids for this tag. + If this tag is not signed by any key in keyids verification will + fail. If not specified, this function only verifies that the tag + has a valid signature. + + Raises: + gpg.errors.BadSignatures: if GPG signature verification fails + gpg.errors.MissingSignatures: if tag was not signed by a key + specified in keyids + """ + if self._signature is None: + return + + import gpg + + with gpg.Context() as ctx: + data, result = ctx.verify( + self.as_raw_string()[: -len(self._signature)], + signature=self._signature, + ) + if keyids: + keys = [ + ctx.get_key(key) + for key in keyids + ] + for key in keys: + for subkey in keys: + for sig in result.signatures: + if subkey.can_sign and subkey.fpr == sig.fpr: + return + raise gpg.errors.MissingSignatures( + result, keys, results=(data, result) + ) + class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])): """Named tuple encapsulating a single tree entry.""" def in_path(self, path): """Return a copy of this entry with the given path prepended.""" if not isinstance(self.path, bytes): raise TypeError("Expected bytes for path, got %r" % path) return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha) def parse_tree(text, strict=False): """Parse a tree text. Args: text: Serialized text to parse Returns: iterator of tuples of (name, mode, sha) Raises: ObjectFormatException: if the object was malformed in some way """ count = 0 length = len(text) while count < length: mode_end = text.index(b" ", count) mode_text = text[count:mode_end] if strict and mode_text.startswith(b"0"): raise ObjectFormatException("Invalid mode '%s'" % mode_text) try: mode = int(mode_text, 8) except ValueError: raise ObjectFormatException("Invalid mode '%s'" % mode_text) name_end = text.index(b"\0", mode_end) name = text[mode_end + 1 : name_end] count = name_end + 21 sha = text[name_end + 1 : count] if len(sha) != 20: raise ObjectFormatException("Sha has invalid length") hexsha = sha_to_hex(sha) yield (name, mode, hexsha) def serialize_tree(items): """Serialize the items in a tree to a text. Args: items: Sorted iterable over (name, mode, sha) tuples Returns: Serialized tree text as chunks """ for name, mode, hexsha in items: yield ( ("%04o" % mode).encode("ascii") + b" " + name + b"\0" + hex_to_sha(hexsha) ) def sorted_tree_items(entries, name_order): """Iterate over a tree entries dictionary. Args: name_order: If True, iterate entries in order of their name. If False, iterate entries in tree order, that is, treat subtree entries as having '/' appended. entries: Dictionary mapping names to (mode, sha) tuples Returns: Iterator over (name, mode, hexsha) """ key_func = name_order and key_entry_name_order or key_entry for name, entry in sorted(entries.items(), key=key_func): mode, hexsha = entry # Stricter type checks than normal to mirror checks in the C version. mode = int(mode) if not isinstance(hexsha, bytes): raise TypeError("Expected bytes for SHA, got %r" % hexsha) yield TreeEntry(name, mode, hexsha) def key_entry(entry): """Sort key for tree entry. Args: entry: (name, value) tuplee """ (name, value) = entry if stat.S_ISDIR(value[0]): name += b"/" return name def key_entry_name_order(entry): """Sort key for tree entry in name order.""" return entry[0] def pretty_format_tree_entry(name, mode, hexsha, encoding="utf-8"): """Pretty format tree entry. Args: name: Name of the directory entry mode: Mode of entry hexsha: Hexsha of the referenced object Returns: string describing the tree entry """ if mode & stat.S_IFDIR: kind = "tree" else: kind = "blob" return "%04o %s %s\t%s\n" % ( mode, kind, hexsha.decode("ascii"), name.decode(encoding, "replace"), ) class Tree(ShaFile): """A Git tree object""" type_name = b"tree" type_num = 2 __slots__ = "_entries" def __init__(self): super(Tree, self).__init__() self._entries = {} @classmethod def from_path(cls, filename): tree = ShaFile.from_path(filename) if not isinstance(tree, cls): raise NotTreeError(filename) return tree def __contains__(self, name): return name in self._entries def __getitem__(self, name): return self._entries[name] def __setitem__(self, name, value): """Set a tree entry by name. Args: name: The name of the entry, as a string. value: A tuple of (mode, hexsha), where mode is the mode of the entry as an integral type and hexsha is the hex SHA of the entry as a string. """ mode, hexsha = value self._entries[name] = (mode, hexsha) self._needs_serialization = True def __delitem__(self, name): del self._entries[name] self._needs_serialization = True def __len__(self): return len(self._entries) def __iter__(self): return iter(self._entries) def add(self, name, mode, hexsha): """Add an entry to the tree. Args: mode: The mode of the entry as an integral type. Not all possible modes are supported by git; see check() for details. name: The name of the entry, as a string. hexsha: The hex SHA of the entry as a string. """ if isinstance(name, int) and isinstance(mode, bytes): (name, mode) = (mode, name) warnings.warn( "Please use Tree.add(name, mode, hexsha)", category=DeprecationWarning, stacklevel=2, ) self._entries[name] = mode, hexsha self._needs_serialization = True def iteritems(self, name_order=False): """Iterate over entries. Args: name_order: If True, iterate in name order instead of tree order. Returns: Iterator over (name, mode, sha) tuples """ return sorted_tree_items(self._entries, name_order) def items(self): """Return the sorted entries in this tree. Returns: List with (name, mode, sha) tuples """ return list(self.iteritems()) def _deserialize(self, chunks): """Grab the entries in the tree""" try: parsed_entries = parse_tree(b"".join(chunks)) except ValueError as e: raise ObjectFormatException(e) # TODO: list comprehension is for efficiency in the common (small) # case; if memory efficiency in the large case is a concern, use a # genexp. self._entries = dict([(n, (m, s)) for n, m, s in parsed_entries]) def check(self): """Check this object for internal consistency. Raises: ObjectFormatException: if the object is malformed in some way """ super(Tree, self).check() last = None allowed_modes = ( stat.S_IFREG | 0o755, stat.S_IFREG | 0o644, stat.S_IFLNK, stat.S_IFDIR, S_IFGITLINK, # TODO: optionally exclude as in git fsck --strict stat.S_IFREG | 0o664, ) for name, mode, sha in parse_tree(b"".join(self._chunked_text), True): check_hexsha(sha, "invalid sha %s" % sha) if b"/" in name or name in (b"", b".", b"..", b".git"): raise ObjectFormatException( "invalid name %s" % name.decode("utf-8", "replace") ) if mode not in allowed_modes: raise ObjectFormatException("invalid mode %06o" % mode) entry = (name, (mode, sha)) if last: if key_entry(last) > key_entry(entry): raise ObjectFormatException("entries not sorted") if name == last[0]: raise ObjectFormatException("duplicate entry %s" % name) last = entry def _serialize(self): return list(serialize_tree(self.iteritems())) def as_pretty_string(self): text = [] for name, mode, hexsha in self.iteritems(): text.append(pretty_format_tree_entry(name, mode, hexsha)) return "".join(text) def lookup_path(self, lookup_obj, path): """Look up an object in a Git tree. Args: lookup_obj: Callback for retrieving object by SHA1 path: Path to lookup Returns: A tuple of (mode, SHA) of the resulting path. """ parts = path.split(b"/") sha = self.id mode = None for p in parts: if not p: continue obj = lookup_obj(sha) if not isinstance(obj, Tree): raise NotTreeError(sha) mode, sha = obj[p] return mode, sha def parse_timezone(text): """Parse a timezone text fragment (e.g. '+0100'). Args: text: Text to parse. Returns: Tuple with timezone as seconds difference to UTC and a boolean indicating whether this was a UTC timezone prefixed with a negative sign (-0000). """ # cgit parses the first character as the sign, and the rest # as an integer (using strtol), which could also be negative. # We do the same for compatibility. See #697828. if not text[0] in b"+-": raise ValueError("Timezone must start with + or - (%(text)s)" % vars()) sign = text[:1] offset = int(text[1:]) if sign == b"-": offset = -offset unnecessary_negative_timezone = offset >= 0 and sign == b"-" signum = (offset < 0) and -1 or 1 offset = abs(offset) hours = int(offset / 100) minutes = offset % 100 return ( signum * (hours * 3600 + minutes * 60), unnecessary_negative_timezone, ) def format_timezone(offset, unnecessary_negative_timezone=False): """Format a timezone for Git serialization. Args: offset: Timezone offset as seconds difference to UTC unnecessary_negative_timezone: Whether to use a minus sign for UTC or positive timezones (-0000 and --700 rather than +0000 / +0700). """ if offset % 60 != 0: raise ValueError("Unable to handle non-minute offset.") if offset < 0 or unnecessary_negative_timezone: sign = "-" offset = -offset else: sign = "+" return ("%c%02d%02d" % (sign, offset / 3600, (offset / 60) % 60)).encode("ascii") def parse_time_entry(value): """Parse time entry behavior Args: value: Bytes representing a git commit/tag line Raises: ObjectFormatException in case of parsing error (malformed field date) Returns: Tuple of (author, time, (timezone, timezone_neg_utc)) """ try: sep = value.rindex(b"> ") except ValueError: return (value, None, (None, False)) try: person = value[0 : sep + 1] rest = value[sep + 2 :] timetext, timezonetext = rest.rsplit(b" ", 1) time = int(timetext) timezone, timezone_neg_utc = parse_timezone(timezonetext) except ValueError as e: raise ObjectFormatException(e) return person, time, (timezone, timezone_neg_utc) def parse_commit(chunks): """Parse a commit object from chunks. Args: chunks: Chunks to parse Returns: Tuple of (tree, parents, author_info, commit_info, encoding, mergetag, gpgsig, message, extra) """ parents = [] extra = [] tree = None author_info = (None, None, (None, None)) commit_info = (None, None, (None, None)) encoding = None mergetag = [] message = None gpgsig = None for field, value in _parse_message(chunks): # TODO(jelmer): Enforce ordering if field == _TREE_HEADER: tree = value elif field == _PARENT_HEADER: parents.append(value) elif field == _AUTHOR_HEADER: author_info = parse_time_entry(value) elif field == _COMMITTER_HEADER: commit_info = parse_time_entry(value) elif field == _ENCODING_HEADER: encoding = value elif field == _MERGETAG_HEADER: mergetag.append(Tag.from_string(value + b"\n")) elif field == _GPGSIG_HEADER: gpgsig = value elif field is None: message = value else: extra.append((field, value)) return ( tree, parents, author_info, commit_info, encoding, mergetag, gpgsig, message, extra, ) class Commit(ShaFile): """A git commit object""" type_name = b"commit" type_num = 1 __slots__ = ( "_parents", "_encoding", "_extra", "_author_timezone_neg_utc", "_commit_timezone_neg_utc", "_commit_time", "_author_time", "_author_timezone", "_commit_timezone", "_author", "_committer", "_tree", "_message", "_mergetag", "_gpgsig", ) def __init__(self): super(Commit, self).__init__() self._parents = [] self._encoding = None self._mergetag = [] self._gpgsig = None self._extra = [] self._author_timezone_neg_utc = False self._commit_timezone_neg_utc = False @classmethod def from_path(cls, path): commit = ShaFile.from_path(path) if not isinstance(commit, cls): raise NotCommitError(path) return commit def _deserialize(self, chunks): ( self._tree, self._parents, author_info, commit_info, self._encoding, self._mergetag, self._gpgsig, self._message, self._extra, ) = parse_commit(chunks) ( self._author, self._author_time, (self._author_timezone, self._author_timezone_neg_utc), ) = author_info ( self._committer, self._commit_time, (self._commit_timezone, self._commit_timezone_neg_utc), ) = commit_info def check(self): """Check this object for internal consistency. Raises: ObjectFormatException: if the object is malformed in some way """ super(Commit, self).check() self._check_has_member("_tree", "missing tree") self._check_has_member("_author", "missing author") self._check_has_member("_committer", "missing committer") self._check_has_member("_author_time", "missing author time") self._check_has_member("_commit_time", "missing commit time") for parent in self._parents: check_hexsha(parent, "invalid parent sha") check_hexsha(self._tree, "invalid tree sha") check_identity(self._author, "invalid author") check_identity(self._committer, "invalid committer") check_time(self._author_time) check_time(self._commit_time) last = None for field, _ in _parse_message(self._chunked_text): if field == _TREE_HEADER and last is not None: raise ObjectFormatException("unexpected tree") elif field == _PARENT_HEADER and last not in ( _PARENT_HEADER, _TREE_HEADER, ): raise ObjectFormatException("unexpected parent") elif field == _AUTHOR_HEADER and last not in ( _TREE_HEADER, _PARENT_HEADER, ): raise ObjectFormatException("unexpected author") elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER: raise ObjectFormatException("unexpected committer") elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER: raise ObjectFormatException("unexpected encoding") last = field # TODO: optionally check for duplicate parents def _serialize(self): chunks = [] tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree chunks.append(git_line(_TREE_HEADER, tree_bytes)) for p in self._parents: chunks.append(git_line(_PARENT_HEADER, p)) chunks.append( git_line( _AUTHOR_HEADER, self._author, str(self._author_time).encode("ascii"), format_timezone(self._author_timezone, self._author_timezone_neg_utc), ) ) chunks.append( git_line( _COMMITTER_HEADER, self._committer, str(self._commit_time).encode("ascii"), format_timezone(self._commit_timezone, self._commit_timezone_neg_utc), ) ) if self.encoding: chunks.append(git_line(_ENCODING_HEADER, self.encoding)) for mergetag in self.mergetag: mergetag_chunks = mergetag.as_raw_string().split(b"\n") chunks.append(git_line(_MERGETAG_HEADER, mergetag_chunks[0])) # Embedded extra header needs leading space for chunk in mergetag_chunks[1:]: chunks.append(b" " + chunk + b"\n") # No trailing empty line if chunks[-1].endswith(b" \n"): chunks[-1] = chunks[-1][:-2] for k, v in self.extra: if b"\n" in k or b"\n" in v: raise AssertionError("newline in extra data: %r -> %r" % (k, v)) chunks.append(git_line(k, v)) if self.gpgsig: sig_chunks = self.gpgsig.split(b"\n") chunks.append(git_line(_GPGSIG_HEADER, sig_chunks[0])) for chunk in sig_chunks[1:]: chunks.append(git_line(b"", chunk)) chunks.append(b"\n") # There must be a new line after the headers chunks.append(self._message) return chunks tree = serializable_property("tree", "Tree that is the state of this commit") def _get_parents(self): """Return a list of parents of this commit.""" return self._parents def _set_parents(self, value): """Set a list of parents of this commit.""" self._needs_serialization = True self._parents = value parents = property( _get_parents, _set_parents, doc="Parents of this commit, by their SHA1.", ) def _get_extra(self): """Return extra settings of this commit.""" return self._extra extra = property( _get_extra, doc="Extra header fields not understood (presumably added in a " "newer version of git). Kept verbatim so the object can " "be correctly reserialized. For private commit metadata, use " "pseudo-headers in Commit.message, rather than this field.", ) author = serializable_property("author", "The name of the author of the commit") committer = serializable_property( "committer", "The name of the committer of the commit" ) message = serializable_property("message", "The commit message") commit_time = serializable_property( "commit_time", "The timestamp of the commit. As the number of seconds since the " "epoch.", ) commit_timezone = serializable_property( "commit_timezone", "The zone the commit time is in" ) author_time = serializable_property( "author_time", "The timestamp the commit was written. As the number of " "seconds since the epoch.", ) author_timezone = serializable_property( "author_timezone", "Returns the zone the author time is in." ) encoding = serializable_property("encoding", "Encoding of the commit message.") mergetag = serializable_property("mergetag", "Associated signed tag.") gpgsig = serializable_property("gpgsig", "GPG Signature.") OBJECT_CLASSES = ( Commit, Tree, Blob, Tag, ) _TYPE_MAP = {} # type: Dict[Union[bytes, int], Type[ShaFile]] for cls in OBJECT_CLASSES: _TYPE_MAP[cls.type_name] = cls _TYPE_MAP[cls.type_num] = cls # Hold on to the pure-python implementations for testing _parse_tree_py = parse_tree _sorted_tree_items_py = sorted_tree_items try: # Try to import C versions from dulwich._objects import parse_tree, sorted_tree_items # type: ignore except ImportError: pass diff --git a/dulwich/tests/compat/__init__.py b/dulwich/tests/compat/__init__.py index 515114c3..24747775 100644 --- a/dulwich/tests/compat/__init__.py +++ b/dulwich/tests/compat/__init__.py @@ -1,41 +1,42 @@ # __init__.py -- Compatibility tests for dulwich # Copyright (C) 2010 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Compatibility tests for Dulwich.""" import unittest def test_suite(): names = [ "client", "pack", "patch", + "porcelain", "repository", "server", "utils", "web", ] module_names = ["dulwich.tests.compat.test_" + name for name in names] result = unittest.TestSuite() loader = unittest.TestLoader() suite = loader.loadTestsFromNames(module_names) result.addTests(suite) return result diff --git a/dulwich/tests/compat/test_porcelain.py b/dulwich/tests/compat/test_porcelain.py new file mode 100644 index 00000000..6eb4a3d4 --- /dev/null +++ b/dulwich/tests/compat/test_porcelain.py @@ -0,0 +1,97 @@ +# test_porcelain .py -- Tests for dulwich.porcelain/CGit compatibility +# Copyright (C) 2010 Google, Inc. +# +# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU +# General Public License as public by the Free Software Foundation; version 2.0 +# or (at your option) any later version. You can redistribute it and/or +# modify it under the terms of either of these two licenses. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# You should have received a copy of the licenses; if not, see +# for a copy of the GNU General Public License +# and for a copy of the Apache +# License, Version 2.0. +# + +"""Compatibility tests for dulwich.porcelain.""" + +import os +import platform +import sys +from unittest import skipIf + +from dulwich import porcelain +from dulwich.tests.utils import ( + build_commit_graph, +) +from dulwich.tests.compat.utils import ( + run_git_or_fail, + CompatTestCase, +) +from dulwich.tests.test_porcelain import ( + PorcelainGpgTestCase, +) + + +@skipIf(platform.python_implementation() == "PyPy" or sys.platform == "win32", "gpgme not easily available or supported on Windows and PyPy") +class TagCreateSignTestCase(PorcelainGpgTestCase, CompatTestCase): + def setUp(self): + super(TagCreateSignTestCase, self).setUp() + + def test_sign(self): + # Test that dulwich signatures can be verified by CGit + c1, c2, c3 = build_commit_graph( + self.repo.object_store, [[1], [2, 1], [3, 1, 2]] + ) + self.repo.refs[b"HEAD"] = c3.id + cfg = self.repo.get_config() + cfg.set(("user",), "signingKey", PorcelainGpgTestCase.DEFAULT_KEY_ID) + self.import_default_key() + + porcelain.tag_create( + self.repo.path, + b"tryme", + b"foo ", + b"bar", + annotated=True, + sign=True, + ) + + run_git_or_fail( + [ + "--git-dir={}".format(self.repo.controldir()), + "tag", + "-v", + "tryme" + ], + env=os.environ.copy(), + ) + + def test_verify(self): + # Test that CGit signatures can be verified by dulwich + c1, c2, c3 = build_commit_graph( + self.repo.object_store, [[1], [2, 1], [3, 1, 2]] + ) + self.repo.refs[b"HEAD"] = c3.id + self.import_default_key() + + run_git_or_fail( + [ + "--git-dir={}".format(self.repo.controldir()), + "tag", + "-u", + PorcelainGpgTestCase.DEFAULT_KEY_ID, + "-m", + "foo", + "verifyme", + ], + env=os.environ.copy(), + ) + tag = self.repo[b"refs/tags/verifyme"] + self.assertNotEqual(tag.signature, None) + tag.verify() diff --git a/dulwich/tests/test_porcelain.py b/dulwich/tests/test_porcelain.py index a2901169..0a0e1882 100644 --- a/dulwich/tests/test_porcelain.py +++ b/dulwich/tests/test_porcelain.py @@ -1,2738 +1,2738 @@ # test_porcelain.py -- porcelain tests # Copyright (C) 2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for dulwich.porcelain.""" from io import BytesIO, StringIO import os import platform import re import shutil import stat import subprocess import sys import tarfile import tempfile import time from unittest import skipIf from dulwich import porcelain from dulwich.diff_tree import tree_changes from dulwich.errors import CommitError from dulwich.objects import ( Blob, Tag, Tree, ZERO_SHA, ) from dulwich.repo import ( NoIndexPresent, Repo, ) from dulwich.tests import ( TestCase, ) from dulwich.tests.utils import ( build_commit_graph, make_commit, make_object, ) def flat_walk_dir(dir_to_walk): for dirpath, _, filenames in os.walk(dir_to_walk): rel_dirpath = os.path.relpath(dirpath, dir_to_walk) if not dirpath == dir_to_walk: yield rel_dirpath for filename in filenames: if dirpath == dir_to_walk: yield filename else: yield os.path.join(rel_dirpath, filename) class PorcelainTestCase(TestCase): def setUp(self): super(PorcelainTestCase, self).setUp() self.test_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.test_dir) self.repo_path = os.path.join(self.test_dir, "repo") self.repo = Repo.init(self.repo_path, mkdir=True) self.addCleanup(self.repo.close) class PorcelainGpgTestCase(PorcelainTestCase): DEFAULT_KEY = """ -----BEGIN PGP PRIVATE KEY BLOCK----- lQVYBGBjIyIBDADAwydvMPQqeEiK54FG1DHwT5sQejAaJOb+PsOhVa4fLcKsrO3F g5CxO+/9BHCXAr8xQAtp/gOhDN05fyK3MFyGlL9s+Cd8xf34S3R4rN/qbF0oZmaa FW0MuGnniq54HINs8KshadVn1Dhi/GYSJ588qNFRl/qxFTYAk+zaGsgX/QgFfy0f djWXJLypZXu9D6DlyJ0cPSzUlfBkI2Ytx6grzIquRjY0FbkjK3l+iGsQ+ebRMdcP Sqd5iTN9XuzIUVoBFAZBRjibKV3N2wxlnCbfLlzCyDp7rktzSThzjJ2pVDuLrMAx 6/L9hIhwmFwdtY4FBFGvMR0b0Ugh3kCsRWr8sgj9I7dUoLHid6ObYhJFhnD3GzRc U+xX1uy3iTCqJDsG334aQIhC5Giuxln4SUZna2MNbq65ksh38N1aM/t3+Dc/TKVB rb5KWicRPCQ4DIQkHMDCSPyj+dvRLCPzIaPvHD7IrCfHYHOWuvvPGCpwjo0As3iP IecoMeguPLVaqgcAEQEAAQAL/i5/pQaUd4G7LDydpbixPS6r9UrfPrU/y5zvBP/p DCynPDutJ1oq539pZvXQ2VwEJJy7x0UVKkjyMndJLNWly9wHC7o8jkHx/NalVP47 LXR+GWbCdOOcYYbdAWcCNB3zOtzPnWhdAEagkc2G9xRQDIB0dLHLCIUpCbLP/CWM qlHnDsVMrVTWjgzcpsnyGgw8NeLYJtYGB8dsN+XgCCjo7a9LEvUBKNgdmWBbf14/ iBw7PCugazFcH9QYfZwzhsi3nqRRagTXHbxFRG0LD9Ro9qCEutHYGP2PJ59Nj8+M zaVkJj/OxWxVOGvn2q16mQBCjKpbWfqXZVVl+G5DGOmiSTZqXy+3j6JCKdOMy6Qd JBHOHhFZXYmWYaaPzoc33T/C3QhMfY5sOtUDLJmV05Wi4dyBeNBEslYgUuTk/jXb 5ZAie25eDdrsoqkcnSs2ZguMF7AXhe6il2zVhUUMs/6UZgd6I7I4Is0HXT/pnxEp uiTRFu4v8E+u+5a8O3pffe5boQYA3TsIxceen20qY+kRaTOkURHMZLn/y6KLW8bZ rNJyXWS9hBAcbbSGhfOwYfzbDCM17yPQO3E2zo8lcGdRklUdIIaCxQwtu36N5dfx OLCCQc5LmYdl/EAm91iAhrr7dNntZ18MU09gdzUu+ONZwu4CP3cJT83+qYZULso8 4Fvd/X8IEfGZ7kM+ylrdqBwtlrn8yYXtom+ows2M2UuNR53B+BUOd73kVLTkTCjE JH63+nE8BqG7tDLCMws+23SAA3xxBgDfDrr0x7zCozQKVQEqBzQr9Uoo/c/ZjAfi syzNSrDz+g5gqJYtuL9XpPJVWf6V1GXVyJlSbxR9CjTkBxmlPxpvV25IsbVSsh0o aqkf2eWpbCL6Qb2E0jd1rvf8sGeTTohzYfiSVVsC2t9ngRO/CmetizwQBvRzLGMZ 4mtAPiy7ZEDc2dFrPp7zlKISYmJZUx/DJVuZWuOrVMpBP+bSgJXoMTlICxZUqUnE 2VKVStb/L+Tl8XCwIWdrZb9BaDnHqfcGAM2B4HNPxP88Yj1tEDly/vqeb3vVMhj+ S1lunnLdgxp46YyuTMYAzj88eCGurRtzBsdxxlGAsioEnZGebEqAHQbieKq/DO6I MOMZHMSVBDqyyIx3assGlxSX8BSFW0lhKyT7i0XqnAgCJ9f/5oq0SbFGq+01VQb7 jIx9PbcYJORxsE0JG/CXXPv27bRtQXsudkWGSYvC0NLOgk4z8+kQpQtyFh16lujq WRwMeriu0qNDjCa1/eHIKDovhAZ3GyO5/9m1tBlUZXN0IFVzZXIgPHRlc3RAdGVz dC5jb20+iQHUBBMBCAA+FiEEjrR8MQ4fJK44PYMvfN2AClLmXiYFAmBjIyICGwMF CQPCZwAFCwkIBwIGFQoJCAsCBBYCAwECHgECF4AACgkQfN2AClLmXiZeGQwAoma6 2OJuX+OROtZR3eK6laY39FS2a8RgA6MTwU0htM4keSWBbDrQD05vUx1D/paD6XEu S2OUo8pGsarP6TE3S3yRT4ImHpnt52TiOemMErGCHACmmyDCOkvGV2Sg/pb0zINN sBMHMvDYBSZ2Xcvy5LGXbo5C/lja0Jjg5PsCWWuhrAVaNqJ8IqxhiHIy1F2H5RXj c++pjl2GyBIDR8IdQlG0EGNNpUgnL1zvUkr5Tbk/H8KJh+PgcBlgip9ocdADcSKI ITvxjingp16LGgo2jPpCqyfjp43n71FRJTJbuTqOZzGL9c5DwYoCt1BgX379ZLYx luzeGKu3Vz+L8fpM5fiTg35lXSpzw2mJdhVrBSt54oF+kjs0pON93OOW0TF3z8Oi 1FmJ6bMUHFrxp63/sTnryGCuYFgbWpb0QPP9i9TQvn3aajlLll19JkgoIh750JGh QH4JZixX9k32jzr38kzy9RA5FBqhz2egp7Z22uiIhmeR/2zhpFpAdX1uroF9nQVY BGBjIyIBDADghIo9wXnRxzfdDTvwnP8dHpLAIaPokgdpyLswqUCixJWiW2xcV6we UjEWwH6neN/t1uZYVehbrotxVPla+MPvzhxp6/cmG+2lhzEBOp6zRwnL1wIB6HoK JfpREhyMc8rLR0zMso1L1bJTyydvnu07a7BWo3VWKjilb0rEZZUSD/2hidx5HxMO JSoidLWed/PPuv6yht3NtA4UThlcfldm9G6PbqCdm1kMEKAkq0wVJvhPJ6gEFRNJ imgygfUwMDFXEIhQtxjgdV5Uoz3O5452VLoRsDlgpi3E0WDGj7WXDaO5uSU0T5aJ gVgHCP/fxZhHuQFk2YYIl5nCBpOZyWWI0IKmscTuEwzpkhICQDQFvcMZ5ibsl7wA 2P7YTrQfFDMjjzuaK80GYPfxDFlyKUyLqFt8w/QzsZLDLX7+jxIEpbRAaMw/JsWq m5BMxxbS3CIQiS5S3oSKDsNINelqWFfwvLhvlQra8gIxyNTlek25OdgG66BiiX+s eH8A/ql+F+MAEQEAAQAL/1jrNSLjMt9pwo6qFKClVQZP2vf7+sH7v7LeHIDXr3En YUnVYnOqB1FU5PspTp/+J9W25DB9CZLx7Gj8qeslFdiuLSOoIBB4RCToB3kAoeTH 0DHqW/GshFTrmJkuDp9zpo/ek6SIXJx5rHAyR9KVw0fizQprH2f6PcgLbTWeM61d Juqowmg37eCOyIKv7VQvFqEhYokLD+JNmrvg+Htg0DXGvdjRjAwPf/NezEXpj67a 6cHTp1/Chwp7pevG+3fTxaCJFesl5/TxxtnaBLE8m2uo/S6Hxgn9l0edonroe1Ql TjEqGLy27qi2z5Rem+v6GWNDRgvAWur13v8FNdyduHlioG/NgRsU9mE2MYeFsfi3 cfNpJQp/wC9PSCIXrb/45mkS8KyjZpCrIPB9RV/m0MREq01TPom7rstZc4A1pD0O t7AtUYS3e95zLyEmeLziPJ9fV4fgPmEudDr1uItnmV0LOskKlpg5sc0hhdrwYoob fkKt2dx6DqfMlcM1ZkUbLQYA4jwfpFJG4HmYvjL2xCJxM0ycjvMbqFN+4UjgYWVl RfOrm1V4Op86FjbRbV6OOCNhznotAg7mul4xtzrrTkK8o3YLBeJseDgl4AWuzXtN a9hE0XpK9gJoEHUuBOOsamVh2HpXESFyE5CclOV7JSh541TlZKfnqfZYCg4JSbp0 UijkawCL5bJJUiGGMD9rZUxIAKQO1DvUEzptS7Jl6S3y5sbIIhilp4KfYWbSk3PP u9CnZD5bLhEQp0elxnb/IL8PBgD+DpTeC8unkGKXUpbe9x0ISI6V1D6FmJq/FxNg 7fMa3QChfGiAyoTm80ZETynj+blRaDO3gY4lTLa3Opubof1EqK2QmwXmpyvXEZNY cQfQ2CCSGOWUCK8jEQamUPf1PWndZXJUmROI1WukhlL71V/ir6zQeVCv1wcwPwcl JPnAe87upEklnCYpvsEldwHUX9u0BWzoULIEsi+ddtHmT0KTeF/DHRy0W15jIHbj Fqhqckj1/6fmr7l7kIi/kN4vWe0F/0Q8IXX+cVMgbl3aIuaGcvENLGcoAsAtPGx8 8SfRgmfuHK64Y7hx1m+Bo215rxJzZRjqHTBPp0BmCi+JKkaavIBrYRbsx20gveI4 dzhLcUhBkiT4Q7oz0/VbGHS1CEf9KFeS/YOGj57s4yHauSVI0XdP9kBRTWmXvBkz sooB2cKHhwhUN7iiT1k717CiTNUT6Q/pcPFCyNuMoBBGQTU206JEgIjQvI3f8xMU MGmGVVQz9/k716ycnhb2JZ/Q/AyQIeHJiQG8BBgBCAAmFiEEjrR8MQ4fJK44PYMv fN2AClLmXiYFAmBjIyICGwwFCQPCZwAACgkQfN2AClLmXibetAwAi7KnMpFR2DOu JKMa+PyCLpaXFVp/Y3uzGXSmDZJ9PFJ8CzQlY4S61Zkfesq8woTmvk58SSxSgBAp UixUK0uFO/s0q5ibODgBXpUQIFW0uhrDpbA08pGunPo/E06Q+5kVocSh9raI1R16 7ke/FcFd5P7BNuXT1CJW70jcK3jh/L3SFZa+PewKwcgrNkQIg2411vek1VSQB+DP URb/OCqD7gFkj1/BaQgMxO1tZUx9tIt/YuwqnxIOOxjnD13aRinZ2bK1SEsG/dyx y19ZB0d6d7eTGdYNWIAClHbnzbsEm5QzcYsDBqGiRS6Je38Wc5qD+z0h/R1GJXjW d9QAenkb7v9v10yLZH0udW8PY5OQ5IjtcUMVppvAn5ZWsApw/eCFEEsvcNuYSnY2 FO+dmjq6Fc8XdqR12jaSaiaSFIdhkTN83HSdZ/luDBqP4mVDLhRnOkLnDZF1HDeR BcZYEcqkDeW64mdTo65ILOPQ+HMCK12AnnBsbyfbsWAUczkQ7GVq =YPjc -----END PGP PRIVATE KEY BLOCK----- """ DEFAULT_KEY_ID = "8EB47C310E1F24AE383D832F7CDD800A52E65E26" NON_DEFAULT_KEY = """ -----BEGIN PGP PRIVATE KEY BLOCK----- lQVYBGBjI0ABDADGWBRp+t02emfzUlhrc1psqIhhecFm6Em0Kv33cfDpnfoMF1tK Yy/4eLYIR7FmpdbFPcDThFNHbXJzBi00L1mp0XQE2l50h/2bDAAgREdZ+NVo5a7/ RSZjauNU1PxW6pnXMehEh1tyIQmV78jAukaakwaicrpIenMiFUN3fAKHnLuFffA6 t0f3LqJvTDhUw/o2vPgw5e6UDQhA1C+KTv1KXVrhJNo88a3hZqCZ76z3drKR411Q zYgT4DUb8lfnbN+z2wfqT9oM5cegh2k86/mxAA3BYOeQrhmQo/7uhezcgbxtdGZr YlbuaNDTSBrn10ZoaxLPo2dJe2zWxgD6MpvsGU1w3tcRW508qo/+xoWp2/pDzmok +uhOh1NAj9zB05VWBz1r7oBgCOIKpkD/LD4VKq59etsZ/UnrYDwKdXWZp7uhshkU M7N35lUJcR76a852dlMdrgpmY18+BP7+o7M+5ElHTiqQbMuE1nHTg8RgVpdV+tUx dg6GWY/XHf5asm8AEQEAAQAL/A85epOp+GnymmEQfI3+5D178D//Lwu9n86vECB6 xAHCqQtdjZnXpDp/1YUsL59P8nzgYRk7SoMskQDoQ/cB/XFuDOhEdMSgHaTVlnrj ktCCq6rqGnUosyolbb64vIfVaSqd/5SnCStpAsnaBoBYrAu4ZmV4xfjDQWwn0q5s u+r56mD0SkjPgbwk/b3qTVagVmf2OFzUgWwm1e/X+bA1oPag1NV8VS4hZPXswT4f qhiyqUFOgP6vUBcqehkjkIDIl/54xII7/P5tp3LIZawvIXqHKNTqYPCqaCqCj+SL vMYDIb6acjescfZoM71eAeHAANeFZzr/rwfBT+dEP6qKmPXNcvgE11X44ZCr04nT zOV/uDUifEvKT5qgtyJpSFEVr7EXubJPKoNNhoYqq9z1pYU7IedX5BloiVXKOKTY 0pk7JkLqf3g5fYtXh/wol1owemITJy5V5PgaqZvk491LkI6S+kWC7ANYUg+TDPIW afxW3E5N1CYV6XDAl0ZihbLcoQYAy0Ky/p/wayWKePyuPBLwx9O89GSONK2pQljZ yaAgxPQ5/i1vx6LIMg7k/722bXR9W3zOjWOin4eatPM3d2hkG96HFvnBqXSmXOPV 03Xqy1/B5Tj8E9naLKUHE/OBQEc363DgLLG9db5HfPlpAngeppYPdyWkhzXyzkgS PylaE5eW3zkdjEbYJ6RBTecTZEgBaMvJNPdWbn//frpP7kGvyiCg5Es+WjLInUZ6 0sdifcNTCewzLXK80v/y5mVOdJhPBgD5zs9cYdyiQJayqAuOr+He1eMHMVUbm9as qBmPrst398eBW9ZYF7eBfTSlUf6B+WnvyLKEGsUf/7IK0EWDlzoBuWzWiHjUAY1g m9eTV2MnvCCCefqCErWwfFo2nWOasAZA9sKD+ICIBY4tbtvSl4yfLBzTMwSvs9ZS K1ocPSYUnhm2miSWZ8RLZPH7roHQasNHpyq/AX7DahFf2S/bJ+46ZGZ8Pigr7hA+ MjmpQ4qVdb5SaViPmZhAKO+PjuCHm+EF/2H0Y3Sl4eXgxZWoQVOUeXdWg9eMfYrj XDtUMIFppV/QxbeztZKvJdfk64vt/crvLsOp0hOky9cKwY89r4QaHfexU3qR+qDq UlMvR1rHk7dS5HZAtw0xKsFJNkuDxvBkMqv8Los8zp3nUl+U99dfZOArzNkW38wx FPa0ixkC9za2BkDrWEA8vTnxw0A2upIFegDUhwOByrSyfPPnG3tKGeqt3Izb/kDk Q9vmo+HgxBOguMIvlzbBfQZwtbd/gXzlvPqCtCJBbm90aGVyIFRlc3QgVXNlciA8 dGVzdDJAdGVzdC5jb20+iQHUBBMBCAA+FiEEapM5P1DF5qzT1vtFuTYhLttOFMAF AmBjI0ACGwMFCQPCZwAFCwkIBwIGFQoJCAsCBBYCAwECHgECF4AACgkQuTYhLttO FMBRlAwAwVQJbAhR39vlSKh2ksjZvM+dZhNEP0UVtE+5D0Ukx3OHPY+zqe6Orkf9 FgXY0h6byr6gudsEnBs4wZ7LgJDiBY/qQBtq93Fy/hZurvDTsMdv9qpSjDroCfTO O1Q40aqlucoaTjtIGwFNXRmd6Xi9IB+dGnFgM0l68MXhkSVnj0LfAK5UxdIQ/4tq MdE0pWn1x+ebdjpBHO6Q4XY+vXfSqO2rOg3uxL54GR9IqNeWUNqIMvNyBO0XkGq5 93bCi4s1dDr101RQsb6MQxYDdZ5tdChyXBQnx5nMWaUALm0GRF8FoFEB4oMoF5gD 2nqSCdnMNVkWich46xvL2h10EzOujvaob+c4FZc+n8gk5GnkuigMOqMJ1xY/QrC6 Ce//RHm2k0NoEPFQaRsHJIQxwZZwmHkzREDnfeEj8hSExM1anQirmIsMtI8knD/8 Vl9HzNfeLCDPtcC28a1vXjsJCF7j4LRInpSgDzovFdARYvCs6equsb3UYRA17O9W bVHhX54dnQVXBGBjI0ABDADJMBYIcG0Yil9YxFs7aYzNbd7alUAr89VbY8eIGPHP 3INFPM1wlBQCu+4j6xdEbhMpppLBZ9A5TEylP4C6qLtPa+oLtPeuSw8gHDE10XE4 lbgPs376rL60XdImSOHhiduACUefYjqpcmFH9Bim1CC+koArYrSQJQx1Jri+OpnT aL/8UID0KzD/kEgMVGlHIVj9oJmb4+j9pW8I/g0wDSnIaEKFMxqu6SIVJ1GWj+MU MvZigjLCsNCZd7PnbOC5VeU3SsXj6he74Jx0AmGMPWIHi9M0DjHO5d1cCbXTnud8 xxM1bOh47aCTnMK5cVyIr+adihgJpVVhrndSM8aklBPRgtozrGNCgF2CkYU2P1bl xfloNr/8UZpM83o+s1aObBszzRNLxnpNORqoLqjfPtLEPQnagxE+4EapCq0NZ/x6 yO5VTwwpNljdFAEk40uGuKyn1QA3uNMHy5DlpLl+tU7t1KEovdZ+OVYsYKZhVzw0 MTpKogk9JI7AN0q62ronPskAEQEAAQAL+O8BUSt1ZCVjPSIXIsrR+ZOSkszZwgJ1 CWIoh0IHYD2vmcMHGIhFYgBdgerpvhptKhaw7GcXDScEnYkyh5s4GE2hxclik1tb j/x1gYCN8BNoyeDdPFxQG73qN12D99QYEctpOsz9xPLIDwmL0j1ehAfhwqHIAPm9 Ca+i8JYMx/F+35S/jnKDXRI+NVlwbiEyXKXxxIqNlpy9i8sDBGexO5H5Sg0zSN/B 1duLekGDbiDw6gLc6bCgnS+0JOUpU07Z2fccMOY9ncjKGD2uIb/ePPUaek92GCQy q0eorCIVbrcQsRc5sSsNtnRKQTQtxioROeDg7kf2oWySeHTswlXW/219ihrSXgte HJd+rPm7DYLEeGLRny8bRKv8rQdAtApHaJE4dAATXeY4RYo4NlXHYaztGYtU6kiM /3zCfWAe9Nn+Wh9jMTZrjefUCagS5r6ZqAh7veNo/vgIGaCLh0a1Ypa0Yk9KFrn3 LYEM3zgk3m3bn+7qgy5cUYXoJ3DGJJEhBgDPonpW0WElqLs5ZMem1ha85SC38F0I kAaSuzuzv3eORiKWuyJGF32Q2XHa1RHQs1JtUKd8rxFer3b8Oq71zLz6JtVc9dmR udvgcJYX0PC11F6WGjZFSSp39dajFp0A5DKUs39F3w7J1yuDM56TDIN810ywufGA HARY1pZbUJAy/dTqjFnCbNjpAakor3hVzqxcmUG+7Y2X9c2AGncT1MqAQC3M8JZc uZvkK8A9cMk8B914ryYE7VsZMdMhyTwHmykGAPgNLLa3RDETeGeGCKWI+ZPOoU0i b5JtJZ1dP3tNwfZKuZBZXKW9gqYqyBa/qhMip84SP30pr/TvulcdAFC759HK8sQZ yJ6Vw24Pc+5ssRxrQUEw1rvJPWhmQCmCOZHBMQl5T6eaTOpR5u3aUKTMlxPKhK9e C1dCSTnI/nyL8An3VKnLy+K/LI42YGphBVLLJmBewuTVDIJviWRdntiG8dElyEJM OywUltk32CEmqgsD9tPO8rXZjnMrMn3gfsiaoQYA6/6/e2utkHr7gAoWBgrBBdqV Hsvqh5Ro2DjLAOpZItO/EdCJfDAmbTYOa04535sBDP2tcH/vipPOPpbr1Y9Y/mNs KCulNxedyqAmEkKOcerLUP5UHju0AB6VBjHJFdU2mqT+UjPyBk7WeKXgFomyoYMv 3KpNOFWRxi0Xji4kKHbttA6Hy3UcGPr9acyUAlDYeKmxbSUYIPhw32bbGrX9+F5Y riTufRsG3jftQVo9zqdcQSD/5pUTMn3EYbEcohYB2YWJAbwEGAEIACYWIQRqkzk/ UMXmrNPW+0W5NiEu204UwAUCYGMjQAIbDAUJA8JnAAAKCRC5NiEu204UwDICC/9o q0illSIAuBHCImbNcOAJmno6ZZ1OkqtQrEmmKjIxUEkMZDvEaAUuGwCyfn3RcaWQ m3HAv0HRtYiBebN9rgfMGEEp9prmTuAOxc4vWfMOoYgo2vLNfaKwLREHrm7NzHSo ovb+ZwWpm724DU6IMdaVpc5LzBPArG0nUcOTZ15Lc2akpbhFjxBHKKimkk0V1YwU lIyn7I5wHbJ5qz1YjaCjUYi6xLwHDxStIE2vR2dzHiVKNZBKfhRd7BIYfpBEvNGS RKR1moy3QUKw71Q1fE+TcbK6eFsbjROxq2OZSTy371zG9hLccroM0cZl8pBlnRpX sn3g7h5kZVzZ0VnOM3A8f29v0P9LE6r+p4oaWnBh9QuNq50hYPyA6CJNF73A+Shc AanKpb2pqswnk1CVhAzh+l7JhOR5RUVOMCv9mb3TwYQcE7qhMovHWhLmpFhlfO4a +AMn3f/774DKYGUigIzR45dhZFFkGvvb85uEP67GqgSv/zTISviuuc4A6Ze9ALs= =kOKh -----END PGP PRIVATE KEY BLOCK----- """ NON_DEFAULT_KEY_ID = "6A93393F50C5E6ACD3D6FB45B936212EDB4E14C0" def setUp(self): super(PorcelainGpgTestCase, self).setUp() self.gpg_dir = os.path.join(self.test_dir, "gpg") os.mkdir(self.gpg_dir, mode=0o700) self.addCleanup(shutil.rmtree, self.gpg_dir) self._old_gnupghome = os.environ.get("GNUPGHOME") os.environ["GNUPGHOME"] = self.gpg_dir if self._old_gnupghome is None: self.addCleanup(os.environ.__delitem__, "GNUPGHOME") else: self.addCleanup(os.environ.__setitem__, "GNUPGHOME", self._old_gnupghome) def import_default_key(self): subprocess.run( ["gpg", "--import"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, input=PorcelainGpgTestCase.DEFAULT_KEY, universal_newlines=True, ) def import_non_default_key(self): subprocess.run( ["gpg", "--import"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, input=PorcelainGpgTestCase.NON_DEFAULT_KEY, universal_newlines=True, ) class ArchiveTests(PorcelainTestCase): """Tests for the archive command.""" def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"refs/heads/master"] = c3.id out = BytesIO() err = BytesIO() porcelain.archive( self.repo.path, b"refs/heads/master", outstream=out, errstream=err ) self.assertEqual(b"", err.getvalue()) tf = tarfile.TarFile(fileobj=out) self.addCleanup(tf.close) self.assertEqual([], tf.getnames()) class UpdateServerInfoTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"refs/heads/foo"] = c3.id porcelain.update_server_info(self.repo.path) self.assertTrue( os.path.exists(os.path.join(self.repo.controldir(), "info", "refs")) ) class CommitTests(PorcelainTestCase): def test_custom_author(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"refs/heads/foo"] = c3.id sha = porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ", ) self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) def test_unicode(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"refs/heads/foo"] = c3.id sha = porcelain.commit( self.repo.path, message="Some message", author="Joe ", committer="Bob ", ) self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) def test_no_verify(self): if os.name != "posix": self.skipTest("shell hook tests requires POSIX shell") self.assertTrue(os.path.exists("/bin/sh")) hooks_dir = os.path.join(self.repo.controldir(), "hooks") os.makedirs(hooks_dir, exist_ok=True) self.addCleanup(shutil.rmtree, hooks_dir) c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) hook_fail = "#!/bin/sh\nexit 1" # hooks are executed in pre-commit, commit-msg order # test commit-msg failure first, then pre-commit failure, then # no_verify to skip both hooks commit_msg = os.path.join(hooks_dir, "commit-msg") with open(commit_msg, "w") as f: f.write(hook_fail) os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) with self.assertRaises(CommitError): porcelain.commit( self.repo.path, message="Some message", author="Joe ", committer="Bob ", ) pre_commit = os.path.join(hooks_dir, "pre-commit") with open(pre_commit, "w") as f: f.write(hook_fail) os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) with self.assertRaises(CommitError): porcelain.commit( self.repo.path, message="Some message", author="Joe ", committer="Bob ", ) sha = porcelain.commit( self.repo.path, message="Some message", author="Joe ", committer="Bob ", no_verify=True, ) self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) class CleanTests(PorcelainTestCase): def put_files(self, tracked, ignored, untracked, empty_dirs): """Put the described files in the wd""" all_files = tracked | ignored | untracked for file_path in all_files: abs_path = os.path.join(self.repo.path, file_path) # File may need to be written in a dir that doesn't exist yet, so # create the parent dir(s) as necessary parent_dir = os.path.dirname(abs_path) try: os.makedirs(parent_dir) except FileExistsError: pass with open(abs_path, "w") as f: f.write("") with open(os.path.join(self.repo.path, ".gitignore"), "w") as f: f.writelines(ignored) for dir_path in empty_dirs: os.mkdir(os.path.join(self.repo.path, "empty_dir")) files_to_add = [os.path.join(self.repo.path, t) for t in tracked] porcelain.add(repo=self.repo.path, paths=files_to_add) porcelain.commit(repo=self.repo.path, message="init commit") def assert_wd(self, expected_paths): """Assert paths of files and dirs in wd are same as expected_paths""" control_dir_rel = os.path.relpath(self.repo._controldir, self.repo.path) # normalize paths to simplify comparison across platforms found_paths = { os.path.normpath(p) for p in flat_walk_dir(self.repo.path) if not p.split(os.sep)[0] == control_dir_rel } norm_expected_paths = {os.path.normpath(p) for p in expected_paths} self.assertEqual(found_paths, norm_expected_paths) def test_from_root(self): self.put_files( tracked={"tracked_file", "tracked_dir/tracked_file", ".gitignore"}, ignored={"ignored_file"}, untracked={ "untracked_file", "tracked_dir/untracked_dir/untracked_file", "untracked_dir/untracked_dir/untracked_file", }, empty_dirs={"empty_dir"}, ) porcelain.clean(repo=self.repo.path, target_dir=self.repo.path) self.assert_wd( { "tracked_file", "tracked_dir/tracked_file", ".gitignore", "ignored_file", "tracked_dir", } ) def test_from_subdir(self): self.put_files( tracked={"tracked_file", "tracked_dir/tracked_file", ".gitignore"}, ignored={"ignored_file"}, untracked={ "untracked_file", "tracked_dir/untracked_dir/untracked_file", "untracked_dir/untracked_dir/untracked_file", }, empty_dirs={"empty_dir"}, ) porcelain.clean( repo=self.repo, target_dir=os.path.join(self.repo.path, "untracked_dir"), ) self.assert_wd( { "tracked_file", "tracked_dir/tracked_file", ".gitignore", "ignored_file", "untracked_file", "tracked_dir/untracked_dir/untracked_file", "empty_dir", "untracked_dir", "tracked_dir", "tracked_dir/untracked_dir", } ) class CloneTests(PorcelainTestCase): def test_simple_local(self): f1_1 = make_object(Blob, data=b"f1") commit_spec = [[1], [2, 1], [3, 1, 2]] trees = { 1: [(b"f1", f1_1), (b"f2", f1_1)], 2: [(b"f1", f1_1), (b"f2", f1_1)], 3: [(b"f1", f1_1), (b"f2", f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id self.repo.refs[b"refs/tags/foo"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) r = porcelain.clone( self.repo.path, target_path, checkout=False, errstream=errstream ) self.addCleanup(r.close) self.assertEqual(r.path, target_path) target_repo = Repo(target_path) self.assertEqual(0, len(target_repo.open_index())) self.assertEqual(c3.id, target_repo.refs[b"refs/tags/foo"]) self.assertTrue(b"f1" not in os.listdir(target_path)) self.assertTrue(b"f2" not in os.listdir(target_path)) c = r.get_config() encoded_path = self.repo.path if not isinstance(encoded_path, bytes): encoded_path = encoded_path.encode("utf-8") self.assertEqual(encoded_path, c.get((b"remote", b"origin"), b"url")) self.assertEqual( b"+refs/heads/*:refs/remotes/origin/*", c.get((b"remote", b"origin"), b"fetch"), ) def test_simple_local_with_checkout(self): f1_1 = make_object(Blob, data=b"f1") commit_spec = [[1], [2, 1], [3, 1, 2]] trees = { 1: [(b"f1", f1_1), (b"f2", f1_1)], 2: [(b"f1", f1_1), (b"f2", f1_1)], 3: [(b"f1", f1_1), (b"f2", f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) with porcelain.clone( self.repo.path, target_path, checkout=True, errstream=errstream ) as r: self.assertEqual(r.path, target_path) with Repo(target_path) as r: self.assertEqual(r.head(), c3.id) self.assertTrue("f1" in os.listdir(target_path)) self.assertTrue("f2" in os.listdir(target_path)) def test_bare_local_with_checkout(self): f1_1 = make_object(Blob, data=b"f1") commit_spec = [[1], [2, 1], [3, 1, 2]] trees = { 1: [(b"f1", f1_1), (b"f2", f1_1)], 2: [(b"f1", f1_1), (b"f2", f1_1)], 3: [(b"f1", f1_1), (b"f2", f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) with porcelain.clone( self.repo.path, target_path, bare=True, errstream=errstream ) as r: self.assertEqual(r.path, target_path) with Repo(target_path) as r: r.head() self.assertRaises(NoIndexPresent, r.open_index) self.assertFalse(b"f1" in os.listdir(target_path)) self.assertFalse(b"f2" in os.listdir(target_path)) def test_no_checkout_with_bare(self): f1_1 = make_object(Blob, data=b"f1") commit_spec = [[1]] trees = {1: [(b"f1", f1_1), (b"f2", f1_1)]} (c1,) = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c1.id self.repo.refs[b"HEAD"] = c1.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) self.assertRaises( porcelain.Error, porcelain.clone, self.repo.path, target_path, checkout=True, bare=True, errstream=errstream, ) def test_no_head_no_checkout(self): f1_1 = make_object(Blob, data=b"f1") commit_spec = [[1]] trees = {1: [(b"f1", f1_1), (b"f2", f1_1)]} (c1,) = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c1.id target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) errstream = BytesIO() r = porcelain.clone( self.repo.path, target_path, checkout=True, errstream=errstream ) r.close() def test_no_head_no_checkout_outstream_errstream_autofallback(self): f1_1 = make_object(Blob, data=b"f1") commit_spec = [[1]] trees = {1: [(b"f1", f1_1), (b"f2", f1_1)]} (c1,) = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c1.id target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) errstream = porcelain.NoneStream() r = porcelain.clone( self.repo.path, target_path, checkout=True, errstream=errstream ) r.close() def test_source_broken(self): target_path = tempfile.mkdtemp() self.assertRaises(Exception, porcelain.clone, "/nonexistant/repo", target_path) self.assertFalse(os.path.exists(target_path)) def test_fetch_symref(self): f1_1 = make_object(Blob, data=b"f1") trees = {1: [(b"f1", f1_1), (b"f2", f1_1)]} [c1] = build_commit_graph(self.repo.object_store, [[1]], trees) self.repo.refs.set_symbolic_ref(b"HEAD", b"refs/heads/else") self.repo.refs[b"refs/heads/else"] = c1.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) r = porcelain.clone( self.repo.path, target_path, checkout=False, errstream=errstream ) self.addCleanup(r.close) self.assertEqual(r.path, target_path) target_repo = Repo(target_path) self.assertEqual(0, len(target_repo.open_index())) self.assertEqual(c1.id, target_repo.refs[b"refs/heads/else"]) self.assertEqual(c1.id, target_repo.refs[b"HEAD"]) self.assertEqual({b"HEAD": b"refs/heads/else"}, target_repo.refs.get_symrefs()) class InitTests(TestCase): def test_non_bare(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) porcelain.init(repo_dir) def test_bare(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) porcelain.init(repo_dir, bare=True) class AddTests(PorcelainTestCase): def test_add_default_paths(self): # create a file for initial commit fullpath = os.path.join(self.repo.path, "blah") with open(fullpath, "w") as f: f.write("\n") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit( repo=self.repo.path, message=b"test", author=b"test ", committer=b"test ", ) # Add a second test file and a file in a directory with open(os.path.join(self.repo.path, "foo"), "w") as f: f.write("\n") os.mkdir(os.path.join(self.repo.path, "adir")) with open(os.path.join(self.repo.path, "adir", "afile"), "w") as f: f.write("\n") cwd = os.getcwd() try: os.chdir(self.repo.path) self.assertEqual(set(["foo", "blah", "adir", ".git"]), set(os.listdir("."))) self.assertEqual( (["foo", os.path.join("adir", "afile")], set()), porcelain.add(self.repo.path), ) finally: os.chdir(cwd) # Check that foo was added and nothing in .git was modified index = self.repo.open_index() self.assertEqual(sorted(index), [b"adir/afile", b"blah", b"foo"]) def test_add_default_paths_subdir(self): os.mkdir(os.path.join(self.repo.path, "foo")) with open(os.path.join(self.repo.path, "blah"), "w") as f: f.write("\n") with open(os.path.join(self.repo.path, "foo", "blie"), "w") as f: f.write("\n") cwd = os.getcwd() try: os.chdir(os.path.join(self.repo.path, "foo")) porcelain.add(repo=self.repo.path) porcelain.commit( repo=self.repo.path, message=b"test", author=b"test ", committer=b"test ", ) finally: os.chdir(cwd) index = self.repo.open_index() self.assertEqual(sorted(index), [b"foo/blie"]) def test_add_file(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) self.assertIn(b"foo", self.repo.open_index()) def test_add_ignored(self): with open(os.path.join(self.repo.path, ".gitignore"), "w") as f: f.write("foo\nsubdir/") with open(os.path.join(self.repo.path, "foo"), "w") as f: f.write("BAR") with open(os.path.join(self.repo.path, "bar"), "w") as f: f.write("BAR") os.mkdir(os.path.join(self.repo.path, "subdir")) with open(os.path.join(self.repo.path, "subdir", "baz"), "w") as f: f.write("BAZ") (added, ignored) = porcelain.add( self.repo.path, paths=[ os.path.join(self.repo.path, "foo"), os.path.join(self.repo.path, "bar"), os.path.join(self.repo.path, "subdir"), ], ) self.assertIn(b"bar", self.repo.open_index()) self.assertEqual(set(["bar"]), set(added)) self.assertEqual(set(["foo", os.path.join("subdir", "")]), ignored) def test_add_file_absolute_path(self): # Absolute paths are (not yet) supported with open(os.path.join(self.repo.path, "foo"), "w") as f: f.write("BAR") porcelain.add(self.repo, paths=[os.path.join(self.repo.path, "foo")]) self.assertIn(b"foo", self.repo.open_index()) def test_add_not_in_repo(self): with open(os.path.join(self.test_dir, "foo"), "w") as f: f.write("BAR") self.assertRaises( ValueError, porcelain.add, self.repo, paths=[os.path.join(self.test_dir, "foo")], ) self.assertRaises( (ValueError, FileNotFoundError), porcelain.add, self.repo, paths=["../foo"], ) self.assertEqual([], list(self.repo.open_index())) def test_add_file_clrf_conversion(self): # Set the right configuration to the repo c = self.repo.get_config() c.set("core", "autocrlf", "input") c.write_to_path() # Add a file with CRLF line-ending fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "wb") as f: f.write(b"line1\r\nline2") porcelain.add(self.repo.path, paths=[fullpath]) # The line-endings should have been converted to LF index = self.repo.open_index() self.assertIn(b"foo", index) entry = index[b"foo"] blob = self.repo[entry.sha] self.assertEqual(blob.data, b"line1\nline2") class RemoveTests(PorcelainTestCase): def test_remove_file(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) porcelain.commit( repo=self.repo, message=b"test", author=b"test ", committer=b"test ", ) self.assertTrue(os.path.exists(os.path.join(self.repo.path, "foo"))) cwd = os.getcwd() try: os.chdir(self.repo.path) porcelain.remove(self.repo.path, paths=["foo"]) finally: os.chdir(cwd) self.assertFalse(os.path.exists(os.path.join(self.repo.path, "foo"))) def test_remove_file_staged(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") cwd = os.getcwd() try: os.chdir(self.repo.path) porcelain.add(self.repo.path, paths=[fullpath]) self.assertRaises(Exception, porcelain.rm, self.repo.path, paths=["foo"]) finally: os.chdir(cwd) def test_remove_file_removed_on_disk(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) cwd = os.getcwd() try: os.chdir(self.repo.path) os.remove(fullpath) porcelain.remove(self.repo.path, paths=["foo"]) finally: os.chdir(cwd) self.assertFalse(os.path.exists(os.path.join(self.repo.path, "foo"))) class LogTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.log(self.repo.path, outstream=outstream) self.assertEqual(3, outstream.getvalue().count("-" * 50)) def test_max_entries(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.log(self.repo.path, outstream=outstream, max_entries=1) self.assertEqual(1, outstream.getvalue().count("-" * 50)) class ShowTests(PorcelainTestCase): def test_nolist(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.show(self.repo.path, objects=c3.id, outstream=outstream) self.assertTrue(outstream.getvalue().startswith("-" * 50)) def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.show(self.repo.path, objects=[c3.id], outstream=outstream) self.assertTrue(outstream.getvalue().startswith("-" * 50)) def test_blob(self): b = Blob.from_string(b"The Foo\n") self.repo.object_store.add_object(b) outstream = StringIO() porcelain.show(self.repo.path, objects=[b.id], outstream=outstream) self.assertEqual(outstream.getvalue(), "The Foo\n") def test_commit_no_parent(self): a = Blob.from_string(b"The Foo\n") ta = Tree() ta.add(b"somename", 0o100644, a.id) ca = make_commit(tree=ta.id) self.repo.object_store.add_objects([(a, None), (ta, None), (ca, None)]) outstream = StringIO() porcelain.show(self.repo.path, objects=[ca.id], outstream=outstream) self.assertMultiLineEqual( outstream.getvalue(), """\ -------------------------------------------------- commit: 344da06c1bb85901270b3e8875c988a027ec087d Author: Test Author Committer: Test Committer Date: Fri Jan 01 2010 00:00:00 +0000 Test message. diff --git a/somename b/somename new file mode 100644 index 0000000..ea5c7bf --- /dev/null +++ b/somename @@ -0,0 +1 @@ +The Foo """, ) def test_tag(self): a = Blob.from_string(b"The Foo\n") ta = Tree() ta.add(b"somename", 0o100644, a.id) ca = make_commit(tree=ta.id) self.repo.object_store.add_objects([(a, None), (ta, None), (ca, None)]) porcelain.tag_create( self.repo.path, b"tryme", b"foo ", b"bar", annotated=True, objectish=ca.id, tag_time=1552854211, tag_timezone=0, ) outstream = StringIO() porcelain.show(self.repo, objects=[b"refs/tags/tryme"], outstream=outstream) self.maxDiff = None self.assertMultiLineEqual( outstream.getvalue(), """\ Tagger: foo Date: Sun Mar 17 2019 20:23:31 +0000 bar -------------------------------------------------- commit: 344da06c1bb85901270b3e8875c988a027ec087d Author: Test Author Committer: Test Committer Date: Fri Jan 01 2010 00:00:00 +0000 Test message. diff --git a/somename b/somename new file mode 100644 index 0000000..ea5c7bf --- /dev/null +++ b/somename @@ -0,0 +1 @@ +The Foo """, ) def test_commit_with_change(self): a = Blob.from_string(b"The Foo\n") ta = Tree() ta.add(b"somename", 0o100644, a.id) ca = make_commit(tree=ta.id) b = Blob.from_string(b"The Bar\n") tb = Tree() tb.add(b"somename", 0o100644, b.id) cb = make_commit(tree=tb.id, parents=[ca.id]) self.repo.object_store.add_objects( [ (a, None), (b, None), (ta, None), (tb, None), (ca, None), (cb, None), ] ) outstream = StringIO() porcelain.show(self.repo.path, objects=[cb.id], outstream=outstream) self.assertMultiLineEqual( outstream.getvalue(), """\ -------------------------------------------------- commit: 2c6b6c9cb72c130956657e1fdae58e5b103744fa Author: Test Author Committer: Test Committer Date: Fri Jan 01 2010 00:00:00 +0000 Test message. diff --git a/somename b/somename index ea5c7bf..fd38bcb 100644 --- a/somename +++ b/somename @@ -1 +1 @@ -The Foo +The Bar """, ) class SymbolicRefTests(PorcelainTestCase): def test_set_wrong_symbolic_ref(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id self.assertRaises( porcelain.Error, porcelain.symbolic_ref, self.repo.path, b"foobar" ) def test_set_force_wrong_symbolic_ref(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id porcelain.symbolic_ref(self.repo.path, b"force_foobar", force=True) # test if we actually changed the file with self.repo.get_named_file("HEAD") as f: new_ref = f.read() self.assertEqual(new_ref, b"ref: refs/heads/force_foobar\n") def test_set_symbolic_ref(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id porcelain.symbolic_ref(self.repo.path, b"master") def test_set_symbolic_ref_other_than_master(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]], attrs=dict(refs="develop"), ) self.repo.refs[b"HEAD"] = c3.id self.repo.refs[b"refs/heads/develop"] = c3.id porcelain.symbolic_ref(self.repo.path, b"develop") # test if we actually changed the file with self.repo.get_named_file("HEAD") as f: new_ref = f.read() self.assertEqual(new_ref, b"ref: refs/heads/develop\n") class DiffTreeTests(PorcelainTestCase): def test_empty(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id outstream = BytesIO() porcelain.diff_tree(self.repo.path, c2.tree, c3.tree, outstream=outstream) self.assertEqual(outstream.getvalue(), b"") class CommitTreeTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) b = Blob() b.data = b"foo the bar" t = Tree() t.add(b"somename", 0o100644, b.id) self.repo.object_store.add_object(t) self.repo.object_store.add_object(b) sha = porcelain.commit_tree( self.repo.path, t.id, message=b"Withcommit.", author=b"Joe ", committer=b"Jane ", ) self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) class RevListTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) outstream = BytesIO() porcelain.rev_list(self.repo.path, [c3.id], outstream=outstream) self.assertEqual( c3.id + b"\n" + c2.id + b"\n" + c1.id + b"\n", outstream.getvalue() ) @skipIf(platform.python_implementation() == "PyPy" or sys.platform == "win32", "gpgme not easily available or supported on Windows and PyPy") class TagCreateSignTests(PorcelainGpgTestCase): def test_default_key(self): + import gpg + c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id cfg = self.repo.get_config() cfg.set(("user",), "signingKey", PorcelainGpgTestCase.DEFAULT_KEY_ID) self.import_default_key() porcelain.tag_create( self.repo.path, b"tryme", b"foo ", b"bar", annotated=True, sign=True, ) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) tag = self.repo[b"refs/tags/tryme"] self.assertTrue(isinstance(tag, Tag)) self.assertEqual(b"foo ", tag.tagger) self.assertEqual(b"bar\n", tag.message) self.assertLess(time.time() - tag.tag_time, 5) - # GPG Signatures aren't deterministic, so we can't do a static assertion. - # Instead we need to check the signature can be verified by git tag = self.repo[b'refs/tags/tryme'] - # TODO(jelmer): Remove calls to C git, and call tag.verify() instead - - # perhaps moving git call to compat testsuite? - subprocess.run( - ["git", "--git-dir={}".format(self.repo.controldir()), "tag", "-v", "tryme"], - check=True, - stderr=subprocess.DEVNULL, - stdout=subprocess.DEVNULL, + # GPG Signatures aren't deterministic, so we can't do a static assertion. + tag.verify() + tag.verify(keyids=[PorcelainGpgTestCase.DEFAULT_KEY_ID]) + + self.import_non_default_key() + self.assertRaises( + gpg.errors.MissingSignatures, + tag.verify, + keyids=[PorcelainGpgTestCase.NON_DEFAULT_KEY_ID], + ) + + tag._chunked_text = [b"bad data", tag._signature] + self.assertRaises( + gpg.errors.BadSignatures, + tag.verify, ) def test_non_default_key(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id cfg = self.repo.get_config() cfg.set(("user",), "signingKey", PorcelainGpgTestCase.DEFAULT_KEY_ID) self.import_non_default_key() porcelain.tag_create( self.repo.path, b"tryme", b"foo ", b"bar", annotated=True, sign=PorcelainGpgTestCase.NON_DEFAULT_KEY_ID, ) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) tag = self.repo[b"refs/tags/tryme"] self.assertTrue(isinstance(tag, Tag)) self.assertEqual(b"foo ", tag.tagger) self.assertEqual(b"bar\n", tag.message) self.assertLess(time.time() - tag.tag_time, 5) tag = self.repo[b'refs/tags/tryme'] # GPG Signatures aren't deterministic, so we can't do a static assertion. - # Instead we need to check the signature can be verified by git - # TODO(jelmer): Remove calls to C git, and call tag.verify() instead - - # perhaps moving git call to compat testsuite? - subprocess.run( - ["git", "--git-dir={}".format(self.repo.controldir()), "tag", "-v", "tryme"], - check=True, - stderr=subprocess.DEVNULL, - stdout=subprocess.DEVNULL, - ) + tag.verify() class TagCreateTests(PorcelainTestCase): def test_annotated(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create( self.repo.path, b"tryme", b"foo ", b"bar", annotated=True, ) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) tag = self.repo[b"refs/tags/tryme"] self.assertTrue(isinstance(tag, Tag)) self.assertEqual(b"foo ", tag.tagger) self.assertEqual(b"bar\n", tag.message) self.assertLess(time.time() - tag.tag_time, 5) def test_unannotated(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create(self.repo.path, b"tryme", annotated=False) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) self.repo[b"refs/tags/tryme"] self.assertEqual(list(tags.values()), [self.repo.head()]) def test_unannotated_unicode(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]] ) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create(self.repo.path, "tryme", annotated=False) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) self.repo[b"refs/tags/tryme"] self.assertEqual(list(tags.values()), [self.repo.head()]) class TagListTests(PorcelainTestCase): def test_empty(self): tags = porcelain.tag_list(self.repo.path) self.assertEqual([], tags) def test_simple(self): self.repo.refs[b"refs/tags/foo"] = b"aa" * 20 self.repo.refs[b"refs/tags/bar/bla"] = b"bb" * 20 tags = porcelain.tag_list(self.repo.path) self.assertEqual([b"bar/bla", b"foo"], tags) class TagDeleteTests(PorcelainTestCase): def test_simple(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.tag_create(self.repo, b"foo") self.assertTrue(b"foo" in porcelain.tag_list(self.repo)) porcelain.tag_delete(self.repo, b"foo") self.assertFalse(b"foo" in porcelain.tag_list(self.repo)) class ResetTests(PorcelainTestCase): def test_hard_head(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) porcelain.commit( self.repo.path, message=b"Some message", committer=b"Jane ", author=b"John ", ) with open(os.path.join(self.repo.path, "foo"), "wb") as f: f.write(b"OOH") porcelain.reset(self.repo, "hard", b"HEAD") index = self.repo.open_index() changes = list( tree_changes( self.repo, index.commit(self.repo.object_store), self.repo[b"HEAD"].tree, ) ) self.assertEqual([], changes) def test_hard_commit(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) sha = porcelain.commit( self.repo.path, message=b"Some message", committer=b"Jane ", author=b"John ", ) with open(fullpath, "wb") as f: f.write(b"BAZ") porcelain.add(self.repo.path, paths=[fullpath]) porcelain.commit( self.repo.path, message=b"Some other message", committer=b"Jane ", author=b"John ", ) porcelain.reset(self.repo, "hard", sha) index = self.repo.open_index() changes = list( tree_changes( self.repo, index.commit(self.repo.object_store), self.repo[sha].tree, ) ) self.assertEqual([], changes) class PushTests(PorcelainTestCase): def test_simple(self): """ Basic test of porcelain push where self.repo is the remote. First clone the remote, commit a file to the clone, then push the changes back to the remote. """ outstream = BytesIO() errstream = BytesIO() porcelain.commit( repo=self.repo.path, message=b"init", author=b"author ", committer=b"committer ", ) # Setup target repo cloned from temp test repo clone_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, clone_path) target_repo = porcelain.clone( self.repo.path, target=clone_path, errstream=errstream ) try: self.assertEqual(target_repo[b"HEAD"], self.repo[b"HEAD"]) finally: target_repo.close() # create a second file to be pushed back to origin handle, fullpath = tempfile.mkstemp(dir=clone_path) os.close(handle) porcelain.add(repo=clone_path, paths=[fullpath]) porcelain.commit( repo=clone_path, message=b"push", author=b"author ", committer=b"committer ", ) # Setup a non-checked out branch in the remote refs_path = b"refs/heads/foo" new_id = self.repo[b"HEAD"].id self.assertNotEqual(new_id, ZERO_SHA) self.repo.refs[refs_path] = new_id # Push to the remote porcelain.push( clone_path, "origin", b"HEAD:" + refs_path, outstream=outstream, errstream=errstream, ) self.assertEqual( target_repo.refs[b"refs/remotes/origin/foo"], target_repo.refs[b"HEAD"], ) # Check that the target and source with Repo(clone_path) as r_clone: self.assertEqual( { b"HEAD": new_id, b"refs/heads/foo": r_clone[b"HEAD"].id, b"refs/heads/master": new_id, }, self.repo.get_refs(), ) self.assertEqual(r_clone[b"HEAD"].id, self.repo[refs_path].id) # Get the change in the target repo corresponding to the add # this will be in the foo branch. change = list( tree_changes( self.repo, self.repo[b"HEAD"].tree, self.repo[b"refs/heads/foo"].tree, ) )[0] self.assertEqual( os.path.basename(fullpath), change.new.path.decode("ascii") ) def test_local_missing(self): """Pushing a new branch.""" outstream = BytesIO() errstream = BytesIO() # Setup target repo cloned from temp test repo clone_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, clone_path) target_repo = porcelain.init(clone_path) target_repo.close() self.assertRaises( porcelain.Error, porcelain.push, self.repo, clone_path, b"HEAD:refs/heads/master", outstream=outstream, errstream=errstream, ) def test_new(self): """Pushing a new branch.""" outstream = BytesIO() errstream = BytesIO() # Setup target repo cloned from temp test repo clone_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, clone_path) target_repo = porcelain.init(clone_path) target_repo.close() # create a second file to be pushed back to origin handle, fullpath = tempfile.mkstemp(dir=clone_path) os.close(handle) porcelain.add(repo=clone_path, paths=[fullpath]) new_id = porcelain.commit( repo=self.repo, message=b"push", author=b"author ", committer=b"committer ", ) # Push to the remote porcelain.push( self.repo, clone_path, b"HEAD:refs/heads/master", outstream=outstream, errstream=errstream, ) with Repo(clone_path) as r_clone: self.assertEqual( { b"HEAD": new_id, b"refs/heads/master": new_id, }, r_clone.get_refs(), ) def test_delete(self): """Basic test of porcelain push, removing a branch.""" outstream = BytesIO() errstream = BytesIO() porcelain.commit( repo=self.repo.path, message=b"init", author=b"author ", committer=b"committer ", ) # Setup target repo cloned from temp test repo clone_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, clone_path) target_repo = porcelain.clone( self.repo.path, target=clone_path, errstream=errstream ) target_repo.close() # Setup a non-checked out branch in the remote refs_path = b"refs/heads/foo" new_id = self.repo[b"HEAD"].id self.assertNotEqual(new_id, ZERO_SHA) self.repo.refs[refs_path] = new_id # Push to the remote porcelain.push( clone_path, self.repo.path, b":" + refs_path, outstream=outstream, errstream=errstream, ) self.assertEqual( { b"HEAD": new_id, b"refs/heads/master": new_id, }, self.repo.get_refs(), ) def test_diverged(self): outstream = BytesIO() errstream = BytesIO() porcelain.commit( repo=self.repo.path, message=b"init", author=b"author ", committer=b"committer ", ) # Setup target repo cloned from temp test repo clone_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, clone_path) target_repo = porcelain.clone( self.repo.path, target=clone_path, errstream=errstream ) target_repo.close() remote_id = porcelain.commit( repo=self.repo.path, message=b"remote change", author=b"author ", committer=b"committer ", ) local_id = porcelain.commit( repo=clone_path, message=b"local change", author=b"author ", committer=b"committer ", ) outstream = BytesIO() errstream = BytesIO() # Push to the remote self.assertRaises( porcelain.DivergedBranches, porcelain.push, clone_path, self.repo.path, b"refs/heads/master", outstream=outstream, errstream=errstream, ) self.assertEqual( { b"HEAD": remote_id, b"refs/heads/master": remote_id, }, self.repo.get_refs(), ) self.assertEqual(b"", outstream.getvalue()) self.assertEqual(b"", errstream.getvalue()) outstream = BytesIO() errstream = BytesIO() # Push to the remote with --force porcelain.push( clone_path, self.repo.path, b"refs/heads/master", outstream=outstream, errstream=errstream, force=True, ) self.assertEqual( { b"HEAD": local_id, b"refs/heads/master": local_id, }, self.repo.get_refs(), ) self.assertEqual(b"", outstream.getvalue()) self.assertTrue(re.match(b"Push to .* successful.\n", errstream.getvalue())) class PullTests(PorcelainTestCase): def setUp(self): super(PullTests, self).setUp() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit( repo=self.repo.path, message=b"test", author=b"test ", committer=b"test ", ) # Setup target repo self.target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.target_path) target_repo = porcelain.clone( self.repo.path, target=self.target_path, errstream=BytesIO() ) target_repo.close() # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit( repo=self.repo.path, message=b"test2", author=b"test2 ", committer=b"test2 ", ) self.assertIn(b"refs/heads/master", self.repo.refs) self.assertIn(b"refs/heads/master", target_repo.refs) def test_simple(self): outstream = BytesIO() errstream = BytesIO() # Pull changes into the cloned repo porcelain.pull( self.target_path, self.repo.path, b"refs/heads/master", outstream=outstream, errstream=errstream, ) # Check the target repo for pushed changes with Repo(self.target_path) as r: self.assertEqual(r[b"HEAD"].id, self.repo[b"HEAD"].id) def test_diverged(self): outstream = BytesIO() errstream = BytesIO() c3a = porcelain.commit( repo=self.target_path, message=b"test3a", author=b"test2 ", committer=b"test2 ", ) porcelain.commit( repo=self.repo.path, message=b"test3b", author=b"test2 ", committer=b"test2 ", ) # Pull changes into the cloned repo self.assertRaises( porcelain.DivergedBranches, porcelain.pull, self.target_path, self.repo.path, b"refs/heads/master", outstream=outstream, errstream=errstream, ) # Check the target repo for pushed changes with Repo(self.target_path) as r: self.assertEqual(r[b"refs/heads/master"].id, c3a) self.assertRaises( NotImplementedError, porcelain.pull, self.target_path, self.repo.path, b"refs/heads/master", outstream=outstream, errstream=errstream, fast_forward=False, ) # Check the target repo for pushed changes with Repo(self.target_path) as r: self.assertEqual(r[b"refs/heads/master"].id, c3a) def test_no_refspec(self): outstream = BytesIO() errstream = BytesIO() # Pull changes into the cloned repo porcelain.pull( self.target_path, self.repo.path, outstream=outstream, errstream=errstream, ) # Check the target repo for pushed changes with Repo(self.target_path) as r: self.assertEqual(r[b"HEAD"].id, self.repo[b"HEAD"].id) def test_no_remote_location(self): outstream = BytesIO() errstream = BytesIO() # Pull changes into the cloned repo porcelain.pull( self.target_path, refspecs=b"refs/heads/master", outstream=outstream, errstream=errstream, ) # Check the target repo for pushed changes with Repo(self.target_path) as r: self.assertEqual(r[b"HEAD"].id, self.repo[b"HEAD"].id) class StatusTests(PorcelainTestCase): def test_empty(self): results = porcelain.status(self.repo) self.assertEqual({"add": [], "delete": [], "modify": []}, results.staged) self.assertEqual([], results.unstaged) def test_status_base(self): """Integration test for `status` functionality.""" # Commit a dummy file then modify it fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("origstuff") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) # modify access and modify time of path os.utime(fullpath, (0, 0)) with open(fullpath, "wb") as f: f.write(b"stuff") # Make a dummy file and stage it filename_add = "bar" fullpath = os.path.join(self.repo.path, filename_add) with open(fullpath, "w") as f: f.write("stuff") porcelain.add(repo=self.repo.path, paths=fullpath) results = porcelain.status(self.repo) self.assertEqual(results.staged["add"][0], filename_add.encode("ascii")) self.assertEqual(results.unstaged, [b"foo"]) def test_status_all(self): del_path = os.path.join(self.repo.path, "foo") mod_path = os.path.join(self.repo.path, "bar") add_path = os.path.join(self.repo.path, "baz") us_path = os.path.join(self.repo.path, "blye") ut_path = os.path.join(self.repo.path, "blyat") with open(del_path, "w") as f: f.write("origstuff") with open(mod_path, "w") as f: f.write("origstuff") with open(us_path, "w") as f: f.write("origstuff") porcelain.add(repo=self.repo.path, paths=[del_path, mod_path, us_path]) porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) porcelain.remove(self.repo.path, [del_path]) with open(add_path, "w") as f: f.write("origstuff") with open(mod_path, "w") as f: f.write("more_origstuff") with open(us_path, "w") as f: f.write("more_origstuff") porcelain.add(repo=self.repo.path, paths=[add_path, mod_path]) with open(us_path, "w") as f: f.write("\norigstuff") with open(ut_path, "w") as f: f.write("origstuff") results = porcelain.status(self.repo.path) self.assertDictEqual( {"add": [b"baz"], "delete": [b"foo"], "modify": [b"bar"]}, results.staged, ) self.assertListEqual(results.unstaged, [b"blye"]) self.assertListEqual(results.untracked, ["blyat"]) def test_status_crlf_mismatch(self): # First make a commit as if the file has been added on a Linux system # or with core.autocrlf=True file_path = os.path.join(self.repo.path, "crlf") with open(file_path, "wb") as f: f.write(b"line1\nline2") porcelain.add(repo=self.repo.path, paths=[file_path]) porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) # Then update the file as if it was created by CGit on a Windows # system with core.autocrlf=true with open(file_path, "wb") as f: f.write(b"line1\r\nline2") results = porcelain.status(self.repo) self.assertDictEqual({"add": [], "delete": [], "modify": []}, results.staged) self.assertListEqual(results.unstaged, [b"crlf"]) self.assertListEqual(results.untracked, []) def test_status_autocrlf_true(self): # First make a commit as if the file has been added on a Linux system # or with core.autocrlf=True file_path = os.path.join(self.repo.path, "crlf") with open(file_path, "wb") as f: f.write(b"line1\nline2") porcelain.add(repo=self.repo.path, paths=[file_path]) porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) # Then update the file as if it was created by CGit on a Windows # system with core.autocrlf=true with open(file_path, "wb") as f: f.write(b"line1\r\nline2") # TODO: It should be set automatically by looking at the configuration c = self.repo.get_config() c.set("core", "autocrlf", True) c.write_to_path() results = porcelain.status(self.repo) self.assertDictEqual({"add": [], "delete": [], "modify": []}, results.staged) self.assertListEqual(results.unstaged, []) self.assertListEqual(results.untracked, []) def test_status_autocrlf_input(self): # Commit existing file with CRLF file_path = os.path.join(self.repo.path, "crlf-exists") with open(file_path, "wb") as f: f.write(b"line1\r\nline2") porcelain.add(repo=self.repo.path, paths=[file_path]) porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) c = self.repo.get_config() c.set("core", "autocrlf", "input") c.write_to_path() # Add new (untracked) file file_path = os.path.join(self.repo.path, "crlf-new") with open(file_path, "wb") as f: f.write(b"line1\r\nline2") porcelain.add(repo=self.repo.path, paths=[file_path]) results = porcelain.status(self.repo) self.assertDictEqual({"add": [b"crlf-new"], "delete": [], "modify": []}, results.staged) self.assertListEqual(results.unstaged, []) self.assertListEqual(results.untracked, []) def test_get_tree_changes_add(self): """Unit test for get_tree_changes add.""" # Make a dummy file, stage filename = "bar" fullpath = os.path.join(self.repo.path, filename) with open(fullpath, "w") as f: f.write("stuff") porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) filename = "foo" fullpath = os.path.join(self.repo.path, filename) with open(fullpath, "w") as f: f.write("stuff") porcelain.add(repo=self.repo.path, paths=fullpath) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes["add"][0], filename.encode("ascii")) self.assertEqual(len(changes["add"]), 1) self.assertEqual(len(changes["modify"]), 0) self.assertEqual(len(changes["delete"]), 0) def test_get_tree_changes_modify(self): """Unit test for get_tree_changes modify.""" # Make a dummy file, stage, commit, modify filename = "foo" fullpath = os.path.join(self.repo.path, filename) with open(fullpath, "w") as f: f.write("stuff") porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) with open(fullpath, "w") as f: f.write("otherstuff") porcelain.add(repo=self.repo.path, paths=fullpath) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes["modify"][0], filename.encode("ascii")) self.assertEqual(len(changes["add"]), 0) self.assertEqual(len(changes["modify"]), 1) self.assertEqual(len(changes["delete"]), 0) def test_get_tree_changes_delete(self): """Unit test for get_tree_changes delete.""" # Make a dummy file, stage, commit, remove filename = "foo" fullpath = os.path.join(self.repo.path, filename) with open(fullpath, "w") as f: f.write("stuff") porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) cwd = os.getcwd() try: os.chdir(self.repo.path) porcelain.remove(repo=self.repo.path, paths=[filename]) finally: os.chdir(cwd) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes["delete"][0], filename.encode("ascii")) self.assertEqual(len(changes["add"]), 0) self.assertEqual(len(changes["modify"]), 0) self.assertEqual(len(changes["delete"]), 1) def test_get_untracked_paths(self): with open(os.path.join(self.repo.path, ".gitignore"), "w") as f: f.write("ignored\n") with open(os.path.join(self.repo.path, "ignored"), "w") as f: f.write("blah\n") with open(os.path.join(self.repo.path, "notignored"), "w") as f: f.write("blah\n") os.symlink( os.path.join(self.repo.path, os.pardir, "external_target"), os.path.join(self.repo.path, "link"), ) self.assertEqual( set(["ignored", "notignored", ".gitignore", "link"]), set( porcelain.get_untracked_paths( self.repo.path, self.repo.path, self.repo.open_index() ) ), ) self.assertEqual( set([".gitignore", "notignored", "link"]), set(porcelain.status(self.repo).untracked), ) self.assertEqual( set([".gitignore", "notignored", "ignored", "link"]), set(porcelain.status(self.repo, ignored=True).untracked), ) def test_get_untracked_paths_subrepo(self): with open(os.path.join(self.repo.path, ".gitignore"), "w") as f: f.write("nested/\n") with open(os.path.join(self.repo.path, "notignored"), "w") as f: f.write("blah\n") subrepo = Repo.init(os.path.join(self.repo.path, "nested"), mkdir=True) with open(os.path.join(subrepo.path, "ignored"), "w") as f: f.write("bleep\n") with open(os.path.join(subrepo.path, "with"), "w") as f: f.write("bloop\n") with open(os.path.join(subrepo.path, "manager"), "w") as f: f.write("blop\n") self.assertEqual( set([".gitignore", "notignored", os.path.join("nested", "")]), set( porcelain.get_untracked_paths( self.repo.path, self.repo.path, self.repo.open_index() ) ), ) self.assertEqual( set([".gitignore", "notignored"]), set( porcelain.get_untracked_paths( self.repo.path, self.repo.path, self.repo.open_index(), exclude_ignored=True, ) ), ) self.assertEqual( set(["ignored", "with", "manager"]), set( porcelain.get_untracked_paths( subrepo.path, subrepo.path, subrepo.open_index() ) ), ) self.assertEqual( set(), set( porcelain.get_untracked_paths( subrepo.path, self.repo.path, self.repo.open_index(), ) ), ) self.assertEqual( set([os.path.join('nested', 'ignored'), os.path.join('nested', 'with'), os.path.join('nested', 'manager')]), set( porcelain.get_untracked_paths( self.repo.path, subrepo.path, self.repo.open_index(), ) ), ) def test_get_untracked_paths_subdir(self): with open(os.path.join(self.repo.path, ".gitignore"), "w") as f: f.write("subdir/\nignored") with open(os.path.join(self.repo.path, "notignored"), "w") as f: f.write("blah\n") os.mkdir(os.path.join(self.repo.path, "subdir")) with open(os.path.join(self.repo.path, "ignored"), "w") as f: f.write("foo") with open(os.path.join(self.repo.path, "subdir", "ignored"), "w") as f: f.write("foo") self.assertEqual( set( [ ".gitignore", "notignored", "ignored", os.path.join("subdir", ""), ] ), set( porcelain.get_untracked_paths( self.repo.path, self.repo.path, self.repo.open_index(), ) ) ) self.assertEqual( set([".gitignore", "notignored"]), set( porcelain.get_untracked_paths( self.repo.path, self.repo.path, self.repo.open_index(), exclude_ignored=True, ) ) ) # TODO(jelmer): Add test for dulwich.porcelain.daemon class UploadPackTests(PorcelainTestCase): """Tests for upload_pack.""" def test_upload_pack(self): outf = BytesIO() exitcode = porcelain.upload_pack(self.repo.path, BytesIO(b"0000"), outf) outlines = outf.getvalue().splitlines() self.assertEqual([b"0000"], outlines) self.assertEqual(0, exitcode) class ReceivePackTests(PorcelainTestCase): """Tests for receive_pack.""" def test_receive_pack(self): filename = "foo" fullpath = os.path.join(self.repo.path, filename) with open(fullpath, "w") as f: f.write("stuff") porcelain.add(repo=self.repo.path, paths=fullpath) self.repo.do_commit( message=b"test status", author=b"author ", committer=b"committer ", author_timestamp=1402354300, commit_timestamp=1402354300, author_timezone=0, commit_timezone=0, ) outf = BytesIO() exitcode = porcelain.receive_pack(self.repo.path, BytesIO(b"0000"), outf) outlines = outf.getvalue().splitlines() self.assertEqual( [ b"0091319b56ce3aee2d489f759736a79cc552c9bb86d9 HEAD\x00 report-status " # noqa: E501 b"delete-refs quiet ofs-delta side-band-64k " b"no-done symref=HEAD:refs/heads/master", b"003f319b56ce3aee2d489f759736a79cc552c9bb86d9 refs/heads/master", b"0000", ], outlines, ) self.assertEqual(0, exitcode) class BranchListTests(PorcelainTestCase): def test_standard(self): self.assertEqual(set([]), set(porcelain.branch_list(self.repo))) def test_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertEqual( set([b"master", b"foo"]), set(porcelain.branch_list(self.repo)) ) class BranchCreateTests(PorcelainTestCase): def test_branch_exists(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertRaises(porcelain.Error, porcelain.branch_create, self.repo, b"foo") porcelain.branch_create(self.repo, b"foo", force=True) def test_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertEqual( set([b"master", b"foo"]), set(porcelain.branch_list(self.repo)) ) class BranchDeleteTests(PorcelainTestCase): def test_simple(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertTrue(b"foo" in porcelain.branch_list(self.repo)) porcelain.branch_delete(self.repo, b"foo") self.assertFalse(b"foo" in porcelain.branch_list(self.repo)) def test_simple_unicode(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, "foo") self.assertTrue(b"foo" in porcelain.branch_list(self.repo)) porcelain.branch_delete(self.repo, "foo") self.assertFalse(b"foo" in porcelain.branch_list(self.repo)) class FetchTests(PorcelainTestCase): def test_simple(self): outstream = BytesIO() errstream = BytesIO() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit( repo=self.repo.path, message=b"test", author=b"test ", committer=b"test ", ) # Setup target repo target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) target_repo = porcelain.clone( self.repo.path, target=target_path, errstream=errstream ) # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit( repo=self.repo.path, message=b"test2", author=b"test2 ", committer=b"test2 ", ) self.assertFalse(self.repo[b"HEAD"].id in target_repo) target_repo.close() # Fetch changes into the cloned repo porcelain.fetch(target_path, "origin", outstream=outstream, errstream=errstream) # Assert that fetch updated the local image of the remote self.assert_correct_remote_refs(target_repo.get_refs(), self.repo.get_refs()) # Check the target repo for pushed changes with Repo(target_path) as r: self.assertTrue(self.repo[b"HEAD"].id in r) def test_with_remote_name(self): remote_name = "origin" outstream = BytesIO() errstream = BytesIO() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit( repo=self.repo.path, message=b"test", author=b"test ", committer=b"test ", ) # Setup target repo target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) target_repo = porcelain.clone( self.repo.path, target=target_path, errstream=errstream ) # Capture current refs target_refs = target_repo.get_refs() # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit( repo=self.repo.path, message=b"test2", author=b"test2 ", committer=b"test2 ", ) self.assertFalse(self.repo[b"HEAD"].id in target_repo) target_config = target_repo.get_config() target_config.set( (b"remote", remote_name.encode()), b"url", self.repo.path.encode() ) target_repo.close() # Fetch changes into the cloned repo porcelain.fetch( target_path, remote_name, outstream=outstream, errstream=errstream ) # Assert that fetch updated the local image of the remote self.assert_correct_remote_refs(target_repo.get_refs(), self.repo.get_refs()) # Check the target repo for pushed changes, as well as updates # for the refs with Repo(target_path) as r: self.assertTrue(self.repo[b"HEAD"].id in r) self.assertNotEqual(self.repo.get_refs(), target_refs) def assert_correct_remote_refs( self, local_refs, remote_refs, remote_name=b"origin" ): """Assert that known remote refs corresponds to actual remote refs.""" local_ref_prefix = b"refs/heads" remote_ref_prefix = b"refs/remotes/" + remote_name locally_known_remote_refs = { k[len(remote_ref_prefix) + 1 :]: v for k, v in local_refs.items() if k.startswith(remote_ref_prefix) } normalized_remote_refs = { k[len(local_ref_prefix) + 1 :]: v for k, v in remote_refs.items() if k.startswith(local_ref_prefix) } self.assertEqual(locally_known_remote_refs, normalized_remote_refs) class RepackTests(PorcelainTestCase): def test_empty(self): porcelain.repack(self.repo) def test_simple(self): handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.repack(self.repo) class LsTreeTests(PorcelainTestCase): def test_empty(self): porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) f = StringIO() porcelain.ls_tree(self.repo, b"HEAD", outstream=f) self.assertEqual(f.getvalue(), "") def test_simple(self): # Commit a dummy file then modify it fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("origstuff") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) f = StringIO() porcelain.ls_tree(self.repo, b"HEAD", outstream=f) self.assertEqual( f.getvalue(), "100644 blob 8b82634d7eae019850bb883f06abf428c58bc9aa\tfoo\n", ) def test_recursive(self): # Create a directory then write a dummy file in it dirpath = os.path.join(self.repo.path, "adir") filepath = os.path.join(dirpath, "afile") os.mkdir(dirpath) with open(filepath, "w") as f: f.write("origstuff") porcelain.add(repo=self.repo.path, paths=[filepath]) porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) f = StringIO() porcelain.ls_tree(self.repo, b"HEAD", outstream=f) self.assertEqual( f.getvalue(), "40000 tree b145cc69a5e17693e24d8a7be0016ed8075de66d\tadir\n", ) f = StringIO() porcelain.ls_tree(self.repo, b"HEAD", outstream=f, recursive=True) self.assertEqual( f.getvalue(), "40000 tree b145cc69a5e17693e24d8a7be0016ed8075de66d\tadir\n" "100644 blob 8b82634d7eae019850bb883f06abf428c58bc9aa\tadir" "/afile\n", ) class LsRemoteTests(PorcelainTestCase): def test_empty(self): self.assertEqual({}, porcelain.ls_remote(self.repo.path)) def test_some(self): cid = porcelain.commit( repo=self.repo.path, message=b"test status", author=b"author ", committer=b"committer ", ) self.assertEqual( {b"refs/heads/master": cid, b"HEAD": cid}, porcelain.ls_remote(self.repo.path), ) class LsFilesTests(PorcelainTestCase): def test_empty(self): self.assertEqual([], list(porcelain.ls_files(self.repo))) def test_simple(self): # Commit a dummy file then modify it fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("origstuff") porcelain.add(repo=self.repo.path, paths=[fullpath]) self.assertEqual([b"foo"], list(porcelain.ls_files(self.repo))) class RemoteAddTests(PorcelainTestCase): def test_new(self): porcelain.remote_add(self.repo, "jelmer", "git://jelmer.uk/code/dulwich") c = self.repo.get_config() self.assertEqual( c.get((b"remote", b"jelmer"), b"url"), b"git://jelmer.uk/code/dulwich", ) def test_exists(self): porcelain.remote_add(self.repo, "jelmer", "git://jelmer.uk/code/dulwich") self.assertRaises( porcelain.RemoteExists, porcelain.remote_add, self.repo, "jelmer", "git://jelmer.uk/code/dulwich", ) class CheckIgnoreTests(PorcelainTestCase): def test_check_ignored(self): with open(os.path.join(self.repo.path, ".gitignore"), "w") as f: f.write("foo") foo_path = os.path.join(self.repo.path, "foo") with open(foo_path, "w") as f: f.write("BAR") bar_path = os.path.join(self.repo.path, "bar") with open(bar_path, "w") as f: f.write("BAR") self.assertEqual(["foo"], list(porcelain.check_ignore(self.repo, [foo_path]))) self.assertEqual([], list(porcelain.check_ignore(self.repo, [bar_path]))) def test_check_added_abs(self): path = os.path.join(self.repo.path, "foo") with open(path, "w") as f: f.write("BAR") self.repo.stage(["foo"]) with open(os.path.join(self.repo.path, ".gitignore"), "w") as f: f.write("foo\n") self.assertEqual([], list(porcelain.check_ignore(self.repo, [path]))) self.assertEqual( ["foo"], list(porcelain.check_ignore(self.repo, [path], no_index=True)), ) def test_check_added_rel(self): with open(os.path.join(self.repo.path, "foo"), "w") as f: f.write("BAR") self.repo.stage(["foo"]) with open(os.path.join(self.repo.path, ".gitignore"), "w") as f: f.write("foo\n") cwd = os.getcwd() os.mkdir(os.path.join(self.repo.path, "bar")) os.chdir(os.path.join(self.repo.path, "bar")) try: self.assertEqual(list(porcelain.check_ignore(self.repo, ["../foo"])), []) self.assertEqual( ["../foo"], list(porcelain.check_ignore(self.repo, ["../foo"], no_index=True)), ) finally: os.chdir(cwd) class UpdateHeadTests(PorcelainTestCase): def test_set_to_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo.refs[b"refs/heads/blah"] = c1.id porcelain.update_head(self.repo, "blah") self.assertEqual(c1.id, self.repo.head()) self.assertEqual(b"ref: refs/heads/blah", self.repo.refs.read_ref(b"HEAD")) def test_set_to_branch_detached(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo.refs[b"refs/heads/blah"] = c1.id porcelain.update_head(self.repo, "blah", detached=True) self.assertEqual(c1.id, self.repo.head()) self.assertEqual(c1.id, self.repo.refs.read_ref(b"HEAD")) def test_set_to_commit_detached(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo.refs[b"refs/heads/blah"] = c1.id porcelain.update_head(self.repo, c1.id, detached=True) self.assertEqual(c1.id, self.repo.head()) self.assertEqual(c1.id, self.repo.refs.read_ref(b"HEAD")) def test_set_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo.refs[b"refs/heads/blah"] = c1.id porcelain.update_head(self.repo, "blah", new_branch="bar") self.assertEqual(c1.id, self.repo.head()) self.assertEqual(b"ref: refs/heads/bar", self.repo.refs.read_ref(b"HEAD")) class MailmapTests(PorcelainTestCase): def test_no_mailmap(self): self.assertEqual( b"Jelmer Vernooij ", porcelain.check_mailmap(self.repo, b"Jelmer Vernooij "), ) def test_mailmap_lookup(self): with open(os.path.join(self.repo.path, ".mailmap"), "wb") as f: f.write( b"""\ Jelmer Vernooij """ ) self.assertEqual( b"Jelmer Vernooij ", porcelain.check_mailmap(self.repo, b"Jelmer Vernooij "), ) class FsckTests(PorcelainTestCase): def test_none(self): self.assertEqual([], list(porcelain.fsck(self.repo))) def test_git_dir(self): obj = Tree() a = Blob() a.data = b"foo" obj.add(b".git", 0o100644, a.id) self.repo.object_store.add_objects([(a, None), (obj, None)]) self.assertEqual( [(obj.id, "invalid name .git")], [(sha, str(e)) for (sha, e) in porcelain.fsck(self.repo)], ) class DescribeTests(PorcelainTestCase): def test_no_commits(self): self.assertRaises(KeyError, porcelain.describe, self.repo.path) def test_single_commit(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") porcelain.add(repo=self.repo.path, paths=[fullpath]) sha = porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ", ) self.assertEqual( "g{}".format(sha[:7].decode("ascii")), porcelain.describe(self.repo.path), ) def test_tag(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ", ) porcelain.tag_create( self.repo.path, b"tryme", b"foo ", b"bar", annotated=True, ) self.assertEqual("tryme", porcelain.describe(self.repo.path)) def test_tag_and_commit(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ", ) porcelain.tag_create( self.repo.path, b"tryme", b"foo ", b"bar", annotated=True, ) with open(fullpath, "w") as f: f.write("BAR2") porcelain.add(repo=self.repo.path, paths=[fullpath]) sha = porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ", ) self.assertEqual( "tryme-1-g{}".format(sha[:7].decode("ascii")), porcelain.describe(self.repo.path), ) class PathToTreeTests(PorcelainTestCase): def setUp(self): super(PathToTreeTests, self).setUp() self.fp = os.path.join(self.test_dir, "bar") with open(self.fp, "w") as f: f.write("something") oldcwd = os.getcwd() self.addCleanup(os.chdir, oldcwd) os.chdir(self.test_dir) def test_path_to_tree_path_base(self): self.assertEqual(b"bar", porcelain.path_to_tree_path(self.test_dir, self.fp)) self.assertEqual(b"bar", porcelain.path_to_tree_path(".", "./bar")) self.assertEqual(b"bar", porcelain.path_to_tree_path(".", "bar")) cwd = os.getcwd() self.assertEqual( b"bar", porcelain.path_to_tree_path(".", os.path.join(cwd, "bar")) ) self.assertEqual(b"bar", porcelain.path_to_tree_path(cwd, "bar")) def test_path_to_tree_path_syntax(self): self.assertEqual(b"bar", porcelain.path_to_tree_path(".", "./bar")) def test_path_to_tree_path_error(self): with self.assertRaises(ValueError): with tempfile.TemporaryDirectory() as od: porcelain.path_to_tree_path(od, self.fp) def test_path_to_tree_path_rel(self): cwd = os.getcwd() os.mkdir(os.path.join(self.repo.path, "foo")) os.mkdir(os.path.join(self.repo.path, "foo/bar")) try: os.chdir(os.path.join(self.repo.path, "foo/bar")) with open("baz", "w") as f: f.write("contents") self.assertEqual(b"bar/baz", porcelain.path_to_tree_path("..", "baz")) self.assertEqual( b"bar/baz", porcelain.path_to_tree_path( os.path.join(os.getcwd(), ".."), os.path.join(os.getcwd(), "baz"), ), ) self.assertEqual( b"bar/baz", porcelain.path_to_tree_path("..", os.path.join(os.getcwd(), "baz")), ) self.assertEqual( b"bar/baz", porcelain.path_to_tree_path(os.path.join(os.getcwd(), ".."), "baz"), ) finally: os.chdir(cwd) class GetObjectByPathTests(PorcelainTestCase): def test_simple(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ", ) self.assertEqual(b"BAR", porcelain.get_object_by_path(self.repo, "foo").data) self.assertEqual(b"BAR", porcelain.get_object_by_path(self.repo, b"foo").data) def test_encoding(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ", encoding=b"utf-8", ) self.assertEqual(b"BAR", porcelain.get_object_by_path(self.repo, "foo").data) self.assertEqual(b"BAR", porcelain.get_object_by_path(self.repo, b"foo").data) def test_missing(self): self.assertRaises(KeyError, porcelain.get_object_by_path, self.repo, "foo") class WriteTreeTests(PorcelainTestCase): def test_simple(self): fullpath = os.path.join(self.repo.path, "foo") with open(fullpath, "w") as f: f.write("BAR") porcelain.add(repo=self.repo.path, paths=[fullpath]) self.assertEqual( b"d2092c8a9f311f0311083bf8d177f2ca0ab5b241", porcelain.write_tree(self.repo), ) class ActiveBranchTests(PorcelainTestCase): def test_simple(self): self.assertEqual(b"master", porcelain.active_branch(self.repo))